Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSrc
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its class_init function, like so:
40  * |[
41  * static void
42  * my_element_class_init (GstMyElementClass *klass)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
45  *
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * ]|
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSinkClass.preroll() vmethod with this preroll buffer and will then
59  * commit the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from #GstBaseSinkClass.get_times(). If this
63  * function returns #GST_CLOCK_TIME_NONE for the start time, no synchronisation
64  * will be done. Synchronisation can be disabled entirely by setting the object
65  * #GstBaseSink:sync property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSinkClass.render() will be
68  * called. Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the
71  * #GstBaseSinkClass.render() method are supported as well. These classes
72  * typically receive a buffer in the render method and can then potentially
73  * block on the clock while rendering. A typical example is an audiosink.
74  * Since 0.10.11 these subclasses can use gst_base_sink_wait_preroll() to
75  * perform the blocking wait.
76  *
77  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
78  * for the clock to reach the time indicated by the stop time of the last
79  * #GstBaseSinkClass.get_times() call before posting an EOS message. When the
80  * element receives EOS in PAUSED, preroll completes, the event is queued and an
81  * EOS message is posted when going to PLAYING.
82  *
83  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
84  * synchronisation and clipping of buffers. Buffers that fall completely outside
85  * of the current segment are dropped. Buffers that fall partially in the
86  * segment are rendered (and prerolled). Subclasses should do any subbuffer
87  * clipping themselves when needed.
88  *
89  * #GstBaseSink will by default report the current playback position in
90  * #GST_FORMAT_TIME based on the current clock time and segment information.
91  * If no clock has been set on the element, the query will be forwarded
92  * upstream.
93  *
94  * The #GstBaseSinkClass.set_caps() function will be called when the subclass
95  * should configure itself to process a specific media type.
96  *
97  * The #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() virtual methods
98  * will be called when resources should be allocated. Any 
99  * #GstBaseSinkClass.preroll(), #GstBaseSinkClass.render() and
100  * #GstBaseSinkClass.set_caps() function will be called between the
101  * #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() calls.
102  *
103  * The #GstBaseSinkClass.event() virtual method will be called when an event is
104  * received by #GstBaseSink. Normally this method should only be overriden by
105  * very specific elements (such as file sinks) which need to handle the
106  * newsegment event specially.
107  *
108  * The #GstBaseSinkClass.unlock() method is called when the elements should
109  * unblock any blocking operations they perform in the
110  * #GstBaseSinkClass.render() method. This is mostly useful when the
111  * #GstBaseSinkClass.render() method performs a blocking write on a file
112  * descriptor, for example.
113  *
114  * The #GstBaseSink:max-lateness property affects how the sink deals with
115  * buffers that arrive too late in the sink. A buffer arrives too late in the
116  * sink when the presentation time (as a combination of the last segment, buffer
117  * timestamp and element base_time) plus the duration is before the current
118  * time of the clock.
119  * If the frame is later than max-lateness, the sink will drop the buffer
120  * without calling the render method.
121  * This feature is disabled if sync is disabled, the
122  * #GstBaseSinkClass.get_times() method does not return a valid start time or
123  * max-lateness is set to -1 (the default).
124  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
125  * max-lateness value.
126  *
127  * The #GstBaseSink:qos property will enable the quality-of-service features of
128  * the basesink which gather statistics about the real-time performance of the
129  * clock synchronisation. For each buffer received in the sink, statistics are
130  * gathered and a QOS event is sent upstream with these numbers. This
131  * information can then be used by upstream elements to reduce their processing
132  * rate, for example.
133  *
134  * Since 0.10.15 the #GstBaseSink:async property can be used to instruct the
135  * sink to never perform an ASYNC state change. This feature is mostly usable
136  * when dealing with non-synchronized streams or sparse streams.
137  *
138  * Last reviewed on 2007-08-29 (0.10.15)
139  */
140
141 #ifdef HAVE_CONFIG_H
142 #  include "config.h"
143 #endif
144
145 /* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
146  * with newer GLib versions (>= 2.31.0) */
147 #define GLIB_DISABLE_DEPRECATION_WARNINGS
148 #include <gst/gst_private.h>
149
150 #include "gstbasesink.h"
151 #include <gst/gstmarshal.h>
152 #include <gst/gst-i18n-lib.h>
153
154 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
155 #define GST_CAT_DEFAULT gst_base_sink_debug
156
157 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
158    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
159
160 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
161
162 typedef struct
163 {
164   gboolean valid;               /* if this info is valid */
165   guint32 seqnum;               /* the seqnum of the STEP event */
166   GstFormat format;             /* the format of the amount */
167   guint64 amount;               /* the total amount of data to skip */
168   guint64 position;             /* the position in the stepped data */
169   guint64 duration;             /* the duration in time of the skipped data */
170   guint64 start;                /* running_time of the start */
171   gdouble rate;                 /* rate of skipping */
172   gdouble start_rate;           /* rate before skipping */
173   guint64 start_start;          /* start position skipping */
174   guint64 start_stop;           /* stop position skipping */
175   gboolean flush;               /* if this was a flushing step */
176   gboolean intermediate;        /* if this is an intermediate step */
177   gboolean need_preroll;        /* if we need preroll after this step */
178 } GstStepInfo;
179
180 struct _GstBaseSinkPrivate
181 {
182   gint qos_enabled;             /* ATOMIC */
183   gboolean async_enabled;
184   GstClockTimeDiff ts_offset;
185   GstClockTime render_delay;
186
187   /* start, stop of current buffer, stream time, used to report position */
188   GstClockTime current_sstart;
189   GstClockTime current_sstop;
190
191   /* start, stop and jitter of current buffer, running time */
192   GstClockTime current_rstart;
193   GstClockTime current_rstop;
194   GstClockTimeDiff current_jitter;
195   /* the running time of the previous buffer */
196   GstClockTime prev_rstart;
197
198   /* EOS sync time in running time */
199   GstClockTime eos_rtime;
200
201   /* last buffer that arrived in time, running time */
202   GstClockTime last_render_time;
203   /* when the last buffer left the sink, running time */
204   GstClockTime last_left;
205
206   /* running averages go here these are done on running time */
207   GstClockTime avg_pt;
208   GstClockTime avg_duration;
209   gdouble avg_rate;
210   GstClockTime avg_in_diff;
211
212   /* these are done on system time. avg_jitter and avg_render are
213    * compared to eachother to see if the rendering time takes a
214    * huge amount of the processing, If so we are flooded with
215    * buffers. */
216   GstClockTime last_left_systime;
217   GstClockTime avg_jitter;
218   GstClockTime start, stop;
219   GstClockTime avg_render;
220
221   /* number of rendered and dropped frames */
222   guint64 rendered;
223   guint64 dropped;
224
225   /* latency stuff */
226   GstClockTime latency;
227
228   /* if we already commited the state */
229   gboolean commited;
230   /* state change to playing ongoing */
231   gboolean to_playing;
232
233   /* when we received EOS */
234   gboolean received_eos;
235
236   /* when we are prerolled and able to report latency */
237   gboolean have_latency;
238
239   /* the last buffer we prerolled or rendered. Useful for making snapshots */
240   gint enable_last_sample;      /* atomic */
241   GstBuffer *last_buffer;
242   GstCaps *last_caps;
243
244   /* negotiated caps */
245   GstCaps *caps;
246
247   /* blocksize for pulling */
248   guint blocksize;
249
250   gboolean discont;
251
252   /* seqnum of the stream */
253   guint32 seqnum;
254
255   gboolean call_preroll;
256   gboolean step_unlock;
257
258   /* we have a pending and a current step operation */
259   GstStepInfo current_step;
260   GstStepInfo pending_step;
261
262   /* Cached GstClockID */
263   GstClockID cached_clock_id;
264
265   /* for throttling and QoS */
266   GstClockTime earliest_in_time;
267   GstClockTime throttle_time;
268
269   gboolean reset_time;
270 };
271
272 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
273
274 /* generic running average, this has a neutral window size */
275 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
276
277 /* the windows for these running averages are experimentally obtained.
278  * possitive values get averaged more while negative values use a small
279  * window so we can react faster to badness. */
280 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
281 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
282
283 /* BaseSink properties */
284
285 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
286 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
287
288 #define DEFAULT_SYNC                TRUE
289 #define DEFAULT_MAX_LATENESS        -1
290 #define DEFAULT_QOS                 FALSE
291 #define DEFAULT_ASYNC               TRUE
292 #define DEFAULT_TS_OFFSET           0
293 #define DEFAULT_BLOCKSIZE           4096
294 #define DEFAULT_RENDER_DELAY        0
295 #define DEFAULT_ENABLE_LAST_SAMPLE  TRUE
296 #define DEFAULT_THROTTLE_TIME       0
297
298 enum
299 {
300   PROP_0,
301   PROP_SYNC,
302   PROP_MAX_LATENESS,
303   PROP_QOS,
304   PROP_ASYNC,
305   PROP_TS_OFFSET,
306   PROP_ENABLE_LAST_SAMPLE,
307   PROP_LAST_SAMPLE,
308   PROP_BLOCKSIZE,
309   PROP_RENDER_DELAY,
310   PROP_THROTTLE_TIME,
311   PROP_LAST
312 };
313
314 static GstElementClass *parent_class = NULL;
315
316 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
317 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
318 static void gst_base_sink_finalize (GObject * object);
319
320 GType
321 gst_base_sink_get_type (void)
322 {
323   static volatile gsize base_sink_type = 0;
324
325   if (g_once_init_enter (&base_sink_type)) {
326     GType _type;
327     static const GTypeInfo base_sink_info = {
328       sizeof (GstBaseSinkClass),
329       NULL,
330       NULL,
331       (GClassInitFunc) gst_base_sink_class_init,
332       NULL,
333       NULL,
334       sizeof (GstBaseSink),
335       0,
336       (GInstanceInitFunc) gst_base_sink_init,
337     };
338
339     _type = g_type_register_static (GST_TYPE_ELEMENT,
340         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
341     g_once_init_leave (&base_sink_type, _type);
342   }
343   return base_sink_type;
344 }
345
346 static void gst_base_sink_set_property (GObject * object, guint prop_id,
347     const GValue * value, GParamSpec * pspec);
348 static void gst_base_sink_get_property (GObject * object, guint prop_id,
349     GValue * value, GParamSpec * pspec);
350
351 static gboolean gst_base_sink_send_event (GstElement * element,
352     GstEvent * event);
353 static gboolean default_element_query (GstElement * element, GstQuery * query);
354
355 static GstCaps *gst_base_sink_default_get_caps (GstBaseSink * sink,
356     GstCaps * caps);
357 static gboolean gst_base_sink_default_set_caps (GstBaseSink * sink,
358     GstCaps * caps);
359 static void gst_base_sink_default_get_times (GstBaseSink * basesink,
360     GstBuffer * buffer, GstClockTime * start, GstClockTime * end);
361 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
362     GstPad * pad, gboolean flushing);
363 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
364     gboolean active);
365 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
366     GstSegment * segment);
367 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
368     GstEvent * event, GstSegment * segment);
369
370 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
371     GstStateChange transition);
372
373 static gboolean gst_base_sink_sink_query (GstPad * pad, GstObject * parent,
374     GstQuery * query);
375 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstObject * parent,
376     GstBuffer * buffer);
377 static GstFlowReturn gst_base_sink_chain_list (GstPad * pad, GstObject * parent,
378     GstBufferList * list);
379
380 static void gst_base_sink_loop (GstPad * pad);
381 static gboolean gst_base_sink_pad_activate (GstPad * pad, GstObject * parent);
382 static gboolean gst_base_sink_pad_activate_mode (GstPad * pad,
383     GstObject * parent, GstPadMode mode, gboolean active);
384 static gboolean gst_base_sink_default_event (GstBaseSink * basesink,
385     GstEvent * event);
386 static GstFlowReturn gst_base_sink_default_wait_eos (GstBaseSink * basesink,
387     GstEvent * event);
388 static gboolean gst_base_sink_event (GstPad * pad, GstObject * parent,
389     GstEvent * event);
390
391 static gboolean gst_base_sink_default_query (GstBaseSink * sink,
392     GstQuery * query);
393
394 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
395 static void gst_base_sink_default_fixate (GstBaseSink * bsink, GstCaps * caps);
396 static void gst_base_sink_fixate (GstBaseSink * bsink, GstCaps * caps);
397
398 /* check if an object was too late */
399 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
400     GstMiniObject * obj, GstClockTime rstart, GstClockTime rstop,
401     GstClockReturn status, GstClockTimeDiff jitter);
402
403 static void
404 gst_base_sink_class_init (GstBaseSinkClass * klass)
405 {
406   GObjectClass *gobject_class;
407   GstElementClass *gstelement_class;
408
409   gobject_class = G_OBJECT_CLASS (klass);
410   gstelement_class = GST_ELEMENT_CLASS (klass);
411
412   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
413       "basesink element");
414
415   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
416
417   parent_class = g_type_class_peek_parent (klass);
418
419   gobject_class->finalize = gst_base_sink_finalize;
420   gobject_class->set_property = gst_base_sink_set_property;
421   gobject_class->get_property = gst_base_sink_get_property;
422
423   g_object_class_install_property (gobject_class, PROP_SYNC,
424       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
425           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
426
427   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
428       g_param_spec_int64 ("max-lateness", "Max Lateness",
429           "Maximum number of nanoseconds that a buffer can be late before it "
430           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
431           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
432
433   g_object_class_install_property (gobject_class, PROP_QOS,
434       g_param_spec_boolean ("qos", "Qos",
435           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
436           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
437   /**
438    * GstBaseSink:async
439    *
440    * If set to #TRUE, the basesink will perform asynchronous state changes.
441    * When set to #FALSE, the sink will not signal the parent when it prerolls.
442    * Use this option when dealing with sparse streams or when synchronisation is
443    * not required.
444    *
445    * Since: 0.10.15
446    */
447   g_object_class_install_property (gobject_class, PROP_ASYNC,
448       g_param_spec_boolean ("async", "Async",
449           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
450           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
451   /**
452    * GstBaseSink:ts-offset
453    *
454    * Controls the final synchronisation, a negative value will render the buffer
455    * earlier while a positive value delays playback. This property can be
456    * used to fix synchronisation in bad files.
457    *
458    * Since: 0.10.15
459    */
460   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
461       g_param_spec_int64 ("ts-offset", "TS Offset",
462           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
463           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
464
465   /**
466    * GstBaseSink:enable-last-sample
467    *
468    * Enable the last-sample property. If FALSE, basesink doesn't keep a
469    * reference to the last buffer arrived and the last-sample property is always
470    * set to NULL. This can be useful if you need buffers to be released as soon
471    * as possible, eg. if you're using a buffer pool.
472    *
473    * Since: 0.10.30
474    */
475   g_object_class_install_property (gobject_class, PROP_ENABLE_LAST_SAMPLE,
476       g_param_spec_boolean ("enable-last-sample", "Enable Last Buffer",
477           "Enable the last-sample property", DEFAULT_ENABLE_LAST_SAMPLE,
478           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
479
480   /**
481    * GstBaseSink:last-sample
482    *
483    * The last buffer that arrived in the sink and was used for preroll or for
484    * rendering. This property can be used to generate thumbnails. This property
485    * can be NULL when the sink has not yet received a bufer.
486    *
487    * Since: 0.10.15
488    */
489   g_object_class_install_property (gobject_class, PROP_LAST_SAMPLE,
490       g_param_spec_boxed ("last-sample", "Last Sample",
491           "The last sample received in the sink", GST_TYPE_SAMPLE,
492           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
493   /**
494    * GstBaseSink:blocksize
495    *
496    * The amount of bytes to pull when operating in pull mode.
497    *
498    * Since: 0.10.22
499    */
500   /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
501   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
502       g_param_spec_uint ("blocksize", "Block size",
503           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
504           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
505   /**
506    * GstBaseSink:render-delay
507    *
508    * The additional delay between synchronisation and actual rendering of the
509    * media. This property will add additional latency to the device in order to
510    * make other sinks compensate for the delay.
511    *
512    * Since: 0.10.22
513    */
514   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
515       g_param_spec_uint64 ("render-delay", "Render Delay",
516           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
517           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
518   /**
519    * GstBaseSink:throttle-time
520    *
521    * The time to insert between buffers. This property can be used to control
522    * the maximum amount of buffers per second to render. Setting this property
523    * to a value bigger than 0 will make the sink create THROTTLE QoS events.
524    *
525    * Since: 0.10.33
526    */
527   g_object_class_install_property (gobject_class, PROP_THROTTLE_TIME,
528       g_param_spec_uint64 ("throttle-time", "Throttle time",
529           "The time to keep between rendered buffers (unused)", 0, G_MAXUINT64,
530           DEFAULT_THROTTLE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
531
532   gstelement_class->change_state =
533       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
534   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
535   gstelement_class->query = GST_DEBUG_FUNCPTR (default_element_query);
536
537   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_default_get_caps);
538   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_default_set_caps);
539   klass->fixate = GST_DEBUG_FUNCPTR (gst_base_sink_default_fixate);
540   klass->activate_pull =
541       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
542   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_default_get_times);
543   klass->query = GST_DEBUG_FUNCPTR (gst_base_sink_default_query);
544   klass->event = GST_DEBUG_FUNCPTR (gst_base_sink_default_event);
545   klass->wait_eos = GST_DEBUG_FUNCPTR (gst_base_sink_default_wait_eos);
546
547   /* Registering debug symbols for function pointers */
548   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_fixate);
549   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate);
550   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_mode);
551   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_event);
552   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain);
553   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain_list);
554   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_sink_query);
555 }
556
557 static GstCaps *
558 gst_base_sink_query_caps (GstBaseSink * bsink, GstPad * pad, GstCaps * filter)
559 {
560   GstBaseSinkClass *bclass;
561   GstCaps *caps = NULL;
562   gboolean fixed;
563
564   bclass = GST_BASE_SINK_GET_CLASS (bsink);
565   fixed = GST_PAD_IS_FIXED_CAPS (pad);
566
567   if (fixed || bsink->pad_mode == GST_PAD_MODE_PULL) {
568     /* if we are operating in pull mode or fixed caps, we only accept the
569      * currently negotiated caps */
570     caps = gst_pad_get_current_caps (pad);
571   }
572   if (caps == NULL) {
573     if (bclass->get_caps)
574       caps = bclass->get_caps (bsink, filter);
575
576     if (caps == NULL) {
577       GstPadTemplate *pad_template;
578
579       pad_template =
580           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
581           "sink");
582       if (pad_template != NULL) {
583         caps = gst_pad_template_get_caps (pad_template);
584
585         if (filter) {
586           GstCaps *intersection;
587
588           intersection =
589               gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
590           gst_caps_unref (caps);
591           caps = intersection;
592         }
593       }
594     }
595   }
596
597   return caps;
598 }
599
600 static void
601 gst_base_sink_default_fixate (GstBaseSink * bsink, GstCaps * caps)
602 {
603   GST_DEBUG_OBJECT (bsink, "using default caps fixate function");
604   gst_caps_fixate (caps);
605 }
606
607 static void
608 gst_base_sink_fixate (GstBaseSink * bsink, GstCaps * caps)
609 {
610   GstBaseSinkClass *bclass;
611
612   bclass = GST_BASE_SINK_GET_CLASS (bsink);
613
614   if (bclass->fixate)
615     bclass->fixate (bsink, caps);
616 }
617
618 static void
619 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
620 {
621   GstPadTemplate *pad_template;
622   GstBaseSinkPrivate *priv;
623
624   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
625
626   pad_template =
627       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
628   g_return_if_fail (pad_template != NULL);
629
630   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
631
632   gst_pad_set_activate_function (basesink->sinkpad, gst_base_sink_pad_activate);
633   gst_pad_set_activatemode_function (basesink->sinkpad,
634       gst_base_sink_pad_activate_mode);
635   gst_pad_set_query_function (basesink->sinkpad, gst_base_sink_sink_query);
636   gst_pad_set_event_function (basesink->sinkpad, gst_base_sink_event);
637   gst_pad_set_chain_function (basesink->sinkpad, gst_base_sink_chain);
638   gst_pad_set_chain_list_function (basesink->sinkpad, gst_base_sink_chain_list);
639   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
640
641   basesink->pad_mode = GST_PAD_MODE_NONE;
642   basesink->preroll_lock = g_mutex_new ();
643   basesink->preroll_cond = g_cond_new ();
644   priv->have_latency = FALSE;
645
646   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
647   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
648
649   basesink->sync = DEFAULT_SYNC;
650   basesink->max_lateness = DEFAULT_MAX_LATENESS;
651   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
652   priv->async_enabled = DEFAULT_ASYNC;
653   priv->ts_offset = DEFAULT_TS_OFFSET;
654   priv->render_delay = DEFAULT_RENDER_DELAY;
655   priv->blocksize = DEFAULT_BLOCKSIZE;
656   priv->cached_clock_id = NULL;
657   g_atomic_int_set (&priv->enable_last_sample, DEFAULT_ENABLE_LAST_SAMPLE);
658   priv->throttle_time = DEFAULT_THROTTLE_TIME;
659
660   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_FLAG_SINK);
661 }
662
663 static void
664 gst_base_sink_finalize (GObject * object)
665 {
666   GstBaseSink *basesink;
667
668   basesink = GST_BASE_SINK (object);
669
670   g_mutex_free (basesink->preroll_lock);
671   g_cond_free (basesink->preroll_cond);
672
673   G_OBJECT_CLASS (parent_class)->finalize (object);
674 }
675
676 /**
677  * gst_base_sink_set_sync:
678  * @sink: the sink
679  * @sync: the new sync value.
680  *
681  * Configures @sink to synchronize on the clock or not. When
682  * @sync is FALSE, incoming samples will be played as fast as
683  * possible. If @sync is TRUE, the timestamps of the incomming
684  * buffers will be used to schedule the exact render time of its
685  * contents.
686  *
687  * Since: 0.10.4
688  */
689 void
690 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
691 {
692   g_return_if_fail (GST_IS_BASE_SINK (sink));
693
694   GST_OBJECT_LOCK (sink);
695   sink->sync = sync;
696   GST_OBJECT_UNLOCK (sink);
697 }
698
699 /**
700  * gst_base_sink_get_sync:
701  * @sink: the sink
702  *
703  * Checks if @sink is currently configured to synchronize against the
704  * clock.
705  *
706  * Returns: TRUE if the sink is configured to synchronize against the clock.
707  *
708  * Since: 0.10.4
709  */
710 gboolean
711 gst_base_sink_get_sync (GstBaseSink * sink)
712 {
713   gboolean res;
714
715   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
716
717   GST_OBJECT_LOCK (sink);
718   res = sink->sync;
719   GST_OBJECT_UNLOCK (sink);
720
721   return res;
722 }
723
724 /**
725  * gst_base_sink_set_max_lateness:
726  * @sink: the sink
727  * @max_lateness: the new max lateness value.
728  *
729  * Sets the new max lateness value to @max_lateness. This value is
730  * used to decide if a buffer should be dropped or not based on the
731  * buffer timestamp and the current clock time. A value of -1 means
732  * an unlimited time.
733  *
734  * Since: 0.10.4
735  */
736 void
737 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
738 {
739   g_return_if_fail (GST_IS_BASE_SINK (sink));
740
741   GST_OBJECT_LOCK (sink);
742   sink->max_lateness = max_lateness;
743   GST_OBJECT_UNLOCK (sink);
744 }
745
746 /**
747  * gst_base_sink_get_max_lateness:
748  * @sink: the sink
749  *
750  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
751  * more details.
752  *
753  * Returns: The maximum time in nanoseconds that a buffer can be late
754  * before it is dropped and not rendered. A value of -1 means an
755  * unlimited time.
756  *
757  * Since: 0.10.4
758  */
759 gint64
760 gst_base_sink_get_max_lateness (GstBaseSink * sink)
761 {
762   gint64 res;
763
764   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
765
766   GST_OBJECT_LOCK (sink);
767   res = sink->max_lateness;
768   GST_OBJECT_UNLOCK (sink);
769
770   return res;
771 }
772
773 /**
774  * gst_base_sink_set_qos_enabled:
775  * @sink: the sink
776  * @enabled: the new qos value.
777  *
778  * Configures @sink to send Quality-of-Service events upstream.
779  *
780  * Since: 0.10.5
781  */
782 void
783 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
784 {
785   g_return_if_fail (GST_IS_BASE_SINK (sink));
786
787   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
788 }
789
790 /**
791  * gst_base_sink_is_qos_enabled:
792  * @sink: the sink
793  *
794  * Checks if @sink is currently configured to send Quality-of-Service events
795  * upstream.
796  *
797  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
798  *
799  * Since: 0.10.5
800  */
801 gboolean
802 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
803 {
804   gboolean res;
805
806   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
807
808   res = g_atomic_int_get (&sink->priv->qos_enabled);
809
810   return res;
811 }
812
813 /**
814  * gst_base_sink_set_async_enabled:
815  * @sink: the sink
816  * @enabled: the new async value.
817  *
818  * Configures @sink to perform all state changes asynchronusly. When async is
819  * disabled, the sink will immediately go to PAUSED instead of waiting for a
820  * preroll buffer. This feature is useful if the sink does not synchronize
821  * against the clock or when it is dealing with sparse streams.
822  *
823  * Since: 0.10.15
824  */
825 void
826 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
827 {
828   g_return_if_fail (GST_IS_BASE_SINK (sink));
829
830   GST_BASE_SINK_PREROLL_LOCK (sink);
831   g_atomic_int_set (&sink->priv->async_enabled, enabled);
832   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
833   GST_BASE_SINK_PREROLL_UNLOCK (sink);
834 }
835
836 /**
837  * gst_base_sink_is_async_enabled:
838  * @sink: the sink
839  *
840  * Checks if @sink is currently configured to perform asynchronous state
841  * changes to PAUSED.
842  *
843  * Returns: TRUE if the sink is configured to perform asynchronous state
844  * changes.
845  *
846  * Since: 0.10.15
847  */
848 gboolean
849 gst_base_sink_is_async_enabled (GstBaseSink * sink)
850 {
851   gboolean res;
852
853   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
854
855   res = g_atomic_int_get (&sink->priv->async_enabled);
856
857   return res;
858 }
859
860 /**
861  * gst_base_sink_set_ts_offset:
862  * @sink: the sink
863  * @offset: the new offset
864  *
865  * Adjust the synchronisation of @sink with @offset. A negative value will
866  * render buffers earlier than their timestamp. A positive value will delay
867  * rendering. This function can be used to fix playback of badly timestamped
868  * buffers.
869  *
870  * Since: 0.10.15
871  */
872 void
873 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
874 {
875   g_return_if_fail (GST_IS_BASE_SINK (sink));
876
877   GST_OBJECT_LOCK (sink);
878   sink->priv->ts_offset = offset;
879   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
880   GST_OBJECT_UNLOCK (sink);
881 }
882
883 /**
884  * gst_base_sink_get_ts_offset:
885  * @sink: the sink
886  *
887  * Get the synchronisation offset of @sink.
888  *
889  * Returns: The synchronisation offset.
890  *
891  * Since: 0.10.15
892  */
893 GstClockTimeDiff
894 gst_base_sink_get_ts_offset (GstBaseSink * sink)
895 {
896   GstClockTimeDiff res;
897
898   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
899
900   GST_OBJECT_LOCK (sink);
901   res = sink->priv->ts_offset;
902   GST_OBJECT_UNLOCK (sink);
903
904   return res;
905 }
906
907 /**
908  * gst_base_sink_get_last_sample:
909  * @sink: the sink
910  *
911  * Get the last sample that arrived in the sink and was used for preroll or for
912  * rendering. This property can be used to generate thumbnails.
913  *
914  * The #GstCaps on the sample can be used to determine the type of the buffer.
915  *
916  * Free-function: gst_sample_unref
917  *
918  * Returns: (transfer full): a #GstSample. gst_sample_unref() after usage.
919  *     This function returns NULL when no buffer has arrived in the sink yet
920  *     or when the sink is not in PAUSED or PLAYING.
921  *
922  * Since: 0.10.15
923  */
924 GstSample *
925 gst_base_sink_get_last_sample (GstBaseSink * sink)
926 {
927   GstSample *res = NULL;
928
929   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
930
931   GST_OBJECT_LOCK (sink);
932   if (sink->priv->last_buffer) {
933     res = gst_sample_new (sink->priv->last_buffer,
934         sink->priv->last_caps, &sink->segment, NULL);
935   }
936   GST_OBJECT_UNLOCK (sink);
937
938   return res;
939 }
940
941 /* with OBJECT_LOCK */
942 static void
943 gst_base_sink_set_last_buffer_unlocked (GstBaseSink * sink, GstBuffer * buffer)
944 {
945   GstBuffer *old;
946
947   old = sink->priv->last_buffer;
948   if (G_LIKELY (old != buffer)) {
949     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
950     if (G_LIKELY (buffer))
951       gst_buffer_ref (buffer);
952     sink->priv->last_buffer = buffer;
953     if (buffer)
954       /* copy over the caps */
955       gst_caps_replace (&sink->priv->last_caps, sink->priv->caps);
956     else
957       gst_caps_replace (&sink->priv->last_caps, NULL);
958   } else {
959     old = NULL;
960   }
961   /* avoid unreffing with the lock because cleanup code might want to take the
962    * lock too */
963   if (G_LIKELY (old)) {
964     GST_OBJECT_UNLOCK (sink);
965     gst_buffer_unref (old);
966     GST_OBJECT_LOCK (sink);
967   }
968 }
969
970 static void
971 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
972 {
973   if (!g_atomic_int_get (&sink->priv->enable_last_sample))
974     return;
975
976   GST_OBJECT_LOCK (sink);
977   gst_base_sink_set_last_buffer_unlocked (sink, buffer);
978   GST_OBJECT_UNLOCK (sink);
979 }
980
981 /**
982  * gst_base_sink_set_last_sample_enabled:
983  * @sink: the sink
984  * @enabled: the new enable-last-sample value.
985  *
986  * Configures @sink to store the last received sample in the last-sample
987  * property.
988  *
989  * Since: 0.10.30
990  */
991 void
992 gst_base_sink_set_last_sample_enabled (GstBaseSink * sink, gboolean enabled)
993 {
994   g_return_if_fail (GST_IS_BASE_SINK (sink));
995
996   /* Only take lock if we change the value */
997   if (g_atomic_int_compare_and_exchange (&sink->priv->enable_last_sample,
998           !enabled, enabled) && !enabled) {
999     GST_OBJECT_LOCK (sink);
1000     gst_base_sink_set_last_buffer_unlocked (sink, NULL);
1001     GST_OBJECT_UNLOCK (sink);
1002   }
1003 }
1004
1005 /**
1006  * gst_base_sink_is_last_sample_enabled:
1007  * @sink: the sink
1008  *
1009  * Checks if @sink is currently configured to store the last received sample in
1010  * the last-sample property.
1011  *
1012  * Returns: TRUE if the sink is configured to store the last received sample.
1013  *
1014  * Since: 0.10.30
1015  */
1016 gboolean
1017 gst_base_sink_is_last_sample_enabled (GstBaseSink * sink)
1018 {
1019   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
1020
1021   return g_atomic_int_get (&sink->priv->enable_last_sample);
1022 }
1023
1024 /**
1025  * gst_base_sink_get_latency:
1026  * @sink: the sink
1027  *
1028  * Get the currently configured latency.
1029  *
1030  * Returns: The configured latency.
1031  *
1032  * Since: 0.10.12
1033  */
1034 GstClockTime
1035 gst_base_sink_get_latency (GstBaseSink * sink)
1036 {
1037   GstClockTime res;
1038
1039   GST_OBJECT_LOCK (sink);
1040   res = sink->priv->latency;
1041   GST_OBJECT_UNLOCK (sink);
1042
1043   return res;
1044 }
1045
1046 /**
1047  * gst_base_sink_query_latency:
1048  * @sink: the sink
1049  * @live: (out) (allow-none): if the sink is live
1050  * @upstream_live: (out) (allow-none): if an upstream element is live
1051  * @min_latency: (out) (allow-none): the min latency of the upstream elements
1052  * @max_latency: (out) (allow-none): the max latency of the upstream elements
1053  *
1054  * Query the sink for the latency parameters. The latency will be queried from
1055  * the upstream elements. @live will be TRUE if @sink is configured to
1056  * synchronize against the clock. @upstream_live will be TRUE if an upstream
1057  * element is live.
1058  *
1059  * If both @live and @upstream_live are TRUE, the sink will want to compensate
1060  * for the latency introduced by the upstream elements by setting the
1061  * @min_latency to a strictly possitive value.
1062  *
1063  * This function is mostly used by subclasses.
1064  *
1065  * Returns: TRUE if the query succeeded.
1066  *
1067  * Since: 0.10.12
1068  */
1069 gboolean
1070 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
1071     gboolean * upstream_live, GstClockTime * min_latency,
1072     GstClockTime * max_latency)
1073 {
1074   gboolean l, us_live, res, have_latency;
1075   GstClockTime min, max, render_delay;
1076   GstQuery *query;
1077   GstClockTime us_min, us_max;
1078
1079   /* we are live when we sync to the clock */
1080   GST_OBJECT_LOCK (sink);
1081   l = sink->sync;
1082   have_latency = sink->priv->have_latency;
1083   render_delay = sink->priv->render_delay;
1084   GST_OBJECT_UNLOCK (sink);
1085
1086   /* assume no latency */
1087   min = 0;
1088   max = -1;
1089   us_live = FALSE;
1090
1091   if (have_latency) {
1092     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1093     /* we are ready for a latency query this is when we preroll or when we are
1094      * not async. */
1095     query = gst_query_new_latency ();
1096
1097     /* ask the peer for the latency */
1098     if ((res = gst_pad_peer_query (sink->sinkpad, query))) {
1099       /* get upstream min and max latency */
1100       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1101
1102       if (us_live) {
1103         /* upstream live, use its latency, subclasses should use these
1104          * values to create the complete latency. */
1105         min = us_min;
1106         max = us_max;
1107       }
1108       if (l) {
1109         /* we need to add the render delay if we are live */
1110         if (min != -1)
1111           min += render_delay;
1112         if (max != -1)
1113           max += render_delay;
1114       }
1115     }
1116     gst_query_unref (query);
1117   } else {
1118     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1119     res = FALSE;
1120   }
1121
1122   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1123   if (!res) {
1124     if (!l) {
1125       res = TRUE;
1126       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1127     } else {
1128       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1129     }
1130   }
1131
1132   if (res) {
1133     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1134         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1135         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1136
1137     if (live)
1138       *live = l;
1139     if (upstream_live)
1140       *upstream_live = us_live;
1141     if (min_latency)
1142       *min_latency = min;
1143     if (max_latency)
1144       *max_latency = max;
1145   }
1146   return res;
1147 }
1148
1149 /**
1150  * gst_base_sink_set_render_delay:
1151  * @sink: a #GstBaseSink
1152  * @delay: the new delay
1153  *
1154  * Set the render delay in @sink to @delay. The render delay is the time
1155  * between actual rendering of a buffer and its synchronisation time. Some
1156  * devices might delay media rendering which can be compensated for with this
1157  * function.
1158  *
1159  * After calling this function, this sink will report additional latency and
1160  * other sinks will adjust their latency to delay the rendering of their media.
1161  *
1162  * This function is usually called by subclasses.
1163  *
1164  * Since: 0.10.21
1165  */
1166 void
1167 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1168 {
1169   GstClockTime old_render_delay;
1170
1171   g_return_if_fail (GST_IS_BASE_SINK (sink));
1172
1173   GST_OBJECT_LOCK (sink);
1174   old_render_delay = sink->priv->render_delay;
1175   sink->priv->render_delay = delay;
1176   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1177       GST_TIME_ARGS (delay));
1178   GST_OBJECT_UNLOCK (sink);
1179
1180   if (delay != old_render_delay) {
1181     GST_DEBUG_OBJECT (sink, "posting latency changed");
1182     gst_element_post_message (GST_ELEMENT_CAST (sink),
1183         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1184   }
1185 }
1186
1187 /**
1188  * gst_base_sink_get_render_delay:
1189  * @sink: a #GstBaseSink
1190  *
1191  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1192  * information about the render delay.
1193  *
1194  * Returns: the render delay of @sink.
1195  *
1196  * Since: 0.10.21
1197  */
1198 GstClockTime
1199 gst_base_sink_get_render_delay (GstBaseSink * sink)
1200 {
1201   GstClockTimeDiff res;
1202
1203   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1204
1205   GST_OBJECT_LOCK (sink);
1206   res = sink->priv->render_delay;
1207   GST_OBJECT_UNLOCK (sink);
1208
1209   return res;
1210 }
1211
1212 /**
1213  * gst_base_sink_set_blocksize:
1214  * @sink: a #GstBaseSink
1215  * @blocksize: the blocksize in bytes
1216  *
1217  * Set the number of bytes that the sink will pull when it is operating in pull
1218  * mode.
1219  *
1220  * Since: 0.10.22
1221  */
1222 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1223 void
1224 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1225 {
1226   g_return_if_fail (GST_IS_BASE_SINK (sink));
1227
1228   GST_OBJECT_LOCK (sink);
1229   sink->priv->blocksize = blocksize;
1230   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1231   GST_OBJECT_UNLOCK (sink);
1232 }
1233
1234 /**
1235  * gst_base_sink_get_blocksize:
1236  * @sink: a #GstBaseSink
1237  *
1238  * Get the number of bytes that the sink will pull when it is operating in pull
1239  * mode.
1240  *
1241  * Returns: the number of bytes @sink will pull in pull mode.
1242  *
1243  * Since: 0.10.22
1244  */
1245 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1246 guint
1247 gst_base_sink_get_blocksize (GstBaseSink * sink)
1248 {
1249   guint res;
1250
1251   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1252
1253   GST_OBJECT_LOCK (sink);
1254   res = sink->priv->blocksize;
1255   GST_OBJECT_UNLOCK (sink);
1256
1257   return res;
1258 }
1259
1260 /**
1261  * gst_base_sink_set_throttle_time:
1262  * @sink: a #GstBaseSink
1263  * @throttle: the throttle time in nanoseconds
1264  *
1265  * Set the time that will be inserted between rendered buffers. This
1266  * can be used to control the maximum buffers per second that the sink
1267  * will render. 
1268  *
1269  * Since: 0.10.33
1270  */
1271 void
1272 gst_base_sink_set_throttle_time (GstBaseSink * sink, guint64 throttle)
1273 {
1274   g_return_if_fail (GST_IS_BASE_SINK (sink));
1275
1276   GST_OBJECT_LOCK (sink);
1277   sink->priv->throttle_time = throttle;
1278   GST_LOG_OBJECT (sink, "set throttle_time to %" G_GUINT64_FORMAT, throttle);
1279   GST_OBJECT_UNLOCK (sink);
1280 }
1281
1282 /**
1283  * gst_base_sink_get_throttle_time:
1284  * @sink: a #GstBaseSink
1285  *
1286  * Get the time that will be inserted between frames to control the 
1287  * maximum buffers per second.
1288  *
1289  * Returns: the number of nanoseconds @sink will put between frames.
1290  *
1291  * Since: 0.10.33
1292  */
1293 guint64
1294 gst_base_sink_get_throttle_time (GstBaseSink * sink)
1295 {
1296   guint64 res;
1297
1298   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1299
1300   GST_OBJECT_LOCK (sink);
1301   res = sink->priv->throttle_time;
1302   GST_OBJECT_UNLOCK (sink);
1303
1304   return res;
1305 }
1306
1307 static void
1308 gst_base_sink_set_property (GObject * object, guint prop_id,
1309     const GValue * value, GParamSpec * pspec)
1310 {
1311   GstBaseSink *sink = GST_BASE_SINK (object);
1312
1313   switch (prop_id) {
1314     case PROP_SYNC:
1315       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1316       break;
1317     case PROP_MAX_LATENESS:
1318       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1319       break;
1320     case PROP_QOS:
1321       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1322       break;
1323     case PROP_ASYNC:
1324       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1325       break;
1326     case PROP_TS_OFFSET:
1327       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1328       break;
1329     case PROP_BLOCKSIZE:
1330       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1331       break;
1332     case PROP_RENDER_DELAY:
1333       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1334       break;
1335     case PROP_ENABLE_LAST_SAMPLE:
1336       gst_base_sink_set_last_sample_enabled (sink, g_value_get_boolean (value));
1337       break;
1338     case PROP_THROTTLE_TIME:
1339       gst_base_sink_set_throttle_time (sink, g_value_get_uint64 (value));
1340       break;
1341     default:
1342       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1343       break;
1344   }
1345 }
1346
1347 static void
1348 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1349     GParamSpec * pspec)
1350 {
1351   GstBaseSink *sink = GST_BASE_SINK (object);
1352
1353   switch (prop_id) {
1354     case PROP_SYNC:
1355       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1356       break;
1357     case PROP_MAX_LATENESS:
1358       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1359       break;
1360     case PROP_QOS:
1361       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1362       break;
1363     case PROP_ASYNC:
1364       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1365       break;
1366     case PROP_TS_OFFSET:
1367       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1368       break;
1369     case PROP_LAST_SAMPLE:
1370       gst_value_take_buffer (value, gst_base_sink_get_last_sample (sink));
1371       break;
1372     case PROP_ENABLE_LAST_SAMPLE:
1373       g_value_set_boolean (value, gst_base_sink_is_last_sample_enabled (sink));
1374       break;
1375     case PROP_BLOCKSIZE:
1376       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1377       break;
1378     case PROP_RENDER_DELAY:
1379       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1380       break;
1381     case PROP_THROTTLE_TIME:
1382       g_value_set_uint64 (value, gst_base_sink_get_throttle_time (sink));
1383       break;
1384     default:
1385       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1386       break;
1387   }
1388 }
1389
1390
1391 static GstCaps *
1392 gst_base_sink_default_get_caps (GstBaseSink * sink, GstCaps * filter)
1393 {
1394   return NULL;
1395 }
1396
1397 static gboolean
1398 gst_base_sink_default_set_caps (GstBaseSink * sink, GstCaps * caps)
1399 {
1400   return TRUE;
1401 }
1402
1403 /* with PREROLL_LOCK, STREAM_LOCK */
1404 static gboolean
1405 gst_base_sink_commit_state (GstBaseSink * basesink)
1406 {
1407   /* commit state and proceed to next pending state */
1408   GstState current, next, pending, post_pending;
1409   gboolean post_paused = FALSE;
1410   gboolean post_async_done = FALSE;
1411   gboolean post_playing = FALSE;
1412   gboolean reset_time;
1413
1414   /* we are certainly not playing async anymore now */
1415   basesink->playing_async = FALSE;
1416
1417   GST_OBJECT_LOCK (basesink);
1418   current = GST_STATE (basesink);
1419   next = GST_STATE_NEXT (basesink);
1420   pending = GST_STATE_PENDING (basesink);
1421   post_pending = pending;
1422   reset_time = basesink->priv->reset_time;
1423   basesink->priv->reset_time = FALSE;
1424
1425   switch (pending) {
1426     case GST_STATE_PLAYING:
1427     {
1428       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1429
1430       basesink->need_preroll = FALSE;
1431       post_async_done = TRUE;
1432       basesink->priv->commited = TRUE;
1433       post_playing = TRUE;
1434       /* post PAUSED too when we were READY */
1435       if (current == GST_STATE_READY) {
1436         post_paused = TRUE;
1437       }
1438       break;
1439     }
1440     case GST_STATE_PAUSED:
1441       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1442       post_paused = TRUE;
1443       post_async_done = TRUE;
1444       basesink->priv->commited = TRUE;
1445       post_pending = GST_STATE_VOID_PENDING;
1446       break;
1447     case GST_STATE_READY:
1448     case GST_STATE_NULL:
1449       goto stopping;
1450     case GST_STATE_VOID_PENDING:
1451       goto nothing_pending;
1452     default:
1453       break;
1454   }
1455
1456   /* we can report latency queries now */
1457   basesink->priv->have_latency = TRUE;
1458
1459   GST_STATE (basesink) = pending;
1460   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1461   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1462   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1463   GST_OBJECT_UNLOCK (basesink);
1464
1465   if (post_paused) {
1466     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1467     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1468         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1469             current, next, post_pending));
1470   }
1471   if (post_async_done) {
1472     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1473     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1474         gst_message_new_async_done (GST_OBJECT_CAST (basesink), reset_time));
1475   }
1476   if (post_playing) {
1477     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1478     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1479         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1480             next, pending, GST_STATE_VOID_PENDING));
1481   }
1482
1483   GST_STATE_BROADCAST (basesink);
1484
1485   return TRUE;
1486
1487 nothing_pending:
1488   {
1489     /* Depending on the state, set our vars. We get in this situation when the
1490      * state change function got a change to update the state vars before the
1491      * streaming thread did. This is fine but we need to make sure that we
1492      * update the need_preroll var since it was TRUE when we got here and might
1493      * become FALSE if we got to PLAYING. */
1494     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1495         gst_element_state_get_name (current));
1496     switch (current) {
1497       case GST_STATE_PLAYING:
1498         basesink->need_preroll = FALSE;
1499         break;
1500       case GST_STATE_PAUSED:
1501         basesink->need_preroll = TRUE;
1502         break;
1503       default:
1504         basesink->need_preroll = FALSE;
1505         basesink->flushing = TRUE;
1506         break;
1507     }
1508     /* we can report latency queries now */
1509     basesink->priv->have_latency = TRUE;
1510     GST_OBJECT_UNLOCK (basesink);
1511     return TRUE;
1512   }
1513 stopping:
1514   {
1515     /* app is going to READY */
1516     GST_DEBUG_OBJECT (basesink, "stopping");
1517     basesink->need_preroll = FALSE;
1518     basesink->flushing = TRUE;
1519     GST_OBJECT_UNLOCK (basesink);
1520     return FALSE;
1521   }
1522 }
1523
1524 static void
1525 start_stepping (GstBaseSink * sink, GstSegment * segment,
1526     GstStepInfo * pending, GstStepInfo * current)
1527 {
1528   gint64 end;
1529   GstMessage *message;
1530
1531   GST_DEBUG_OBJECT (sink, "update pending step");
1532
1533   GST_OBJECT_LOCK (sink);
1534   memcpy (current, pending, sizeof (GstStepInfo));
1535   pending->valid = FALSE;
1536   GST_OBJECT_UNLOCK (sink);
1537
1538   /* post message first */
1539   message =
1540       gst_message_new_step_start (GST_OBJECT (sink), TRUE, current->format,
1541       current->amount, current->rate, current->flush, current->intermediate);
1542   gst_message_set_seqnum (message, current->seqnum);
1543   gst_element_post_message (GST_ELEMENT (sink), message);
1544
1545   /* get the running time of where we paused and remember it */
1546   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1547   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1548
1549   /* set the new rate for the remainder of the segment */
1550   current->start_rate = segment->rate;
1551   segment->rate *= current->rate;
1552
1553   /* save values */
1554   if (segment->rate > 0.0)
1555     current->start_stop = segment->stop;
1556   else
1557     current->start_start = segment->start;
1558
1559   if (current->format == GST_FORMAT_TIME) {
1560     end = current->start + current->amount;
1561     if (!current->flush) {
1562       /* update the segment clipping regions for non-flushing seeks */
1563       if (segment->rate > 0.0) {
1564         segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1565         segment->position = segment->stop;
1566       } else {
1567         gint64 position;
1568
1569         position = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1570         segment->time = position;
1571         segment->start = position;
1572         segment->position = position;
1573       }
1574     }
1575   }
1576
1577   GST_DEBUG_OBJECT (sink, "segment now %" GST_SEGMENT_FORMAT, segment);
1578   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1579       GST_TIME_ARGS (current->start));
1580
1581   if (current->amount == -1) {
1582     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1583     current->valid = FALSE;
1584   } else {
1585     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1586         "rate: %f", current->amount, gst_format_get_name (current->format),
1587         current->rate);
1588   }
1589 }
1590
1591 static void
1592 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1593     GstStepInfo * current, gint64 rstart, gint64 rstop, gboolean eos)
1594 {
1595   gint64 stop, position;
1596   GstMessage *message;
1597
1598   GST_DEBUG_OBJECT (sink, "step complete");
1599
1600   if (segment->rate > 0.0)
1601     stop = rstart;
1602   else
1603     stop = rstop;
1604
1605   GST_DEBUG_OBJECT (sink,
1606       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1607
1608   if (stop == -1)
1609     current->duration = current->position;
1610   else
1611     current->duration = stop - current->start;
1612
1613   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1614       GST_TIME_ARGS (current->duration));
1615
1616   position = current->start + current->duration;
1617
1618   /* now move the segment to the new running time */
1619   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1620
1621   if (current->flush) {
1622     /* and remove the time we flushed, start time did not change */
1623     segment->base = current->start;
1624   } else {
1625     /* start time is now the stepped position */
1626     gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1627   }
1628
1629   /* restore the previous rate */
1630   segment->rate = current->start_rate;
1631
1632   if (segment->rate > 0.0)
1633     segment->stop = current->start_stop;
1634   else
1635     segment->start = current->start_start;
1636
1637   /* post the step done when we know the stepped duration in TIME */
1638   message =
1639       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1640       current->amount, current->rate, current->flush, current->intermediate,
1641       current->duration, eos);
1642   gst_message_set_seqnum (message, current->seqnum);
1643   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1644
1645   if (!current->intermediate)
1646     sink->need_preroll = current->need_preroll;
1647
1648   /* and the current step info finished and becomes invalid */
1649   current->valid = FALSE;
1650 }
1651
1652 static gboolean
1653 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1654     GstStepInfo * current, guint64 * cstart, guint64 * cstop, guint64 * rstart,
1655     guint64 * rstop)
1656 {
1657   gboolean step_end = FALSE;
1658
1659   /* see if we need to skip this buffer because of stepping */
1660   switch (current->format) {
1661     case GST_FORMAT_TIME:
1662     {
1663       guint64 end;
1664       guint64 first, last;
1665       gdouble abs_rate;
1666
1667       if (segment->rate > 0.0) {
1668         if (segment->stop == *cstop)
1669           *rstop = *rstart + current->amount;
1670
1671         first = *rstart;
1672         last = *rstop;
1673       } else {
1674         if (segment->start == *cstart)
1675           *rstart = *rstop + current->amount;
1676
1677         first = *rstop;
1678         last = *rstart;
1679       }
1680
1681       end = current->start + current->amount;
1682       current->position = first - current->start;
1683
1684       abs_rate = ABS (segment->rate);
1685       if (G_UNLIKELY (abs_rate != 1.0))
1686         current->position /= abs_rate;
1687
1688       GST_DEBUG_OBJECT (sink,
1689           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1690           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1691       GST_DEBUG_OBJECT (sink,
1692           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1693           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1694           GST_TIME_ARGS (last - current->start),
1695           GST_TIME_ARGS (current->amount));
1696
1697       if ((current->flush && current->position >= current->amount)
1698           || last >= end) {
1699         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1700         step_end = TRUE;
1701         if (segment->rate > 0.0) {
1702           *rstart = end;
1703           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1704         } else {
1705           *rstop = end;
1706           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1707         }
1708       }
1709       GST_DEBUG_OBJECT (sink,
1710           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1711           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1712       GST_DEBUG_OBJECT (sink,
1713           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1714           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1715       break;
1716     }
1717     case GST_FORMAT_BUFFERS:
1718       GST_DEBUG_OBJECT (sink,
1719           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1720           current->position, current->amount);
1721
1722       if (current->position < current->amount) {
1723         current->position++;
1724       } else {
1725         step_end = TRUE;
1726       }
1727       break;
1728     case GST_FORMAT_DEFAULT:
1729     default:
1730       GST_DEBUG_OBJECT (sink,
1731           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1732           current->position, current->amount);
1733       break;
1734   }
1735   return step_end;
1736 }
1737
1738 /* with STREAM_LOCK, PREROLL_LOCK
1739  *
1740  * Returns TRUE if the object needs synchronisation and takes therefore
1741  * part in prerolling.
1742  *
1743  * rsstart/rsstop contain the start/stop in stream time.
1744  * rrstart/rrstop contain the start/stop in running time.
1745  */
1746 static gboolean
1747 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1748     GstClockTime * rsstart, GstClockTime * rsstop,
1749     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1750     gboolean * stepped, GstStepInfo * step, gboolean * step_end)
1751 {
1752   GstBaseSinkClass *bclass;
1753   GstBuffer *buffer;
1754   GstClockTime start, stop;     /* raw start/stop timestamps */
1755   guint64 cstart, cstop;        /* clipped raw timestamps */
1756   guint64 rstart, rstop;        /* clipped timestamps converted to running time */
1757   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1758   GstFormat format;
1759   GstBaseSinkPrivate *priv;
1760   GstSegment *segment;
1761   gboolean eos;
1762
1763   priv = basesink->priv;
1764   segment = &basesink->segment;
1765
1766   /* start with nothing */
1767   start = stop = GST_CLOCK_TIME_NONE;
1768
1769   if (G_UNLIKELY (GST_IS_EVENT (obj))) {
1770     GstEvent *event = GST_EVENT_CAST (obj);
1771
1772     switch (GST_EVENT_TYPE (event)) {
1773         /* EOS event needs syncing */
1774       case GST_EVENT_EOS:
1775       {
1776         if (segment->rate >= 0.0) {
1777           sstart = sstop = priv->current_sstop;
1778           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1779             /* we have not seen a buffer yet, use the segment values */
1780             sstart = sstop = gst_segment_to_stream_time (segment,
1781                 segment->format, segment->stop);
1782           }
1783         } else {
1784           sstart = sstop = priv->current_sstart;
1785           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1786             /* we have not seen a buffer yet, use the segment values */
1787             sstart = sstop = gst_segment_to_stream_time (segment,
1788                 segment->format, segment->start);
1789           }
1790         }
1791
1792         rstart = rstop = priv->eos_rtime;
1793         *do_sync = rstart != -1;
1794         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1795             GST_TIME_ARGS (rstart));
1796         /* if we are stepping, we end now */
1797         *step_end = step->valid;
1798         eos = TRUE;
1799         goto eos_done;
1800       }
1801       default:
1802         /* other events do not need syncing */
1803         return FALSE;
1804     }
1805   }
1806
1807   eos = FALSE;
1808
1809 again:
1810   /* else do buffer sync code */
1811   buffer = GST_BUFFER_CAST (obj);
1812
1813   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1814
1815   /* just get the times to see if we need syncing, if the start returns -1 we
1816    * don't sync. */
1817   if (bclass->get_times)
1818     bclass->get_times (basesink, buffer, &start, &stop);
1819
1820   if (!GST_CLOCK_TIME_IS_VALID (start)) {
1821     /* we don't need to sync but we still want to get the timestamps for
1822      * tracking the position */
1823     gst_base_sink_default_get_times (basesink, buffer, &start, &stop);
1824     *do_sync = FALSE;
1825   } else {
1826     *do_sync = TRUE;
1827   }
1828
1829   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1830       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1831       GST_TIME_ARGS (stop), *do_sync);
1832
1833   /* collect segment and format for code clarity */
1834   format = segment->format;
1835
1836   /* clip */
1837   if (G_UNLIKELY (!gst_segment_clip (segment, format,
1838               start, stop, &cstart, &cstop))) {
1839     if (step->valid) {
1840       GST_DEBUG_OBJECT (basesink, "step out of segment");
1841       /* when we are stepping, pretend we're at the end of the segment */
1842       if (segment->rate > 0.0) {
1843         cstart = segment->stop;
1844         cstop = segment->stop;
1845       } else {
1846         cstart = segment->start;
1847         cstop = segment->start;
1848       }
1849       goto do_times;
1850     }
1851     goto out_of_segment;
1852   }
1853
1854   if (G_UNLIKELY (start != cstart || stop != cstop)) {
1855     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
1856         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
1857         GST_TIME_ARGS (cstop));
1858   }
1859
1860   /* set last stop position */
1861   if (G_LIKELY (stop != GST_CLOCK_TIME_NONE && cstop != GST_CLOCK_TIME_NONE))
1862     segment->position = cstop;
1863   else
1864     segment->position = cstart;
1865
1866 do_times:
1867   rstart = gst_segment_to_running_time (segment, format, cstart);
1868   rstop = gst_segment_to_running_time (segment, format, cstop);
1869
1870   if (G_UNLIKELY (step->valid)) {
1871     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
1872                 &rstart, &rstop))) {
1873       /* step is still busy, we discard data when we are flushing */
1874       *stepped = step->flush;
1875       GST_DEBUG_OBJECT (basesink, "stepping busy");
1876     }
1877   }
1878   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
1879    * upstream is behaving very badly */
1880   sstart = gst_segment_to_stream_time (segment, format, cstart);
1881   sstop = gst_segment_to_stream_time (segment, format, cstop);
1882
1883 eos_done:
1884   /* eos_done label only called when doing EOS, we also stop stepping then */
1885   if (*step_end && step->flush) {
1886     GST_DEBUG_OBJECT (basesink, "flushing step ended");
1887     stop_stepping (basesink, segment, step, rstart, rstop, eos);
1888     *step_end = FALSE;
1889     /* re-determine running start times for adjusted segment
1890      * (which has a flushed amount of running/accumulated time removed) */
1891     if (!GST_IS_EVENT (obj)) {
1892       GST_DEBUG_OBJECT (basesink, "refresh sync times");
1893       goto again;
1894     }
1895   }
1896
1897   /* save times */
1898   *rsstart = sstart;
1899   *rsstop = sstop;
1900   *rrstart = rstart;
1901   *rrstop = rstop;
1902
1903   /* buffers and EOS always need syncing and preroll */
1904   return TRUE;
1905
1906   /* special cases */
1907 out_of_segment:
1908   {
1909     /* we usually clip in the chain function already but stepping could cause
1910      * the segment to be updated later. we return FALSE so that we don't try
1911      * to sync on it. */
1912     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
1913     return FALSE;
1914   }
1915 }
1916
1917 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
1918  * adjust a timestamp with the latency and timestamp offset. This function does
1919  * not adjust for the render delay. */
1920 static GstClockTime
1921 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
1922 {
1923   GstClockTimeDiff ts_offset;
1924
1925   /* don't do anything funny with invalid timestamps */
1926   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1927     return time;
1928
1929   time += basesink->priv->latency;
1930
1931   /* apply offset, be carefull for underflows */
1932   ts_offset = basesink->priv->ts_offset;
1933   if (ts_offset < 0) {
1934     ts_offset = -ts_offset;
1935     if (ts_offset < time)
1936       time -= ts_offset;
1937     else
1938       time = 0;
1939   } else
1940     time += ts_offset;
1941
1942   /* subtract the render delay again, which was included in the latency */
1943   if (time > basesink->priv->render_delay)
1944     time -= basesink->priv->render_delay;
1945   else
1946     time = 0;
1947
1948   return time;
1949 }
1950
1951 /**
1952  * gst_base_sink_wait_clock:
1953  * @sink: the sink
1954  * @time: the running_time to be reached
1955  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
1956  *
1957  * This function will block until @time is reached. It is usually called by
1958  * subclasses that use their own internal synchronisation.
1959  *
1960  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
1961  * returned. Likewise, if synchronisation is disabled in the element or there
1962  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
1963  *
1964  * This function should only be called with the PREROLL_LOCK held, like when
1965  * receiving an EOS event in the #GstBaseSinkClass.event() vmethod or when
1966  * receiving a buffer in
1967  * the #GstBaseSinkClass.render() vmethod.
1968  *
1969  * The @time argument should be the running_time of when this method should
1970  * return and is not adjusted with any latency or offset configured in the
1971  * sink.
1972  *
1973  * Since: 0.10.20
1974  *
1975  * Returns: #GstClockReturn
1976  */
1977 GstClockReturn
1978 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
1979     GstClockTimeDiff * jitter)
1980 {
1981   GstClockReturn ret;
1982   GstClock *clock;
1983   GstClockTime base_time;
1984
1985   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1986     goto invalid_time;
1987
1988   GST_OBJECT_LOCK (sink);
1989   if (G_UNLIKELY (!sink->sync))
1990     goto no_sync;
1991
1992   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
1993     goto no_clock;
1994
1995   base_time = GST_ELEMENT_CAST (sink)->base_time;
1996   GST_LOG_OBJECT (sink,
1997       "time %" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT,
1998       GST_TIME_ARGS (time), GST_TIME_ARGS (base_time));
1999
2000   /* add base_time to running_time to get the time against the clock */
2001   time += base_time;
2002
2003   /* Re-use existing clockid if available */
2004   /* FIXME: Casting to GstClockEntry only works because the types
2005    * are the same */
2006   if (G_LIKELY (sink->priv->cached_clock_id != NULL
2007           && GST_CLOCK_ENTRY_CLOCK ((GstClockEntry *) sink->
2008               priv->cached_clock_id) == clock)) {
2009     if (!gst_clock_single_shot_id_reinit (clock, sink->priv->cached_clock_id,
2010             time)) {
2011       gst_clock_id_unref (sink->priv->cached_clock_id);
2012       sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2013     }
2014   } else {
2015     if (sink->priv->cached_clock_id != NULL)
2016       gst_clock_id_unref (sink->priv->cached_clock_id);
2017     sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2018   }
2019   GST_OBJECT_UNLOCK (sink);
2020
2021   /* A blocking wait is performed on the clock. We save the ClockID
2022    * so we can unlock the entry at any time. While we are blocking, we
2023    * release the PREROLL_LOCK so that other threads can interrupt the
2024    * entry. */
2025   sink->clock_id = sink->priv->cached_clock_id;
2026   /* release the preroll lock while waiting */
2027   GST_BASE_SINK_PREROLL_UNLOCK (sink);
2028
2029   ret = gst_clock_id_wait (sink->priv->cached_clock_id, jitter);
2030
2031   GST_BASE_SINK_PREROLL_LOCK (sink);
2032   sink->clock_id = NULL;
2033
2034   return ret;
2035
2036   /* no syncing needed */
2037 invalid_time:
2038   {
2039     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
2040     return GST_CLOCK_BADTIME;
2041   }
2042 no_sync:
2043   {
2044     GST_DEBUG_OBJECT (sink, "sync disabled");
2045     GST_OBJECT_UNLOCK (sink);
2046     return GST_CLOCK_BADTIME;
2047   }
2048 no_clock:
2049   {
2050     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
2051     GST_OBJECT_UNLOCK (sink);
2052     return GST_CLOCK_BADTIME;
2053   }
2054 }
2055
2056 /**
2057  * gst_base_sink_wait_preroll:
2058  * @sink: the sink
2059  *
2060  * If the #GstBaseSinkClass.render() method performs its own synchronisation
2061  * against the clock it must unblock when going from PLAYING to the PAUSED state
2062  * and call this method before continuing to render the remaining data.
2063  *
2064  * This function will block until a state change to PLAYING happens (in which
2065  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
2066  * to a state change to READY or a FLUSH event (in which case this function
2067  * returns #GST_FLOW_WRONG_STATE).
2068  *
2069  * This function should only be called with the PREROLL_LOCK held, like in the
2070  * render function.
2071  *
2072  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2073  * continue. Any other return value should be returned from the render vmethod.
2074  *
2075  * Since: 0.10.11
2076  */
2077 GstFlowReturn
2078 gst_base_sink_wait_preroll (GstBaseSink * sink)
2079 {
2080   sink->have_preroll = TRUE;
2081   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
2082   /* block until the state changes, or we get a flush, or something */
2083   GST_BASE_SINK_PREROLL_WAIT (sink);
2084   sink->have_preroll = FALSE;
2085   if (G_UNLIKELY (sink->flushing))
2086     goto stopping;
2087   if (G_UNLIKELY (sink->priv->step_unlock))
2088     goto step_unlocked;
2089   GST_DEBUG_OBJECT (sink, "continue after preroll");
2090
2091   return GST_FLOW_OK;
2092
2093   /* ERRORS */
2094 stopping:
2095   {
2096     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
2097     return GST_FLOW_WRONG_STATE;
2098   }
2099 step_unlocked:
2100   {
2101     sink->priv->step_unlock = FALSE;
2102     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
2103     return GST_FLOW_STEP;
2104   }
2105 }
2106
2107 /**
2108  * gst_base_sink_do_preroll:
2109  * @sink: the sink
2110  * @obj: (transfer none): the mini object that caused the preroll
2111  *
2112  * If the @sink spawns its own thread for pulling buffers from upstream it
2113  * should call this method after it has pulled a buffer. If the element needed
2114  * to preroll, this function will perform the preroll and will then block
2115  * until the element state is changed.
2116  *
2117  * This function should be called with the PREROLL_LOCK held.
2118  *
2119  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2120  * continue. Any other return value should be returned from the render vmethod.
2121  *
2122  * Since: 0.10.22
2123  */
2124 GstFlowReturn
2125 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2126 {
2127   GstFlowReturn ret;
2128
2129   while (G_UNLIKELY (sink->need_preroll)) {
2130     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2131
2132     /* if it's a buffer, we need to call the preroll method */
2133     if (sink->priv->call_preroll) {
2134       GstBaseSinkClass *bclass;
2135       GstBuffer *buf;
2136
2137       if (GST_IS_BUFFER_LIST (obj)) {
2138         buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0);
2139         g_assert (NULL != buf);
2140       } else if (GST_IS_BUFFER (obj)) {
2141         buf = GST_BUFFER_CAST (obj);
2142         /* For buffer lists do not set last buffer for now */
2143         gst_base_sink_set_last_buffer (sink, buf);
2144       } else
2145         buf = NULL;
2146
2147       if (buf) {
2148         GST_DEBUG_OBJECT (sink, "preroll buffer %" GST_TIME_FORMAT,
2149             GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
2150
2151         bclass = GST_BASE_SINK_GET_CLASS (sink);
2152         if (bclass->preroll)
2153           if ((ret = bclass->preroll (sink, buf)) != GST_FLOW_OK)
2154             goto preroll_canceled;
2155
2156         sink->priv->call_preroll = FALSE;
2157       }
2158     }
2159
2160     /* commit state */
2161     if (G_LIKELY (sink->playing_async)) {
2162       if (G_UNLIKELY (!gst_base_sink_commit_state (sink)))
2163         goto stopping;
2164     }
2165
2166     /* need to recheck here because the commit state could have
2167      * made us not need the preroll anymore */
2168     if (G_LIKELY (sink->need_preroll)) {
2169       /* block until the state changes, or we get a flush, or something */
2170       ret = gst_base_sink_wait_preroll (sink);
2171       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2172         goto preroll_failed;
2173     }
2174   }
2175   return GST_FLOW_OK;
2176
2177   /* ERRORS */
2178 preroll_canceled:
2179   {
2180     GST_DEBUG_OBJECT (sink, "preroll failed, abort state");
2181     gst_element_abort_state (GST_ELEMENT_CAST (sink));
2182     return ret;
2183   }
2184 stopping:
2185   {
2186     GST_DEBUG_OBJECT (sink, "stopping while commiting state");
2187     return GST_FLOW_WRONG_STATE;
2188   }
2189 preroll_failed:
2190   {
2191     GST_DEBUG_OBJECT (sink, "preroll failed: %s", gst_flow_get_name (ret));
2192     return ret;
2193   }
2194 }
2195
2196 /**
2197  * gst_base_sink_wait_eos:
2198  * @sink: the sink
2199  * @time: the running_time to be reached
2200  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2201  *
2202  * This function will block until @time is reached. It is usually called by
2203  * subclasses that use their own internal synchronisation but want to let the
2204  * EOS be handled by the base class.
2205  *
2206  * This function should only be called with the PREROLL_LOCK held, like when
2207  * receiving an EOS event in the ::event vmethod.
2208  *
2209  * The @time argument should be the running_time of when the EOS should happen
2210  * and will be adjusted with any latency and offset configured in the sink.
2211  *
2212  * Returns: #GstFlowReturn
2213  *
2214  * Since: 0.10.15
2215  */
2216 GstFlowReturn
2217 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2218     GstClockTimeDiff * jitter)
2219 {
2220   GstClockReturn status;
2221   GstFlowReturn ret;
2222
2223   do {
2224     GstClockTime stime;
2225
2226     GST_DEBUG_OBJECT (sink, "checking preroll");
2227
2228     /* first wait for the playing state before we can continue */
2229     while (G_UNLIKELY (sink->need_preroll)) {
2230       ret = gst_base_sink_wait_preroll (sink);
2231       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2232         goto flushing;
2233     }
2234
2235     /* preroll done, we can sync since we are in PLAYING now. */
2236     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2237         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2238
2239     /* compensate for latency, ts_offset and render delay */
2240     stime = gst_base_sink_adjust_time (sink, time);
2241
2242     /* wait for the clock, this can be interrupted because we got shut down or
2243      * we PAUSED. */
2244     status = gst_base_sink_wait_clock (sink, stime, jitter);
2245
2246     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2247
2248     /* invalid time, no clock or sync disabled, just continue then */
2249     if (status == GST_CLOCK_BADTIME)
2250       break;
2251
2252     /* waiting could have been interrupted and we can be flushing now */
2253     if (G_UNLIKELY (sink->flushing))
2254       goto flushing;
2255
2256     /* retry if we got unscheduled, which means we did not reach the timeout
2257      * yet. if some other error occures, we continue. */
2258   } while (status == GST_CLOCK_UNSCHEDULED);
2259
2260   GST_DEBUG_OBJECT (sink, "end of stream");
2261
2262   return GST_FLOW_OK;
2263
2264   /* ERRORS */
2265 flushing:
2266   {
2267     GST_DEBUG_OBJECT (sink, "we are flushing");
2268     return GST_FLOW_WRONG_STATE;
2269   }
2270 }
2271
2272 /* with STREAM_LOCK, PREROLL_LOCK
2273  *
2274  * Make sure we are in PLAYING and synchronize an object to the clock.
2275  *
2276  * If we need preroll, we are not in PLAYING. We try to commit the state
2277  * if needed and then block if we still are not PLAYING.
2278  *
2279  * We start waiting on the clock in PLAYING. If we got interrupted, we
2280  * immediately try to re-preroll.
2281  *
2282  * Some objects do not need synchronisation (most events) and so this function
2283  * immediately returns GST_FLOW_OK.
2284  *
2285  * for objects that arrive later than max-lateness to be synchronized to the
2286  * clock have the @late boolean set to TRUE.
2287  *
2288  * This function keeps a running average of the jitter (the diff between the
2289  * clock time and the requested sync time). The jitter is negative for
2290  * objects that arrive in time and positive for late buffers.
2291  *
2292  * does not take ownership of obj.
2293  */
2294 static GstFlowReturn
2295 gst_base_sink_do_sync (GstBaseSink * basesink,
2296     GstMiniObject * obj, gboolean * late, gboolean * step_end)
2297 {
2298   GstClockTimeDiff jitter = 0;
2299   gboolean syncable;
2300   GstClockReturn status = GST_CLOCK_OK;
2301   GstClockTime rstart, rstop, sstart, sstop, stime;
2302   gboolean do_sync;
2303   GstBaseSinkPrivate *priv;
2304   GstFlowReturn ret;
2305   GstStepInfo *current, *pending;
2306   gboolean stepped;
2307
2308   priv = basesink->priv;
2309
2310 do_step:
2311   sstart = sstop = rstart = rstop = GST_CLOCK_TIME_NONE;
2312   do_sync = TRUE;
2313   stepped = FALSE;
2314
2315   priv->current_rstart = GST_CLOCK_TIME_NONE;
2316
2317   /* get stepping info */
2318   current = &priv->current_step;
2319   pending = &priv->pending_step;
2320
2321   /* get timing information for this object against the render segment */
2322   syncable = gst_base_sink_get_sync_times (basesink, obj,
2323       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, current, step_end);
2324
2325   if (G_UNLIKELY (stepped))
2326     goto step_skipped;
2327
2328   /* a syncable object needs to participate in preroll and
2329    * clocking. All buffers and EOS are syncable. */
2330   if (G_UNLIKELY (!syncable))
2331     goto not_syncable;
2332
2333   /* store timing info for current object */
2334   priv->current_rstart = rstart;
2335   priv->current_rstop = (GST_CLOCK_TIME_IS_VALID (rstop) ? rstop : rstart);
2336
2337   /* save sync time for eos when the previous object needed sync */
2338   priv->eos_rtime = (do_sync ? priv->current_rstop : GST_CLOCK_TIME_NONE);
2339
2340   /* calculate inter frame spacing */
2341   if (G_UNLIKELY (priv->prev_rstart != -1 && priv->prev_rstart < rstart)) {
2342     GstClockTime in_diff;
2343
2344     in_diff = rstart - priv->prev_rstart;
2345
2346     if (priv->avg_in_diff == -1)
2347       priv->avg_in_diff = in_diff;
2348     else
2349       priv->avg_in_diff = UPDATE_RUNNING_AVG (priv->avg_in_diff, in_diff);
2350
2351     GST_LOG_OBJECT (basesink, "avg frame diff %" GST_TIME_FORMAT,
2352         GST_TIME_ARGS (priv->avg_in_diff));
2353
2354   }
2355   priv->prev_rstart = rstart;
2356
2357   if (G_UNLIKELY (priv->earliest_in_time != -1
2358           && rstart < priv->earliest_in_time))
2359     goto qos_dropped;
2360
2361 again:
2362   /* first do preroll, this makes sure we commit our state
2363    * to PAUSED and can continue to PLAYING. We cannot perform
2364    * any clock sync in PAUSED because there is no clock. */
2365   ret = gst_base_sink_do_preroll (basesink, obj);
2366   if (G_UNLIKELY (ret != GST_FLOW_OK))
2367     goto preroll_failed;
2368
2369   /* update the segment with a pending step if the current one is invalid and we
2370    * have a new pending one. We only accept new step updates after a preroll */
2371   if (G_UNLIKELY (pending->valid && !current->valid)) {
2372     start_stepping (basesink, &basesink->segment, pending, current);
2373     goto do_step;
2374   }
2375
2376   /* After rendering we store the position of the last buffer so that we can use
2377    * it to report the position. We need to take the lock here. */
2378   GST_OBJECT_LOCK (basesink);
2379   priv->current_sstart = sstart;
2380   priv->current_sstop = (GST_CLOCK_TIME_IS_VALID (sstop) ? sstop : sstart);
2381   GST_OBJECT_UNLOCK (basesink);
2382
2383   if (!do_sync)
2384     goto done;
2385
2386   /* adjust for latency */
2387   stime = gst_base_sink_adjust_time (basesink, rstart);
2388
2389   /* preroll done, we can sync since we are in PLAYING now. */
2390   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2391       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2392       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2393
2394   /* This function will return immediately if start == -1, no clock
2395    * or sync is disabled with GST_CLOCK_BADTIME. */
2396   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2397
2398   GST_DEBUG_OBJECT (basesink, "clock returned %d, jitter %c%" GST_TIME_FORMAT,
2399       status, (jitter < 0 ? '-' : ' '), GST_TIME_ARGS (ABS (jitter)));
2400
2401   /* invalid time, no clock or sync disabled, just render */
2402   if (status == GST_CLOCK_BADTIME)
2403     goto done;
2404
2405   /* waiting could have been interrupted and we can be flushing now */
2406   if (G_UNLIKELY (basesink->flushing))
2407     goto flushing;
2408
2409   /* check for unlocked by a state change, we are not flushing so
2410    * we can try to preroll on the current buffer. */
2411   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2412     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2413     priv->call_preroll = TRUE;
2414     goto again;
2415   }
2416
2417   /* successful syncing done, record observation */
2418   priv->current_jitter = jitter;
2419
2420   /* check if the object should be dropped */
2421   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2422       status, jitter);
2423
2424 done:
2425   return GST_FLOW_OK;
2426
2427   /* ERRORS */
2428 step_skipped:
2429   {
2430     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2431     *late = TRUE;
2432     return GST_FLOW_OK;
2433   }
2434 not_syncable:
2435   {
2436     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2437     return GST_FLOW_OK;
2438   }
2439 qos_dropped:
2440   {
2441     GST_DEBUG_OBJECT (basesink, "dropped because of QoS %p", obj);
2442     *late = TRUE;
2443     return GST_FLOW_OK;
2444   }
2445 flushing:
2446   {
2447     GST_DEBUG_OBJECT (basesink, "we are flushing");
2448     return GST_FLOW_WRONG_STATE;
2449   }
2450 preroll_failed:
2451   {
2452     GST_DEBUG_OBJECT (basesink, "preroll failed");
2453     *step_end = FALSE;
2454     return ret;
2455   }
2456 }
2457
2458 static gboolean
2459 gst_base_sink_send_qos (GstBaseSink * basesink, GstQOSType type,
2460     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2461 {
2462   GstEvent *event;
2463   gboolean res;
2464
2465   /* generate Quality-of-Service event */
2466   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2467       "qos: type %d, proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2468       GST_TIME_FORMAT, type, proportion, diff, GST_TIME_ARGS (time));
2469
2470   event = gst_event_new_qos (type, proportion, diff, time);
2471
2472   /* send upstream */
2473   res = gst_pad_push_event (basesink->sinkpad, event);
2474
2475   return res;
2476 }
2477
2478 static void
2479 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2480 {
2481   GstBaseSinkPrivate *priv;
2482   GstClockTime start, stop;
2483   GstClockTimeDiff jitter;
2484   GstClockTime pt, entered, left;
2485   GstClockTime duration;
2486   gdouble rate;
2487
2488   priv = sink->priv;
2489
2490   start = priv->current_rstart;
2491
2492   if (priv->current_step.valid)
2493     return;
2494
2495   /* if Quality-of-Service disabled, do nothing */
2496   if (!g_atomic_int_get (&priv->qos_enabled) ||
2497       !GST_CLOCK_TIME_IS_VALID (start))
2498     return;
2499
2500   stop = priv->current_rstop;
2501   jitter = priv->current_jitter;
2502
2503   if (jitter < 0) {
2504     /* this is the time the buffer entered the sink */
2505     if (start < -jitter)
2506       entered = 0;
2507     else
2508       entered = start + jitter;
2509     left = start;
2510   } else {
2511     /* this is the time the buffer entered the sink */
2512     entered = start + jitter;
2513     /* this is the time the buffer left the sink */
2514     left = start + jitter;
2515   }
2516
2517   /* calculate duration of the buffer */
2518   if (GST_CLOCK_TIME_IS_VALID (stop) && stop != start)
2519     duration = stop - start;
2520   else
2521     duration = priv->avg_in_diff;
2522
2523   /* if we have the time when the last buffer left us, calculate
2524    * processing time */
2525   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2526     if (entered > priv->last_left) {
2527       pt = entered - priv->last_left;
2528     } else {
2529       pt = 0;
2530     }
2531   } else {
2532     pt = priv->avg_pt;
2533   }
2534
2535   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2536       ", stop %" GST_TIME_FORMAT ", entered %" GST_TIME_FORMAT ", left %"
2537       GST_TIME_FORMAT ", pt: %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
2538       ",jitter %" G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
2539       GST_TIME_ARGS (entered), GST_TIME_ARGS (left), GST_TIME_ARGS (pt),
2540       GST_TIME_ARGS (duration), jitter);
2541
2542   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2543       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2544       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2545       priv->avg_rate);
2546
2547   /* collect running averages. for first observations, we copy the
2548    * values */
2549   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_duration))
2550     priv->avg_duration = duration;
2551   else
2552     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2553
2554   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_pt))
2555     priv->avg_pt = pt;
2556   else
2557     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2558
2559   if (priv->avg_duration != 0)
2560     rate =
2561         gst_guint64_to_gdouble (priv->avg_pt) /
2562         gst_guint64_to_gdouble (priv->avg_duration);
2563   else
2564     rate = 1.0;
2565
2566   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2567     if (dropped || priv->avg_rate < 0.0) {
2568       priv->avg_rate = rate;
2569     } else {
2570       if (rate > 1.0)
2571         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2572       else
2573         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2574     }
2575   }
2576
2577   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2578       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2579       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2580       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2581
2582
2583   if (priv->avg_rate >= 0.0) {
2584     GstQOSType type;
2585     GstClockTimeDiff diff;
2586
2587     /* if we have a valid rate, start sending QoS messages */
2588     if (priv->current_jitter < 0) {
2589       /* make sure we never go below 0 when adding the jitter to the
2590        * timestamp. */
2591       if (priv->current_rstart < -priv->current_jitter)
2592         priv->current_jitter = -priv->current_rstart;
2593     }
2594
2595     if (priv->throttle_time > 0) {
2596       diff = priv->throttle_time;
2597       type = GST_QOS_TYPE_THROTTLE;
2598     } else {
2599       diff = priv->current_jitter;
2600       if (diff <= 0)
2601         type = GST_QOS_TYPE_OVERFLOW;
2602       else
2603         type = GST_QOS_TYPE_UNDERFLOW;
2604     }
2605
2606     gst_base_sink_send_qos (sink, type, priv->avg_rate, priv->current_rstart,
2607         diff);
2608   }
2609
2610   /* record when this buffer will leave us */
2611   priv->last_left = left;
2612 }
2613
2614 /* reset all qos measuring */
2615 static void
2616 gst_base_sink_reset_qos (GstBaseSink * sink)
2617 {
2618   GstBaseSinkPrivate *priv;
2619
2620   priv = sink->priv;
2621
2622   priv->last_render_time = GST_CLOCK_TIME_NONE;
2623   priv->prev_rstart = GST_CLOCK_TIME_NONE;
2624   priv->earliest_in_time = GST_CLOCK_TIME_NONE;
2625   priv->last_left = GST_CLOCK_TIME_NONE;
2626   priv->avg_duration = GST_CLOCK_TIME_NONE;
2627   priv->avg_pt = GST_CLOCK_TIME_NONE;
2628   priv->avg_rate = -1.0;
2629   priv->avg_render = GST_CLOCK_TIME_NONE;
2630   priv->avg_in_diff = GST_CLOCK_TIME_NONE;
2631   priv->rendered = 0;
2632   priv->dropped = 0;
2633
2634 }
2635
2636 /* Checks if the object was scheduled too late.
2637  *
2638  * rstart/rstop contain the running_time start and stop values
2639  * of the object.
2640  *
2641  * status and jitter contain the return values from the clock wait.
2642  *
2643  * returns TRUE if the buffer was too late.
2644  */
2645 static gboolean
2646 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2647     GstClockTime rstart, GstClockTime rstop,
2648     GstClockReturn status, GstClockTimeDiff jitter)
2649 {
2650   gboolean late;
2651   guint64 max_lateness;
2652   GstBaseSinkPrivate *priv;
2653
2654   priv = basesink->priv;
2655
2656   late = FALSE;
2657
2658   /* only for objects that were too late */
2659   if (G_LIKELY (status != GST_CLOCK_EARLY))
2660     goto in_time;
2661
2662   max_lateness = basesink->max_lateness;
2663
2664   /* check if frame dropping is enabled */
2665   if (max_lateness == -1)
2666     goto no_drop;
2667
2668   /* only check for buffers */
2669   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2670     goto not_buffer;
2671
2672   /* can't do check if we don't have a timestamp */
2673   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (rstart)))
2674     goto no_timestamp;
2675
2676   /* we can add a valid stop time */
2677   if (GST_CLOCK_TIME_IS_VALID (rstop))
2678     max_lateness += rstop;
2679   else {
2680     max_lateness += rstart;
2681     /* no stop time, use avg frame diff */
2682     if (priv->avg_in_diff != -1)
2683       max_lateness += priv->avg_in_diff;
2684   }
2685
2686   /* if the jitter bigger than duration and lateness we are too late */
2687   if ((late = rstart + jitter > max_lateness)) {
2688     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2689         "buffer is too late %" GST_TIME_FORMAT
2690         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (rstart + jitter),
2691         GST_TIME_ARGS (max_lateness));
2692     /* !!emergency!!, if we did not receive anything valid for more than a
2693      * second, render it anyway so the user sees something */
2694     if (GST_CLOCK_TIME_IS_VALID (priv->last_render_time) &&
2695         rstart - priv->last_render_time > GST_SECOND) {
2696       late = FALSE;
2697       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2698           (_("A lot of buffers are being dropped.")),
2699           ("There may be a timestamping problem, or this computer is too slow."));
2700       GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2701           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2702           GST_TIME_ARGS (priv->last_render_time));
2703     }
2704   }
2705
2706 done:
2707   if (!late || !GST_CLOCK_TIME_IS_VALID (priv->last_render_time)) {
2708     priv->last_render_time = rstart;
2709     /* the next allowed input timestamp */
2710     if (priv->throttle_time > 0)
2711       priv->earliest_in_time = rstart + priv->throttle_time;
2712   }
2713   return late;
2714
2715   /* all is fine */
2716 in_time:
2717   {
2718     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2719     goto done;
2720   }
2721 no_drop:
2722   {
2723     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2724     goto done;
2725   }
2726 not_buffer:
2727   {
2728     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2729     return FALSE;
2730   }
2731 no_timestamp:
2732   {
2733     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2734     return FALSE;
2735   }
2736 }
2737
2738 /* called before and after calling the render vmethod. It keeps track of how
2739  * much time was spent in the render method and is used to check if we are
2740  * flooded */
2741 static void
2742 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2743 {
2744   GstBaseSinkPrivate *priv;
2745
2746   priv = basesink->priv;
2747
2748   if (start) {
2749     priv->start = gst_util_get_timestamp ();
2750   } else {
2751     GstClockTime elapsed;
2752
2753     priv->stop = gst_util_get_timestamp ();
2754
2755     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2756
2757     if (!GST_CLOCK_TIME_IS_VALID (priv->avg_render))
2758       priv->avg_render = elapsed;
2759     else
2760       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2761
2762     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2763         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2764   }
2765 }
2766
2767 static void
2768 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
2769 {
2770   /* make sure we are not blocked on the clock also clear any pending
2771    * eos state. */
2772   gst_base_sink_set_flushing (basesink, pad, TRUE);
2773
2774   /* we grab the stream lock but that is not needed since setting the
2775    * sink to flushing would make sure no state commit is being done
2776    * anymore */
2777   GST_PAD_STREAM_LOCK (pad);
2778   gst_base_sink_reset_qos (basesink);
2779   /* and we need to commit our state again on the next
2780    * prerolled buffer */
2781   basesink->playing_async = TRUE;
2782   if (basesink->priv->async_enabled) {
2783     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
2784   } else {
2785     /* start time reset in above case as well;
2786      * arranges for a.o. proper position reporting when flushing in PAUSED */
2787     gst_element_set_start_time (GST_ELEMENT_CAST (basesink), 0);
2788     basesink->priv->have_latency = TRUE;
2789   }
2790   gst_base_sink_set_last_buffer (basesink, NULL);
2791   GST_PAD_STREAM_UNLOCK (pad);
2792 }
2793
2794 static void
2795 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad,
2796     gboolean reset_time)
2797 {
2798   /* unset flushing so we can accept new data, this also flushes out any EOS
2799    * event. */
2800   gst_base_sink_set_flushing (basesink, pad, FALSE);
2801
2802   /* for position reporting */
2803   GST_OBJECT_LOCK (basesink);
2804   basesink->priv->current_sstart = GST_CLOCK_TIME_NONE;
2805   basesink->priv->current_sstop = GST_CLOCK_TIME_NONE;
2806   basesink->priv->eos_rtime = GST_CLOCK_TIME_NONE;
2807   basesink->priv->call_preroll = TRUE;
2808   basesink->priv->current_step.valid = FALSE;
2809   basesink->priv->pending_step.valid = FALSE;
2810   if (basesink->pad_mode == GST_PAD_MODE_PUSH) {
2811     /* we need new segment info after the flush. */
2812     basesink->have_newsegment = FALSE;
2813     if (reset_time) {
2814       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
2815     }
2816   }
2817   basesink->priv->reset_time = reset_time;
2818   GST_OBJECT_UNLOCK (basesink);
2819 }
2820
2821 static GstFlowReturn
2822 gst_base_sink_default_wait_eos (GstBaseSink * basesink, GstEvent * event)
2823 {
2824   GstFlowReturn ret;
2825   gboolean late, step_end;
2826
2827   ret = gst_base_sink_do_sync (basesink, GST_MINI_OBJECT_CAST (event),
2828       &late, &step_end);
2829
2830   return ret;
2831 }
2832
2833 static gboolean
2834 gst_base_sink_default_event (GstBaseSink * basesink, GstEvent * event)
2835 {
2836   gboolean result = TRUE;
2837   GstBaseSinkClass *bclass;
2838
2839   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2840
2841   switch (GST_EVENT_TYPE (event)) {
2842     case GST_EVENT_FLUSH_START:
2843     {
2844       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
2845       gst_base_sink_flush_start (basesink, basesink->sinkpad);
2846       break;
2847     }
2848     case GST_EVENT_FLUSH_STOP:
2849     {
2850       gboolean reset_time;
2851
2852       gst_event_parse_flush_stop (event, &reset_time);
2853       GST_DEBUG_OBJECT (basesink, "flush-stop %p, reset_time: %d", event,
2854           reset_time);
2855       gst_base_sink_flush_stop (basesink, basesink->sinkpad, reset_time);
2856       break;
2857     }
2858     case GST_EVENT_EOS:
2859     {
2860       GstMessage *message;
2861       guint32 seqnum;
2862
2863       /* we set the received EOS flag here so that we can use it when testing if
2864        * we are prerolled and to refuse more buffers. */
2865       basesink->priv->received_eos = TRUE;
2866
2867       /* wait for EOS */
2868       if (G_LIKELY (bclass->wait_eos)) {
2869         GstFlowReturn ret;
2870
2871         ret = bclass->wait_eos (basesink, event);
2872         if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2873           result = FALSE;
2874           goto done;
2875         }
2876       }
2877
2878       /* the EOS event is completely handled so we mark
2879        * ourselves as being in the EOS state. eos is also
2880        * protected by the object lock so we can read it when
2881        * answering the POSITION query. */
2882       GST_OBJECT_LOCK (basesink);
2883       basesink->eos = TRUE;
2884       GST_OBJECT_UNLOCK (basesink);
2885
2886       /* ok, now we can post the message */
2887       GST_DEBUG_OBJECT (basesink, "Now posting EOS");
2888
2889       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
2890       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
2891
2892       message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
2893       gst_message_set_seqnum (message, seqnum);
2894       gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
2895       break;
2896     }
2897     case GST_EVENT_CAPS:
2898     {
2899       GstCaps *caps;
2900
2901       GST_DEBUG_OBJECT (basesink, "caps %p", event);
2902
2903       gst_event_parse_caps (event, &caps);
2904       if (bclass->set_caps)
2905         result = bclass->set_caps (basesink, caps);
2906
2907       if (result) {
2908         GST_OBJECT_LOCK (basesink);
2909         gst_caps_replace (&basesink->priv->caps, caps);
2910         GST_OBJECT_UNLOCK (basesink);
2911       }
2912       break;
2913     }
2914     case GST_EVENT_SEGMENT:
2915       /* configure the segment */
2916       /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
2917        * We protect with the OBJECT_LOCK so that we can use the values to
2918        * safely answer a POSITION query. */
2919       GST_OBJECT_LOCK (basesink);
2920       /* the newsegment event is needed to bring the buffer timestamps to the
2921        * stream time and to drop samples outside of the playback segment. */
2922       gst_event_copy_segment (event, &basesink->segment);
2923       GST_DEBUG_OBJECT (basesink, "configured SEGMENT %" GST_SEGMENT_FORMAT,
2924           &basesink->segment);
2925       basesink->have_newsegment = TRUE;
2926       GST_OBJECT_UNLOCK (basesink);
2927       break;
2928     case GST_EVENT_TAG:
2929     {
2930       GstTagList *taglist;
2931
2932       gst_event_parse_tag (event, &taglist);
2933
2934       gst_element_post_message (GST_ELEMENT_CAST (basesink),
2935           gst_message_new_tag (GST_OBJECT_CAST (basesink),
2936               gst_tag_list_copy (taglist)));
2937       break;
2938     }
2939     case GST_EVENT_SINK_MESSAGE:
2940     {
2941       GstMessage *msg = NULL;
2942
2943       gst_event_parse_sink_message (event, &msg);
2944       if (msg)
2945         gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
2946       break;
2947     }
2948     default:
2949       break;
2950   }
2951 done:
2952   gst_event_unref (event);
2953
2954   return result;
2955 }
2956
2957 static gboolean
2958 gst_base_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
2959 {
2960   GstBaseSink *basesink;
2961   gboolean result = TRUE;
2962   GstBaseSinkClass *bclass;
2963
2964   basesink = GST_BASE_SINK_CAST (parent);
2965   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2966
2967   GST_DEBUG_OBJECT (basesink, "received event %p %" GST_PTR_FORMAT, event,
2968       event);
2969
2970   switch (GST_EVENT_TYPE (event)) {
2971     case GST_EVENT_FLUSH_STOP:
2972       /* special case for this serialized event because we don't want to grab
2973        * the PREROLL lock or check if we were flushing */
2974       if (bclass->event)
2975         result = bclass->event (basesink, event);
2976       break;
2977     default:
2978       if (GST_EVENT_IS_SERIALIZED (event)) {
2979         GST_BASE_SINK_PREROLL_LOCK (basesink);
2980         if (G_UNLIKELY (basesink->flushing))
2981           goto flushing;
2982
2983         if (G_UNLIKELY (basesink->priv->received_eos))
2984           goto after_eos;
2985
2986         if (bclass->event)
2987           result = bclass->event (basesink, event);
2988
2989         GST_BASE_SINK_PREROLL_UNLOCK (basesink);
2990       } else {
2991         if (bclass->event)
2992           result = bclass->event (basesink, event);
2993       }
2994       break;
2995   }
2996 done:
2997   return result;
2998
2999   /* ERRORS */
3000 flushing:
3001   {
3002     GST_DEBUG_OBJECT (basesink, "we are flushing");
3003     GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3004     gst_event_unref (event);
3005     result = FALSE;
3006     goto done;
3007   }
3008
3009 after_eos:
3010   {
3011     GST_DEBUG_OBJECT (basesink, "Event received after EOS, dropping");
3012     GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3013     gst_event_unref (event);
3014     result = FALSE;
3015     goto done;
3016   }
3017 }
3018
3019 /* default implementation to calculate the start and end
3020  * timestamps on a buffer, subclasses can override
3021  */
3022 static void
3023 gst_base_sink_default_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3024     GstClockTime * start, GstClockTime * end)
3025 {
3026   GstClockTime timestamp, duration;
3027
3028   /* first sync on DTS, else use PTS */
3029   timestamp = GST_BUFFER_DTS (buffer);
3030   if (!GST_CLOCK_TIME_IS_VALID (timestamp))
3031     timestamp = GST_BUFFER_PTS (buffer);
3032
3033   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3034     /* get duration to calculate end time */
3035     duration = GST_BUFFER_DURATION (buffer);
3036     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3037       *end = timestamp + duration;
3038     }
3039     *start = timestamp;
3040   }
3041 }
3042
3043 /* must be called with PREROLL_LOCK */
3044 static gboolean
3045 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3046 {
3047   gboolean is_prerolled, res;
3048
3049   /* we have 2 cases where the PREROLL_LOCK is released:
3050    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3051    *  2) we are syncing on the clock
3052    */
3053   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3054   res = !is_prerolled;
3055
3056   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3057       basesink->have_preroll, basesink->priv->received_eos, res);
3058
3059   return res;
3060 }
3061
3062 /* with STREAM_LOCK, PREROLL_LOCK
3063  *
3064  * Takes a buffer and compare the timestamps with the last segment.
3065  * If the buffer falls outside of the segment boundaries, drop it.
3066  * Else send the buffer for preroll and rendering.
3067  *
3068  * This function takes ownership of the buffer.
3069  */
3070 static GstFlowReturn
3071 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3072     gpointer obj)
3073 {
3074   GstBaseSinkClass *bclass;
3075   GstBaseSinkPrivate *priv = basesink->priv;
3076   GstFlowReturn ret;
3077   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3078   GstSegment *segment;
3079   GstBuffer *sync_buf;
3080   gint do_qos;
3081   gboolean late, step_end;
3082
3083   if (G_UNLIKELY (basesink->flushing))
3084     goto flushing;
3085
3086   if (G_UNLIKELY (priv->received_eos))
3087     goto was_eos;
3088
3089   if (GST_IS_BUFFER_LIST (obj)) {
3090     sync_buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0);
3091     g_assert (NULL != sync_buf);
3092   } else {
3093     sync_buf = GST_BUFFER_CAST (obj);
3094   }
3095
3096   /* for code clarity */
3097   segment = &basesink->segment;
3098
3099   if (G_UNLIKELY (!basesink->have_newsegment)) {
3100     gboolean sync;
3101
3102     sync = gst_base_sink_get_sync (basesink);
3103     if (sync) {
3104       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3105           (_("Internal data flow problem.")),
3106           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3107     }
3108
3109     /* this means this sink will assume timestamps start from 0 */
3110     GST_OBJECT_LOCK (basesink);
3111     segment->start = 0;
3112     segment->stop = -1;
3113     basesink->segment.start = 0;
3114     basesink->segment.stop = -1;
3115     basesink->have_newsegment = TRUE;
3116     GST_OBJECT_UNLOCK (basesink);
3117   }
3118
3119   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3120
3121   /* check if the buffer needs to be dropped, we first ask the subclass for the
3122    * start and end */
3123   if (bclass->get_times)
3124     bclass->get_times (basesink, sync_buf, &start, &end);
3125
3126   if (!GST_CLOCK_TIME_IS_VALID (start)) {
3127     /* if the subclass does not want sync, we use our own values so that we at
3128      * least clip the buffer to the segment */
3129     gst_base_sink_default_get_times (basesink, sync_buf, &start, &end);
3130   }
3131
3132   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3133       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3134
3135   /* a dropped buffer does not participate in anything */
3136   if (GST_CLOCK_TIME_IS_VALID (start) && (segment->format == GST_FORMAT_TIME)) {
3137     if (G_UNLIKELY (!gst_segment_clip (segment,
3138                 GST_FORMAT_TIME, start, end, NULL, NULL)))
3139       goto out_of_segment;
3140   }
3141
3142 again:
3143   late = FALSE;
3144   step_end = FALSE;
3145
3146   /* synchronize this object, non syncable objects return OK
3147    * immediately. */
3148   ret = gst_base_sink_do_sync (basesink, GST_MINI_OBJECT_CAST (sync_buf),
3149       &late, &step_end);
3150   if (G_UNLIKELY (ret != GST_FLOW_OK))
3151     goto sync_failed;
3152
3153   /* drop late buffers unconditionally, let's hope it's unlikely */
3154   if (G_UNLIKELY (late))
3155     goto dropped;
3156
3157   /* read once, to get same value before and after */
3158   do_qos = g_atomic_int_get (&priv->qos_enabled);
3159
3160   GST_DEBUG_OBJECT (basesink, "rendering object %p", obj);
3161
3162   /* record rendering time for QoS and stats */
3163   if (do_qos)
3164     gst_base_sink_do_render_stats (basesink, TRUE);
3165
3166   if (!GST_IS_BUFFER_LIST (obj)) {
3167     /* For buffer lists do not set last buffer for now. */
3168     gst_base_sink_set_last_buffer (basesink, GST_BUFFER_CAST (obj));
3169
3170     if (bclass->render)
3171       ret = bclass->render (basesink, GST_BUFFER_CAST (obj));
3172   } else {
3173     if (bclass->render_list)
3174       ret = bclass->render_list (basesink, GST_BUFFER_LIST_CAST (obj));
3175   }
3176
3177   if (do_qos)
3178     gst_base_sink_do_render_stats (basesink, FALSE);
3179
3180   if (ret == GST_FLOW_STEP)
3181     goto again;
3182
3183   if (G_UNLIKELY (basesink->flushing))
3184     goto flushing;
3185
3186   priv->rendered++;
3187
3188 done:
3189   if (step_end) {
3190     /* the step ended, check if we need to activate a new step */
3191     GST_DEBUG_OBJECT (basesink, "step ended");
3192     stop_stepping (basesink, &basesink->segment, &priv->current_step,
3193         priv->current_rstart, priv->current_rstop, basesink->eos);
3194     goto again;
3195   }
3196
3197   gst_base_sink_perform_qos (basesink, late);
3198
3199   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
3200   gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3201
3202   return ret;
3203
3204   /* ERRORS */
3205 flushing:
3206   {
3207     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3208     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3209     return GST_FLOW_WRONG_STATE;
3210   }
3211 was_eos:
3212   {
3213     GST_DEBUG_OBJECT (basesink, "we are EOS, dropping object, return EOS");
3214     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3215     return GST_FLOW_EOS;
3216   }
3217 out_of_segment:
3218   {
3219     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3220     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3221     return GST_FLOW_OK;
3222   }
3223 sync_failed:
3224   {
3225     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
3226     goto done;
3227   }
3228 dropped:
3229   {
3230     priv->dropped++;
3231     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
3232
3233     if (g_atomic_int_get (&priv->qos_enabled)) {
3234       GstMessage *qos_msg;
3235       GstClockTime timestamp, duration;
3236
3237       timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (sync_buf));
3238       duration = GST_BUFFER_DURATION (GST_BUFFER_CAST (sync_buf));
3239
3240       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3241           "qos: dropped buffer rt %" GST_TIME_FORMAT ", st %" GST_TIME_FORMAT
3242           ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT,
3243           GST_TIME_ARGS (priv->current_rstart),
3244           GST_TIME_ARGS (priv->current_sstart), GST_TIME_ARGS (timestamp),
3245           GST_TIME_ARGS (duration));
3246       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3247           "qos: rendered %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
3248           priv->rendered, priv->dropped);
3249
3250       qos_msg =
3251           gst_message_new_qos (GST_OBJECT_CAST (basesink), basesink->sync,
3252           priv->current_rstart, priv->current_sstart, timestamp, duration);
3253       gst_message_set_qos_values (qos_msg, priv->current_jitter, priv->avg_rate,
3254           1000000);
3255       gst_message_set_qos_stats (qos_msg, GST_FORMAT_BUFFERS, priv->rendered,
3256           priv->dropped);
3257       gst_element_post_message (GST_ELEMENT_CAST (basesink), qos_msg);
3258     }
3259     goto done;
3260   }
3261 }
3262
3263 /* with STREAM_LOCK
3264  */
3265 static GstFlowReturn
3266 gst_base_sink_chain_main (GstBaseSink * basesink, GstPad * pad, gpointer obj)
3267 {
3268   GstFlowReturn result;
3269
3270   if (G_UNLIKELY (basesink->pad_mode != GST_PAD_MODE_PUSH))
3271     goto wrong_mode;
3272
3273   GST_BASE_SINK_PREROLL_LOCK (basesink);
3274   result = gst_base_sink_chain_unlocked (basesink, pad, obj);
3275   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3276
3277 done:
3278   return result;
3279
3280   /* ERRORS */
3281 wrong_mode:
3282   {
3283     GST_OBJECT_LOCK (pad);
3284     GST_WARNING_OBJECT (basesink,
3285         "Push on pad %s:%s, but it was not activated in push mode",
3286         GST_DEBUG_PAD_NAME (pad));
3287     GST_OBJECT_UNLOCK (pad);
3288     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3289     /* we don't post an error message this will signal to the peer
3290      * pushing that EOS is reached. */
3291     result = GST_FLOW_EOS;
3292     goto done;
3293   }
3294 }
3295
3296 static GstFlowReturn
3297 gst_base_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
3298 {
3299   GstBaseSink *basesink;
3300
3301   basesink = GST_BASE_SINK (parent);
3302
3303   return gst_base_sink_chain_main (basesink, pad, buf);
3304 }
3305
3306 static GstFlowReturn
3307 gst_base_sink_chain_list (GstPad * pad, GstObject * parent,
3308     GstBufferList * list)
3309 {
3310   GstBaseSink *basesink;
3311   GstBaseSinkClass *bclass;
3312   GstFlowReturn result;
3313
3314   basesink = GST_BASE_SINK (parent);
3315   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3316
3317   if (G_LIKELY (bclass->render_list)) {
3318     result = gst_base_sink_chain_main (basesink, pad, list);
3319   } else {
3320     guint i, len;
3321     GstBuffer *buffer;
3322
3323     GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
3324
3325     len = gst_buffer_list_length (list);
3326
3327     result = GST_FLOW_OK;
3328     for (i = 0; i < len; i++) {
3329       buffer = gst_buffer_list_get (list, 0);
3330       result = gst_base_sink_chain_main (basesink, pad,
3331           gst_buffer_ref (buffer));
3332       if (result != GST_FLOW_OK)
3333         break;
3334     }
3335     gst_buffer_list_unref (list);
3336   }
3337   return result;
3338 }
3339
3340
3341 static gboolean
3342 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3343 {
3344   gboolean res = TRUE;
3345
3346   /* update our offset if the start/stop position was updated */
3347   if (segment->format == GST_FORMAT_BYTES) {
3348     segment->time = segment->start;
3349   } else if (segment->start == 0) {
3350     /* seek to start, we can implement a default for this. */
3351     segment->time = 0;
3352   } else {
3353     res = FALSE;
3354     GST_INFO_OBJECT (sink, "Can't do a default seek");
3355   }
3356
3357   return res;
3358 }
3359
3360 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3361
3362 static gboolean
3363 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3364     GstEvent * event, GstSegment * segment)
3365 {
3366   /* By default, we try one of 2 things:
3367    *   - For absolute seek positions, convert the requested position to our
3368    *     configured processing format and place it in the output segment \
3369    *   - For relative seek positions, convert our current (input) values to the
3370    *     seek format, adjust by the relative seek offset and then convert back to
3371    *     the processing format
3372    */
3373   GstSeekType cur_type, stop_type;
3374   gint64 cur, stop;
3375   GstSeekFlags flags;
3376   GstFormat seek_format;
3377   gdouble rate;
3378   gboolean update;
3379   gboolean res = TRUE;
3380
3381   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3382       &cur_type, &cur, &stop_type, &stop);
3383
3384   if (seek_format == segment->format) {
3385     gst_segment_do_seek (segment, rate, seek_format, flags,
3386         cur_type, cur, stop_type, stop, &update);
3387     return TRUE;
3388   }
3389
3390   if (cur_type != GST_SEEK_TYPE_NONE) {
3391     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3392     res =
3393         gst_pad_query_convert (sink->sinkpad, seek_format, cur, segment->format,
3394         &cur);
3395     cur_type = GST_SEEK_TYPE_SET;
3396   }
3397
3398   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3399     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3400     res =
3401         gst_pad_query_convert (sink->sinkpad, seek_format, stop,
3402         segment->format, &stop);
3403     stop_type = GST_SEEK_TYPE_SET;
3404   }
3405
3406   /* And finally, configure our output segment in the desired format */
3407   gst_segment_do_seek (segment, rate, segment->format, flags, cur_type, cur,
3408       stop_type, stop, &update);
3409
3410   if (!res)
3411     goto no_format;
3412
3413   return res;
3414
3415 no_format:
3416   {
3417     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3418     return FALSE;
3419   }
3420 }
3421
3422 /* perform a seek, only executed in pull mode */
3423 static gboolean
3424 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3425 {
3426   gboolean flush;
3427   gdouble rate;
3428   GstFormat seek_format, dest_format;
3429   GstSeekFlags flags;
3430   GstSeekType cur_type, stop_type;
3431   gboolean seekseg_configured = FALSE;
3432   gint64 cur, stop;
3433   gboolean update, res = TRUE;
3434   GstSegment seeksegment;
3435
3436   dest_format = sink->segment.format;
3437
3438   if (event) {
3439     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3440     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3441         &cur_type, &cur, &stop_type, &stop);
3442
3443     flush = flags & GST_SEEK_FLAG_FLUSH;
3444   } else {
3445     GST_DEBUG_OBJECT (sink, "performing seek without event");
3446     flush = FALSE;
3447   }
3448
3449   if (flush) {
3450     GST_DEBUG_OBJECT (sink, "flushing upstream");
3451     gst_pad_push_event (pad, gst_event_new_flush_start ());
3452     gst_base_sink_flush_start (sink, pad);
3453   } else {
3454     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3455   }
3456
3457   GST_PAD_STREAM_LOCK (pad);
3458
3459   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3460    * copy the current segment info into the temp segment that we can actually
3461    * attempt the seek with. We only update the real segment if the seek succeeds. */
3462   if (!seekseg_configured) {
3463     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3464
3465     /* now configure the final seek segment */
3466     if (event) {
3467       if (sink->segment.format != seek_format) {
3468         /* OK, here's where we give the subclass a chance to convert the relative
3469          * seek into an absolute one in the processing format. We set up any
3470          * absolute seek above, before taking the stream lock. */
3471         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3472                 &seeksegment)) {
3473           GST_DEBUG_OBJECT (sink,
3474               "Preparing the seek failed after flushing. " "Aborting seek");
3475           res = FALSE;
3476         }
3477       } else {
3478         /* The seek format matches our processing format, no need to ask the
3479          * the subclass to configure the segment. */
3480         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
3481             cur_type, cur, stop_type, stop, &update);
3482       }
3483     }
3484     /* Else, no seek event passed, so we're just (re)starting the
3485        current segment. */
3486   }
3487
3488   if (res) {
3489     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3490         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3491         seeksegment.start, seeksegment.stop, seeksegment.position);
3492
3493     /* do the seek, segment.position contains the new position. */
3494     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3495   }
3496
3497
3498   if (flush) {
3499     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3500     gst_pad_push_event (pad, gst_event_new_flush_stop (TRUE));
3501     gst_base_sink_flush_stop (sink, pad, TRUE);
3502   } else if (res && sink->running) {
3503     /* we are running the current segment and doing a non-flushing seek,
3504      * close the segment first based on the position. */
3505     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3506         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.position);
3507   }
3508
3509   /* The subclass must have converted the segment to the processing format
3510    * by now */
3511   if (res && seeksegment.format != dest_format) {
3512     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3513         "in the correct format. Aborting seek.");
3514     res = FALSE;
3515   }
3516
3517   /* if successful seek, we update our real segment and push
3518    * out the new segment. */
3519   if (res) {
3520     gst_segment_copy_into (&seeksegment, &sink->segment);
3521
3522     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3523       gst_element_post_message (GST_ELEMENT (sink),
3524           gst_message_new_segment_start (GST_OBJECT (sink),
3525               sink->segment.format, sink->segment.position));
3526     }
3527   }
3528
3529   sink->priv->discont = TRUE;
3530   sink->running = TRUE;
3531
3532   GST_PAD_STREAM_UNLOCK (pad);
3533
3534   return res;
3535 }
3536
3537 static void
3538 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
3539     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
3540     gboolean flush, gboolean intermediate)
3541 {
3542   GST_OBJECT_LOCK (sink);
3543   pending->seqnum = seqnum;
3544   pending->format = format;
3545   pending->amount = amount;
3546   pending->position = 0;
3547   pending->rate = rate;
3548   pending->flush = flush;
3549   pending->intermediate = intermediate;
3550   pending->valid = TRUE;
3551   /* flush invalidates the current stepping segment */
3552   if (flush)
3553     current->valid = FALSE;
3554   GST_OBJECT_UNLOCK (sink);
3555 }
3556
3557 static gboolean
3558 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3559 {
3560   GstBaseSinkPrivate *priv;
3561   GstBaseSinkClass *bclass;
3562   gboolean flush, intermediate;
3563   gdouble rate;
3564   GstFormat format;
3565   guint64 amount;
3566   guint seqnum;
3567   GstStepInfo *pending, *current;
3568   GstMessage *message;
3569
3570   bclass = GST_BASE_SINK_GET_CLASS (sink);
3571   priv = sink->priv;
3572
3573   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
3574
3575   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
3576   seqnum = gst_event_get_seqnum (event);
3577
3578   pending = &priv->pending_step;
3579   current = &priv->current_step;
3580
3581   /* post message first */
3582   message = gst_message_new_step_start (GST_OBJECT (sink), FALSE, format,
3583       amount, rate, flush, intermediate);
3584   gst_message_set_seqnum (message, seqnum);
3585   gst_element_post_message (GST_ELEMENT (sink), message);
3586
3587   if (flush) {
3588     /* we need to call ::unlock before locking PREROLL_LOCK
3589      * since we lock it before going into ::render */
3590     if (bclass->unlock)
3591       bclass->unlock (sink);
3592
3593     GST_BASE_SINK_PREROLL_LOCK (sink);
3594     /* now that we have the PREROLL lock, clear our unlock request */
3595     if (bclass->unlock_stop)
3596       bclass->unlock_stop (sink);
3597
3598     /* update the stepinfo and make it valid */
3599     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
3600         intermediate);
3601
3602     if (sink->priv->async_enabled) {
3603       /* and we need to commit our state again on the next
3604        * prerolled buffer */
3605       sink->playing_async = TRUE;
3606       priv->pending_step.need_preroll = TRUE;
3607       sink->need_preroll = FALSE;
3608       gst_element_lost_state (GST_ELEMENT_CAST (sink));
3609     } else {
3610       sink->priv->have_latency = TRUE;
3611       sink->need_preroll = FALSE;
3612     }
3613     priv->current_sstart = GST_CLOCK_TIME_NONE;
3614     priv->current_sstop = GST_CLOCK_TIME_NONE;
3615     priv->eos_rtime = GST_CLOCK_TIME_NONE;
3616     priv->call_preroll = TRUE;
3617     gst_base_sink_set_last_buffer (sink, NULL);
3618     gst_base_sink_reset_qos (sink);
3619
3620     if (sink->clock_id) {
3621       gst_clock_id_unschedule (sink->clock_id);
3622     }
3623
3624     if (sink->have_preroll) {
3625       GST_DEBUG_OBJECT (sink, "signal waiter");
3626       priv->step_unlock = TRUE;
3627       GST_BASE_SINK_PREROLL_SIGNAL (sink);
3628     }
3629     GST_BASE_SINK_PREROLL_UNLOCK (sink);
3630   } else {
3631     /* update the stepinfo and make it valid */
3632     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
3633         intermediate);
3634   }
3635
3636   return TRUE;
3637 }
3638
3639 /* with STREAM_LOCK
3640  */
3641 static void
3642 gst_base_sink_loop (GstPad * pad)
3643 {
3644   GstObject *parent;
3645   GstBaseSink *basesink;
3646   GstBuffer *buf = NULL;
3647   GstFlowReturn result;
3648   guint blocksize;
3649   guint64 offset;
3650
3651   parent = GST_OBJECT_PARENT (pad);
3652   basesink = GST_BASE_SINK (parent);
3653
3654   g_assert (basesink->pad_mode == GST_PAD_MODE_PULL);
3655
3656   if ((blocksize = basesink->priv->blocksize) == 0)
3657     blocksize = -1;
3658
3659   offset = basesink->segment.position;
3660
3661   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
3662       offset, blocksize);
3663
3664   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
3665   if (G_UNLIKELY (result != GST_FLOW_OK))
3666     goto paused;
3667
3668   if (G_UNLIKELY (buf == NULL))
3669     goto no_buffer;
3670
3671   offset += gst_buffer_get_size (buf);
3672
3673   basesink->segment.position = offset;
3674
3675   GST_BASE_SINK_PREROLL_LOCK (basesink);
3676   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
3677   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3678   if (G_UNLIKELY (result != GST_FLOW_OK))
3679     goto paused;
3680
3681   return;
3682
3683   /* ERRORS */
3684 paused:
3685   {
3686     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
3687         gst_flow_get_name (result));
3688     gst_pad_pause_task (pad);
3689     if (result == GST_FLOW_EOS) {
3690       /* perform EOS logic */
3691       if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3692         gst_element_post_message (GST_ELEMENT_CAST (basesink),
3693             gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
3694                 basesink->segment.format, basesink->segment.position));
3695       } else {
3696         gst_base_sink_event (pad, parent, gst_event_new_eos ());
3697       }
3698     } else if (result == GST_FLOW_NOT_LINKED || result <= GST_FLOW_EOS) {
3699       /* for fatal errors we post an error message, post the error
3700        * first so the app knows about the error first. 
3701        * wrong-state is not a fatal error because it happens due to
3702        * flushing and posting an error message in that case is the
3703        * wrong thing to do, e.g. when basesrc is doing a flushing
3704        * seek. */
3705       GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3706           (_("Internal data stream error.")),
3707           ("stream stopped, reason %s", gst_flow_get_name (result)));
3708       gst_base_sink_event (pad, parent, gst_event_new_eos ());
3709     }
3710     return;
3711   }
3712 no_buffer:
3713   {
3714     GST_LOG_OBJECT (basesink, "no buffer, pausing");
3715     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3716         (_("Internal data flow error.")), ("element returned NULL buffer"));
3717     result = GST_FLOW_ERROR;
3718     goto paused;
3719   }
3720 }
3721
3722 static gboolean
3723 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
3724     gboolean flushing)
3725 {
3726   GstBaseSinkClass *bclass;
3727
3728   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3729
3730   if (flushing) {
3731     /* unlock any subclasses, we need to do this before grabbing the
3732      * PREROLL_LOCK since we hold this lock before going into ::render. */
3733     if (bclass->unlock)
3734       bclass->unlock (basesink);
3735   }
3736
3737   GST_BASE_SINK_PREROLL_LOCK (basesink);
3738   basesink->flushing = flushing;
3739   if (flushing) {
3740     /* step 1, now that we have the PREROLL lock, clear our unlock request */
3741     if (bclass->unlock_stop)
3742       bclass->unlock_stop (basesink);
3743
3744     /* set need_preroll before we unblock the clock. If the clock is unblocked
3745      * before timing out, we can reuse the buffer for preroll. */
3746     basesink->need_preroll = TRUE;
3747
3748     /* step 2, unblock clock sync (if any) or any other blocking thing */
3749     if (basesink->clock_id) {
3750       gst_clock_id_unschedule (basesink->clock_id);
3751     }
3752
3753     /* flush out the data thread if it's locked in finish_preroll, this will
3754      * also flush out the EOS state */
3755     GST_DEBUG_OBJECT (basesink,
3756         "flushing out data thread, need preroll to TRUE");
3757
3758     /* we can't have EOS anymore now */
3759     basesink->eos = FALSE;
3760     basesink->priv->received_eos = FALSE;
3761     basesink->have_preroll = FALSE;
3762     basesink->priv->step_unlock = FALSE;
3763     /* can't report latency anymore until we preroll again */
3764     if (basesink->priv->async_enabled) {
3765       GST_OBJECT_LOCK (basesink);
3766       basesink->priv->have_latency = FALSE;
3767       GST_OBJECT_UNLOCK (basesink);
3768     }
3769     /* and signal any waiters now */
3770     GST_BASE_SINK_PREROLL_SIGNAL (basesink);
3771   }
3772   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3773
3774   return TRUE;
3775 }
3776
3777 static gboolean
3778 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
3779 {
3780   gboolean result;
3781
3782   if (active) {
3783     /* start task */
3784     result = gst_pad_start_task (basesink->sinkpad,
3785         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
3786   } else {
3787     /* step 2, make sure streaming finishes */
3788     result = gst_pad_stop_task (basesink->sinkpad);
3789   }
3790
3791   return result;
3792 }
3793
3794 static gboolean
3795 gst_base_sink_pad_activate (GstPad * pad, GstObject * parent)
3796 {
3797   gboolean result = FALSE;
3798   GstBaseSink *basesink;
3799   GstQuery *query;
3800   gboolean pull_mode;
3801
3802   basesink = GST_BASE_SINK (parent);
3803
3804   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
3805
3806   gst_base_sink_set_flushing (basesink, pad, FALSE);
3807
3808   /* we need to have the pull mode enabled */
3809   if (!basesink->can_activate_pull) {
3810     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
3811     goto fallback;
3812   }
3813
3814   /* check if downstreams supports pull mode at all */
3815   query = gst_query_new_scheduling ();
3816
3817   if (!gst_pad_peer_query (pad, query)) {
3818     gst_query_unref (query);
3819     GST_DEBUG_OBJECT (basesink, "peer query faild, no pull mode");
3820     goto fallback;
3821   }
3822
3823   /* parse result of the query */
3824   pull_mode = gst_query_has_scheduling_mode (query, GST_PAD_MODE_PULL);
3825   gst_query_unref (query);
3826
3827   if (!pull_mode) {
3828     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
3829     goto fallback;
3830   }
3831
3832   /* set the pad mode before starting the task so that it's in the
3833    * correct state for the new thread. also the sink set_caps and get_caps
3834    * function checks this */
3835   basesink->pad_mode = GST_PAD_MODE_PULL;
3836
3837   /* we first try to negotiate a format so that when we try to activate
3838    * downstream, it knows about our format */
3839   if (!gst_base_sink_negotiate_pull (basesink)) {
3840     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
3841     goto fallback;
3842   }
3843
3844   /* ok activate now */
3845   if (!gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, TRUE)) {
3846     /* clear any pending caps */
3847     GST_OBJECT_LOCK (basesink);
3848     gst_caps_replace (&basesink->priv->caps, NULL);
3849     GST_OBJECT_UNLOCK (basesink);
3850     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
3851     goto fallback;
3852   }
3853
3854   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
3855   result = TRUE;
3856   goto done;
3857
3858   /* push mode fallback */
3859 fallback:
3860   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
3861   if ((result = gst_pad_activate_mode (pad, GST_PAD_MODE_PUSH, TRUE))) {
3862     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
3863   }
3864
3865 done:
3866   if (!result) {
3867     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
3868     gst_base_sink_set_flushing (basesink, pad, TRUE);
3869   }
3870
3871   return result;
3872 }
3873
3874 static gboolean
3875 gst_base_sink_pad_activate_push (GstPad * pad, GstObject * parent,
3876     gboolean active)
3877 {
3878   gboolean result;
3879   GstBaseSink *basesink;
3880
3881   basesink = GST_BASE_SINK (parent);
3882
3883   if (active) {
3884     if (!basesink->can_activate_push) {
3885       result = FALSE;
3886       basesink->pad_mode = GST_PAD_MODE_NONE;
3887     } else {
3888       result = TRUE;
3889       basesink->pad_mode = GST_PAD_MODE_PUSH;
3890     }
3891   } else {
3892     if (G_UNLIKELY (basesink->pad_mode != GST_PAD_MODE_PUSH)) {
3893       g_warning ("Internal GStreamer activation error!!!");
3894       result = FALSE;
3895     } else {
3896       gst_base_sink_set_flushing (basesink, pad, TRUE);
3897       result = TRUE;
3898       basesink->pad_mode = GST_PAD_MODE_NONE;
3899     }
3900   }
3901
3902   return result;
3903 }
3904
3905 static gboolean
3906 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
3907 {
3908   GstCaps *caps;
3909   gboolean result;
3910
3911   result = FALSE;
3912
3913   /* this returns the intersection between our caps and the peer caps. If there
3914    * is no peer, it returns NULL and we can't operate in pull mode so we can
3915    * fail the negotiation. */
3916   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
3917   if (caps == NULL || gst_caps_is_empty (caps))
3918     goto no_caps_possible;
3919
3920   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
3921
3922   if (gst_caps_is_any (caps)) {
3923     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
3924         "allowing pull()");
3925     /* neither side has template caps in this case, so they are prepared for
3926        pull() without setcaps() */
3927     result = TRUE;
3928   } else {
3929     caps = gst_caps_make_writable (caps);
3930     /* try to fixate */
3931     gst_base_sink_fixate (basesink, caps);
3932     GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
3933
3934     if (gst_caps_is_fixed (caps)) {
3935       if (!gst_pad_send_event (GST_BASE_SINK_PAD (basesink),
3936               gst_event_new_caps (caps)))
3937         goto could_not_set_caps;
3938
3939       result = TRUE;
3940     }
3941   }
3942
3943   gst_caps_unref (caps);
3944
3945   return result;
3946
3947 no_caps_possible:
3948   {
3949     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
3950     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
3951     if (caps)
3952       gst_caps_unref (caps);
3953     return FALSE;
3954   }
3955 could_not_set_caps:
3956   {
3957     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
3958     gst_caps_unref (caps);
3959     return FALSE;
3960   }
3961 }
3962
3963 /* this won't get called until we implement an activate function */
3964 static gboolean
3965 gst_base_sink_pad_activate_pull (GstPad * pad, GstObject * parent,
3966     gboolean active)
3967 {
3968   gboolean result = FALSE;
3969   GstBaseSink *basesink;
3970   GstBaseSinkClass *bclass;
3971
3972   basesink = GST_BASE_SINK (parent);
3973   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3974
3975   if (active) {
3976     gint64 duration;
3977
3978     /* we mark we have a newsegment here because pull based
3979      * mode works just fine without having a newsegment before the
3980      * first buffer */
3981     gst_segment_init (&basesink->segment, GST_FORMAT_BYTES);
3982     GST_OBJECT_LOCK (basesink);
3983     basesink->have_newsegment = TRUE;
3984     GST_OBJECT_UNLOCK (basesink);
3985
3986     /* get the peer duration in bytes */
3987     result = gst_pad_peer_query_duration (pad, GST_FORMAT_BYTES, &duration);
3988     if (result) {
3989       GST_DEBUG_OBJECT (basesink,
3990           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
3991       basesink->segment.duration = duration;
3992     } else {
3993       GST_DEBUG_OBJECT (basesink, "unknown duration");
3994     }
3995
3996     if (bclass->activate_pull)
3997       result = bclass->activate_pull (basesink, TRUE);
3998     else
3999       result = FALSE;
4000
4001     if (!result)
4002       goto activate_failed;
4003
4004   } else {
4005     if (G_UNLIKELY (basesink->pad_mode != GST_PAD_MODE_PULL)) {
4006       g_warning ("Internal GStreamer activation error!!!");
4007       result = FALSE;
4008     } else {
4009       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
4010       if (bclass->activate_pull)
4011         result &= bclass->activate_pull (basesink, FALSE);
4012       basesink->pad_mode = GST_PAD_MODE_NONE;
4013     }
4014   }
4015
4016   return result;
4017
4018   /* ERRORS */
4019 activate_failed:
4020   {
4021     /* reset, as starting the thread failed */
4022     basesink->pad_mode = GST_PAD_MODE_NONE;
4023
4024     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
4025     return FALSE;
4026   }
4027 }
4028
4029 static gboolean
4030 gst_base_sink_pad_activate_mode (GstPad * pad, GstObject * parent,
4031     GstPadMode mode, gboolean active)
4032 {
4033   gboolean res;
4034
4035   switch (mode) {
4036     case GST_PAD_MODE_PULL:
4037       res = gst_base_sink_pad_activate_pull (pad, parent, active);
4038       break;
4039     case GST_PAD_MODE_PUSH:
4040       res = gst_base_sink_pad_activate_push (pad, parent, active);
4041       break;
4042     default:
4043       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
4044       res = FALSE;
4045       break;
4046   }
4047   return res;
4048 }
4049
4050 /* send an event to our sinkpad peer. */
4051 static gboolean
4052 gst_base_sink_send_event (GstElement * element, GstEvent * event)
4053 {
4054   GstPad *pad;
4055   GstBaseSink *basesink = GST_BASE_SINK (element);
4056   gboolean forward, result = TRUE;
4057   GstPadMode mode;
4058
4059   GST_OBJECT_LOCK (element);
4060   /* get the pad and the scheduling mode */
4061   pad = gst_object_ref (basesink->sinkpad);
4062   mode = basesink->pad_mode;
4063   GST_OBJECT_UNLOCK (element);
4064
4065   /* only push UPSTREAM events upstream */
4066   forward = GST_EVENT_IS_UPSTREAM (event);
4067
4068   GST_DEBUG_OBJECT (basesink, "handling event %p %" GST_PTR_FORMAT, event,
4069       event);
4070
4071   switch (GST_EVENT_TYPE (event)) {
4072     case GST_EVENT_LATENCY:
4073     {
4074       GstClockTime latency;
4075
4076       gst_event_parse_latency (event, &latency);
4077
4078       /* store the latency. We use this to adjust the running_time before syncing
4079        * it to the clock. */
4080       GST_OBJECT_LOCK (element);
4081       basesink->priv->latency = latency;
4082       if (!basesink->priv->have_latency)
4083         forward = FALSE;
4084       GST_OBJECT_UNLOCK (element);
4085       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
4086           GST_TIME_ARGS (latency));
4087
4088       /* We forward this event so that all elements know about the global pipeline
4089        * latency. This is interesting for an element when it wants to figure out
4090        * when a particular piece of data will be rendered. */
4091       break;
4092     }
4093     case GST_EVENT_SEEK:
4094       /* in pull mode we will execute the seek */
4095       if (mode == GST_PAD_MODE_PULL)
4096         result = gst_base_sink_perform_seek (basesink, pad, event);
4097       break;
4098     case GST_EVENT_STEP:
4099       result = gst_base_sink_perform_step (basesink, pad, event);
4100       forward = FALSE;
4101       break;
4102     default:
4103       break;
4104   }
4105
4106   if (forward) {
4107     result = gst_pad_push_event (pad, event);
4108   } else {
4109     /* not forwarded, unref the event */
4110     gst_event_unref (event);
4111   }
4112
4113   gst_object_unref (pad);
4114
4115   GST_DEBUG_OBJECT (basesink, "handled event %p %" GST_PTR_FORMAT ": %d", event,
4116       event, result);
4117
4118   return result;
4119 }
4120
4121 static gboolean
4122 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4123     gint64 * cur, gboolean * upstream)
4124 {
4125   GstClock *clock = NULL;
4126   gboolean res = FALSE;
4127   GstFormat oformat;
4128   GstSegment *segment;
4129   GstClockTime now, latency;
4130   GstClockTimeDiff base_time;
4131   gint64 time, base, duration;
4132   gdouble rate;
4133   gint64 last;
4134   gboolean last_seen, with_clock, in_paused;
4135
4136   GST_OBJECT_LOCK (basesink);
4137   /* we can only get the segment when we are not NULL or READY */
4138   if (!basesink->have_newsegment)
4139     goto wrong_state;
4140
4141   in_paused = FALSE;
4142   /* when not in PLAYING or when we're busy with a state change, we
4143    * cannot read from the clock so we report time based on the
4144    * last seen timestamp. */
4145   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4146       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING) {
4147     in_paused = TRUE;
4148   }
4149
4150   segment = &basesink->segment;
4151
4152   /* get the format in the segment */
4153   oformat = segment->format;
4154
4155   /* report with last seen position when EOS */
4156   last_seen = basesink->eos;
4157
4158   /* assume we will use the clock for getting the current position */
4159   with_clock = TRUE;
4160   if (basesink->sync == FALSE)
4161     with_clock = FALSE;
4162
4163   /* and we need a clock */
4164   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4165     with_clock = FALSE;
4166   else
4167     gst_object_ref (clock);
4168
4169   /* mainloop might be querying position when going to playing async,
4170    * while (audio) rendering might be quickly advancing stream position,
4171    * so use clock asap rather than last reported position */
4172   if (in_paused && with_clock && g_atomic_int_get (&basesink->priv->to_playing)) {
4173     GST_DEBUG_OBJECT (basesink, "going to PLAYING, so not PAUSED");
4174     in_paused = FALSE;
4175   }
4176
4177   /* collect all data we need holding the lock */
4178   if (GST_CLOCK_TIME_IS_VALID (segment->time))
4179     time = segment->time;
4180   else
4181     time = 0;
4182
4183   if (GST_CLOCK_TIME_IS_VALID (segment->stop))
4184     duration = segment->stop - segment->start;
4185   else
4186     duration = 0;
4187
4188   base = segment->base;
4189   rate = segment->rate * segment->applied_rate;
4190   latency = basesink->priv->latency;
4191
4192   if (oformat == GST_FORMAT_TIME) {
4193     gint64 start, stop;
4194
4195     start = basesink->priv->current_sstart;
4196     stop = basesink->priv->current_sstop;
4197
4198     if (in_paused) {
4199       /* in paused we use the last position as a lower bound */
4200       if (stop == -1 || segment->rate > 0.0)
4201         last = start;
4202       else
4203         last = stop;
4204     } else {
4205       /* in playing, use last stop time as upper bound */
4206       if (start == -1 || segment->rate > 0.0)
4207         last = stop;
4208       else
4209         last = start;
4210     }
4211   } else {
4212     /* convert last stop to stream time */
4213     last = gst_segment_to_stream_time (segment, oformat, segment->position);
4214   }
4215
4216   if (in_paused) {
4217     /* in paused, use start_time */
4218     base_time = GST_ELEMENT_START_TIME (basesink);
4219     GST_DEBUG_OBJECT (basesink, "in paused, using start time %" GST_TIME_FORMAT,
4220         GST_TIME_ARGS (base_time));
4221   } else if (with_clock) {
4222     /* else use clock when needed */
4223     base_time = GST_ELEMENT_CAST (basesink)->base_time;
4224     GST_DEBUG_OBJECT (basesink, "using clock and base time %" GST_TIME_FORMAT,
4225         GST_TIME_ARGS (base_time));
4226   } else {
4227     /* else, no sync or clock -> no base time */
4228     GST_DEBUG_OBJECT (basesink, "no sync or no clock");
4229     base_time = -1;
4230   }
4231
4232   /* no base_time, we can't calculate running_time, use last seem timestamp to report
4233    * time */
4234   if (base_time == -1)
4235     last_seen = TRUE;
4236
4237   /* need to release the object lock before we can get the time,
4238    * a clock might take the LOCK of the provider, which could be
4239    * a basesink subclass. */
4240   GST_OBJECT_UNLOCK (basesink);
4241
4242   if (last_seen) {
4243     /* in EOS or when no valid stream_time, report the value of last seen
4244      * timestamp */
4245     if (last == -1) {
4246       /* no timestamp, we need to ask upstream */
4247       GST_DEBUG_OBJECT (basesink, "no last seen timestamp, asking upstream");
4248       res = FALSE;
4249       *upstream = TRUE;
4250       goto done;
4251     }
4252     GST_DEBUG_OBJECT (basesink, "using last seen timestamp %" GST_TIME_FORMAT,
4253         GST_TIME_ARGS (last));
4254     *cur = last;
4255   } else {
4256     if (oformat != GST_FORMAT_TIME) {
4257       /* convert base, time and duration to time */
4258       if (!gst_pad_query_convert (basesink->sinkpad, oformat, base,
4259               GST_FORMAT_TIME, &base))
4260         goto convert_failed;
4261       if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration,
4262               GST_FORMAT_TIME, &duration))
4263         goto convert_failed;
4264       if (!gst_pad_query_convert (basesink->sinkpad, oformat, time,
4265               GST_FORMAT_TIME, &time))
4266         goto convert_failed;
4267       if (!gst_pad_query_convert (basesink->sinkpad, oformat, last,
4268               GST_FORMAT_TIME, &last))
4269         goto convert_failed;
4270
4271       /* assume time format from now on */
4272       oformat = GST_FORMAT_TIME;
4273     }
4274
4275     if (!in_paused && with_clock) {
4276       now = gst_clock_get_time (clock);
4277     } else {
4278       now = base_time;
4279       base_time = 0;
4280     }
4281
4282     /* subtract base time and base time from the clock time.
4283      * Make sure we don't go negative. This is the current time in
4284      * the segment which we need to scale with the combined
4285      * rate and applied rate. */
4286     base_time += base;
4287     base_time += latency;
4288     if (GST_CLOCK_DIFF (base_time, now) < 0)
4289       base_time = now;
4290
4291     /* for negative rates we need to count back from the segment
4292      * duration. */
4293     if (rate < 0.0)
4294       time += duration;
4295
4296     *cur = time + gst_guint64_to_gdouble (now - base_time) * rate;
4297
4298     if (in_paused) {
4299       /* never report less than segment values in paused */
4300       if (last != -1)
4301         *cur = MAX (last, *cur);
4302     } else {
4303       /* never report more than last seen position in playing */
4304       if (last != -1)
4305         *cur = MIN (last, *cur);
4306     }
4307
4308     GST_DEBUG_OBJECT (basesink,
4309         "now %" GST_TIME_FORMAT " - base_time %" GST_TIME_FORMAT " - base %"
4310         GST_TIME_FORMAT " + time %" GST_TIME_FORMAT "  last %" GST_TIME_FORMAT,
4311         GST_TIME_ARGS (now), GST_TIME_ARGS (base_time), GST_TIME_ARGS (base),
4312         GST_TIME_ARGS (time), GST_TIME_ARGS (last));
4313   }
4314
4315   if (oformat != format) {
4316     /* convert to final format */
4317     if (!gst_pad_query_convert (basesink->sinkpad, oformat, *cur, format, cur))
4318       goto convert_failed;
4319   }
4320
4321   res = TRUE;
4322
4323 done:
4324   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4325       res, GST_TIME_ARGS (*cur));
4326
4327   if (clock)
4328     gst_object_unref (clock);
4329
4330   return res;
4331
4332   /* special cases */
4333 wrong_state:
4334   {
4335     /* in NULL or READY we always return FALSE and -1 */
4336     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4337     res = FALSE;
4338     *cur = -1;
4339     GST_OBJECT_UNLOCK (basesink);
4340     goto done;
4341   }
4342 convert_failed:
4343   {
4344     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4345     *upstream = TRUE;
4346     res = FALSE;
4347     goto done;
4348   }
4349 }
4350
4351 static gboolean
4352 gst_base_sink_get_duration (GstBaseSink * basesink, GstFormat format,
4353     gint64 * dur, gboolean * upstream)
4354 {
4355   gboolean res = FALSE;
4356
4357   if (basesink->pad_mode == GST_PAD_MODE_PULL) {
4358     gint64 uduration;
4359
4360     /* get the duration in bytes, in pull mode that's all we are sure to
4361      * know. We have to explicitly get this value from upstream instead of
4362      * using our cached value because it might change. Duration caching
4363      * should be done at a higher level. */
4364     res =
4365         gst_pad_peer_query_duration (basesink->sinkpad, GST_FORMAT_BYTES,
4366         &uduration);
4367     if (res) {
4368       basesink->segment.duration = uduration;
4369       if (format != GST_FORMAT_BYTES) {
4370         /* convert to the requested format */
4371         res =
4372             gst_pad_query_convert (basesink->sinkpad, GST_FORMAT_BYTES,
4373             uduration, format, dur);
4374       } else {
4375         *dur = uduration;
4376       }
4377     }
4378     *upstream = FALSE;
4379   } else {
4380     *upstream = TRUE;
4381   }
4382
4383   return res;
4384 }
4385
4386 static gboolean
4387 default_element_query (GstElement * element, GstQuery * query)
4388 {
4389   gboolean res = FALSE;
4390
4391   GstBaseSink *basesink = GST_BASE_SINK (element);
4392
4393   switch (GST_QUERY_TYPE (query)) {
4394     case GST_QUERY_POSITION:
4395     {
4396       gint64 cur = 0;
4397       GstFormat format;
4398       gboolean upstream = FALSE;
4399
4400       gst_query_parse_position (query, &format, NULL);
4401
4402       GST_DEBUG_OBJECT (basesink, "position query in format %s",
4403           gst_format_get_name (format));
4404
4405       /* first try to get the position based on the clock */
4406       if ((res =
4407               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4408         gst_query_set_position (query, format, cur);
4409       } else if (upstream) {
4410         /* fallback to peer query */
4411         res = gst_pad_peer_query (basesink->sinkpad, query);
4412       }
4413       if (!res) {
4414         /* we can handle a few things if upstream failed */
4415         if (format == GST_FORMAT_PERCENT) {
4416           gint64 dur = 0;
4417
4418           res = gst_base_sink_get_position (basesink, GST_FORMAT_TIME, &cur,
4419               &upstream);
4420           if (!res && upstream) {
4421             res =
4422                 gst_pad_peer_query_position (basesink->sinkpad, GST_FORMAT_TIME,
4423                 &cur);
4424           }
4425           if (res) {
4426             res = gst_base_sink_get_duration (basesink, GST_FORMAT_TIME, &dur,
4427                 &upstream);
4428             if (!res && upstream) {
4429               res =
4430                   gst_pad_peer_query_duration (basesink->sinkpad,
4431                   GST_FORMAT_TIME, &dur);
4432             }
4433           }
4434           if (res) {
4435             gint64 pos;
4436
4437             pos = gst_util_uint64_scale (100 * GST_FORMAT_PERCENT_SCALE, cur,
4438                 dur);
4439             gst_query_set_position (query, GST_FORMAT_PERCENT, pos);
4440           }
4441         }
4442       }
4443       break;
4444     }
4445     case GST_QUERY_DURATION:
4446     {
4447       gint64 dur = 0;
4448       GstFormat format;
4449       gboolean upstream = FALSE;
4450
4451       gst_query_parse_duration (query, &format, NULL);
4452
4453       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4454           gst_format_get_name (format));
4455
4456       if ((res =
4457               gst_base_sink_get_duration (basesink, format, &dur, &upstream))) {
4458         gst_query_set_duration (query, format, dur);
4459       } else if (upstream) {
4460         /* fallback to peer query */
4461         res = gst_pad_peer_query (basesink->sinkpad, query);
4462       }
4463       if (!res) {
4464         /* we can handle a few things if upstream failed */
4465         if (format == GST_FORMAT_PERCENT) {
4466           gst_query_set_duration (query, GST_FORMAT_PERCENT,
4467               GST_FORMAT_PERCENT_MAX);
4468           res = TRUE;
4469         }
4470       }
4471       break;
4472     }
4473     case GST_QUERY_LATENCY:
4474     {
4475       gboolean live, us_live;
4476       GstClockTime min, max;
4477
4478       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4479                   &max))) {
4480         gst_query_set_latency (query, live, min, max);
4481       }
4482       break;
4483     }
4484     case GST_QUERY_JITTER:
4485       break;
4486     case GST_QUERY_RATE:
4487       /* gst_query_set_rate (query, basesink->segment_rate); */
4488       res = TRUE;
4489       break;
4490     case GST_QUERY_SEGMENT:
4491     {
4492       if (basesink->pad_mode == GST_PAD_MODE_PULL) {
4493         gst_query_set_segment (query, basesink->segment.rate,
4494             GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4495         res = TRUE;
4496       } else {
4497         res = gst_pad_peer_query (basesink->sinkpad, query);
4498       }
4499       break;
4500     }
4501     case GST_QUERY_SEEKING:
4502     case GST_QUERY_CONVERT:
4503     case GST_QUERY_FORMATS:
4504     default:
4505       res = gst_pad_peer_query (basesink->sinkpad, query);
4506       break;
4507   }
4508   GST_DEBUG_OBJECT (basesink, "query %s returns %d",
4509       GST_QUERY_TYPE_NAME (query), res);
4510   return res;
4511 }
4512
4513
4514 static gboolean
4515 gst_base_sink_default_query (GstBaseSink * basesink, GstQuery * query)
4516 {
4517   gboolean res;
4518   GstBaseSinkClass *bclass;
4519
4520   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4521
4522   switch (GST_QUERY_TYPE (query)) {
4523     case GST_QUERY_ALLOCATION:
4524     {
4525       if (bclass->propose_allocation)
4526         res = bclass->propose_allocation (basesink, query);
4527       else
4528         res = FALSE;
4529       break;
4530     }
4531     case GST_QUERY_CAPS:
4532     {
4533       GstCaps *caps, *filter;
4534
4535       gst_query_parse_caps (query, &filter);
4536       caps = gst_base_sink_query_caps (basesink, basesink->sinkpad, filter);
4537       gst_query_set_caps_result (query, caps);
4538       gst_caps_unref (caps);
4539       res = TRUE;
4540       break;
4541     }
4542     default:
4543       res =
4544           gst_pad_query_default (basesink->sinkpad, GST_OBJECT_CAST (basesink),
4545           query);
4546       break;
4547   }
4548   return res;
4549 }
4550
4551 static gboolean
4552 gst_base_sink_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
4553 {
4554   GstBaseSink *basesink;
4555   GstBaseSinkClass *bclass;
4556   gboolean res;
4557
4558   basesink = GST_BASE_SINK_CAST (parent);
4559   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4560
4561   if (bclass->query)
4562     res = bclass->query (basesink, query);
4563   else
4564     res = FALSE;
4565
4566   return res;
4567 }
4568
4569 static GstStateChangeReturn
4570 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4571 {
4572   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4573   GstBaseSink *basesink = GST_BASE_SINK (element);
4574   GstBaseSinkClass *bclass;
4575   GstBaseSinkPrivate *priv;
4576
4577   priv = basesink->priv;
4578
4579   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4580
4581   switch (transition) {
4582     case GST_STATE_CHANGE_NULL_TO_READY:
4583       if (bclass->start)
4584         if (!bclass->start (basesink))
4585           goto start_failed;
4586       break;
4587     case GST_STATE_CHANGE_READY_TO_PAUSED:
4588       /* need to complete preroll before this state change completes, there
4589        * is no data flow in READY so we can safely assume we need to preroll. */
4590       GST_BASE_SINK_PREROLL_LOCK (basesink);
4591       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4592       basesink->have_newsegment = FALSE;
4593       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4594       basesink->offset = 0;
4595       basesink->have_preroll = FALSE;
4596       priv->step_unlock = FALSE;
4597       basesink->need_preroll = TRUE;
4598       basesink->playing_async = TRUE;
4599       basesink->priv->reset_time = FALSE;
4600       priv->current_sstart = GST_CLOCK_TIME_NONE;
4601       priv->current_sstop = GST_CLOCK_TIME_NONE;
4602       priv->eos_rtime = GST_CLOCK_TIME_NONE;
4603       priv->latency = 0;
4604       basesink->eos = FALSE;
4605       priv->received_eos = FALSE;
4606       gst_base_sink_reset_qos (basesink);
4607       priv->commited = FALSE;
4608       priv->call_preroll = TRUE;
4609       priv->current_step.valid = FALSE;
4610       priv->pending_step.valid = FALSE;
4611       if (priv->async_enabled) {
4612         GST_DEBUG_OBJECT (basesink, "doing async state change");
4613         /* when async enabled, post async-start message and return ASYNC from
4614          * the state change function */
4615         ret = GST_STATE_CHANGE_ASYNC;
4616         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4617             gst_message_new_async_start (GST_OBJECT_CAST (basesink)));
4618       } else {
4619         priv->have_latency = TRUE;
4620       }
4621       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4622       break;
4623     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4624       GST_BASE_SINK_PREROLL_LOCK (basesink);
4625       g_atomic_int_set (&basesink->priv->to_playing, TRUE);
4626       if (!gst_base_sink_needs_preroll (basesink)) {
4627         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4628         /* no preroll needed anymore now. */
4629         basesink->playing_async = FALSE;
4630         basesink->need_preroll = FALSE;
4631         if (basesink->eos) {
4632           GstMessage *message;
4633
4634           /* need to post EOS message here */
4635           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4636           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
4637           gst_message_set_seqnum (message, basesink->priv->seqnum);
4638           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
4639         } else {
4640           GST_DEBUG_OBJECT (basesink, "signal preroll");
4641           GST_BASE_SINK_PREROLL_SIGNAL (basesink);
4642         }
4643       } else {
4644         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
4645         basesink->need_preroll = TRUE;
4646         basesink->playing_async = TRUE;
4647         priv->call_preroll = TRUE;
4648         priv->commited = FALSE;
4649         if (priv->async_enabled) {
4650           GST_DEBUG_OBJECT (basesink, "doing async state change");
4651           ret = GST_STATE_CHANGE_ASYNC;
4652           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4653               gst_message_new_async_start (GST_OBJECT_CAST (basesink)));
4654         }
4655       }
4656       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4657       break;
4658     default:
4659       break;
4660   }
4661
4662   {
4663     GstStateChangeReturn bret;
4664
4665     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4666     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
4667       goto activate_failed;
4668   }
4669
4670   switch (transition) {
4671     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4672       /* completed transition, so need not be marked any longer
4673        * And it should be unmarked, since e.g. losing our position upon flush
4674        * does not really change state to PAUSED ... */
4675       g_atomic_int_set (&basesink->priv->to_playing, FALSE);
4676       break;
4677     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4678       g_atomic_int_set (&basesink->priv->to_playing, FALSE);
4679       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
4680       /* FIXME, make sure we cannot enter _render first */
4681
4682       /* we need to call ::unlock before locking PREROLL_LOCK
4683        * since we lock it before going into ::render */
4684       if (bclass->unlock)
4685         bclass->unlock (basesink);
4686
4687       GST_BASE_SINK_PREROLL_LOCK (basesink);
4688       GST_DEBUG_OBJECT (basesink, "got preroll lock");
4689       /* now that we have the PREROLL lock, clear our unlock request */
4690       if (bclass->unlock_stop)
4691         bclass->unlock_stop (basesink);
4692
4693       /* we need preroll again and we set the flag before unlocking the clockid
4694        * because if the clockid is unlocked before a current buffer expired, we
4695        * can use that buffer to preroll with */
4696       basesink->need_preroll = TRUE;
4697
4698       if (basesink->clock_id) {
4699         GST_DEBUG_OBJECT (basesink, "unschedule clock");
4700         gst_clock_id_unschedule (basesink->clock_id);
4701       }
4702
4703       /* if we don't have a preroll buffer we need to wait for a preroll and
4704        * return ASYNC. */
4705       if (!gst_base_sink_needs_preroll (basesink)) {
4706         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
4707         basesink->playing_async = FALSE;
4708       } else {
4709         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
4710           GST_DEBUG_OBJECT (basesink, "element is <= READY");
4711           ret = GST_STATE_CHANGE_SUCCESS;
4712         } else {
4713           GST_DEBUG_OBJECT (basesink,
4714               "PLAYING to PAUSED, we are not prerolled");
4715           basesink->playing_async = TRUE;
4716           priv->commited = FALSE;
4717           priv->call_preroll = TRUE;
4718           if (priv->async_enabled) {
4719             GST_DEBUG_OBJECT (basesink, "doing async state change");
4720             ret = GST_STATE_CHANGE_ASYNC;
4721             gst_element_post_message (GST_ELEMENT_CAST (basesink),
4722                 gst_message_new_async_start (GST_OBJECT_CAST (basesink)));
4723           }
4724         }
4725       }
4726       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
4727           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
4728
4729       gst_base_sink_reset_qos (basesink);
4730       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4731       break;
4732     case GST_STATE_CHANGE_PAUSED_TO_READY:
4733       GST_BASE_SINK_PREROLL_LOCK (basesink);
4734       /* start by resetting our position state with the object lock so that the
4735        * position query gets the right idea. We do this before we post the
4736        * messages so that the message handlers pick this up. */
4737       GST_OBJECT_LOCK (basesink);
4738       basesink->have_newsegment = FALSE;
4739       priv->current_sstart = GST_CLOCK_TIME_NONE;
4740       priv->current_sstop = GST_CLOCK_TIME_NONE;
4741       priv->have_latency = FALSE;
4742       if (priv->cached_clock_id) {
4743         gst_clock_id_unref (priv->cached_clock_id);
4744         priv->cached_clock_id = NULL;
4745       }
4746       gst_caps_replace (&basesink->priv->caps, NULL);
4747       GST_OBJECT_UNLOCK (basesink);
4748
4749       gst_base_sink_set_last_buffer (basesink, NULL);
4750       priv->call_preroll = FALSE;
4751
4752       if (!priv->commited) {
4753         if (priv->async_enabled) {
4754           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
4755
4756           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4757               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
4758                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
4759
4760           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4761               gst_message_new_async_done (GST_OBJECT_CAST (basesink), FALSE));
4762         }
4763         priv->commited = TRUE;
4764       } else {
4765         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
4766       }
4767       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4768       break;
4769     case GST_STATE_CHANGE_READY_TO_NULL:
4770       if (bclass->stop) {
4771         if (!bclass->stop (basesink)) {
4772           GST_WARNING_OBJECT (basesink, "failed to stop");
4773         }
4774       }
4775       gst_base_sink_set_last_buffer (basesink, NULL);
4776       priv->call_preroll = FALSE;
4777       break;
4778     default:
4779       break;
4780   }
4781
4782   return ret;
4783
4784   /* ERRORS */
4785 start_failed:
4786   {
4787     GST_DEBUG_OBJECT (basesink, "failed to start");
4788     return GST_STATE_CHANGE_FAILURE;
4789   }
4790 activate_failed:
4791   {
4792     GST_DEBUG_OBJECT (basesink,
4793         "element failed to change states -- activation problem?");
4794     return GST_STATE_CHANGE_FAILURE;
4795   }
4796 }