Merge remote-tracking branch 'origin/master' into 0.11
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSrc
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its class_init function, like so:
40  * |[
41  * static void
42  * my_element_class_init (GstMyElementClass *klass)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
45  *
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * ]|
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSinkClass.preroll() vmethod with this preroll buffer and will then
59  * commit the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from #GstBaseSinkClass.get_times(). If this
63  * function returns #GST_CLOCK_TIME_NONE for the start time, no synchronisation
64  * will be done. Synchronisation can be disabled entirely by setting the object
65  * #GstBaseSink:sync property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSinkClass.render() will be
68  * called. Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the
71  * #GstBaseSinkClass.render() method are supported as well. These classes
72  * typically receive a buffer in the render method and can then potentially
73  * block on the clock while rendering. A typical example is an audiosink.
74  * Since 0.10.11 these subclasses can use gst_base_sink_wait_preroll() to
75  * perform the blocking wait.
76  *
77  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
78  * for the clock to reach the time indicated by the stop time of the last
79  * #GstBaseSinkClass.get_times() call before posting an EOS message. When the
80  * element receives EOS in PAUSED, preroll completes, the event is queued and an
81  * EOS message is posted when going to PLAYING.
82  *
83  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
84  * synchronisation and clipping of buffers. Buffers that fall completely outside
85  * of the current segment are dropped. Buffers that fall partially in the
86  * segment are rendered (and prerolled). Subclasses should do any subbuffer
87  * clipping themselves when needed.
88  *
89  * #GstBaseSink will by default report the current playback position in
90  * #GST_FORMAT_TIME based on the current clock time and segment information.
91  * If no clock has been set on the element, the query will be forwarded
92  * upstream.
93  *
94  * The #GstBaseSinkClass.set_caps() function will be called when the subclass
95  * should configure itself to process a specific media type.
96  *
97  * The #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() virtual methods
98  * will be called when resources should be allocated. Any 
99  * #GstBaseSinkClass.preroll(), #GstBaseSinkClass.render() and
100  * #GstBaseSinkClass.set_caps() function will be called between the
101  * #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() calls.
102  *
103  * The #GstBaseSinkClass.event() virtual method will be called when an event is
104  * received by #GstBaseSink. Normally this method should only be overriden by
105  * very specific elements (such as file sinks) which need to handle the
106  * newsegment event specially.
107  *
108  * The #GstBaseSinkClass.unlock() method is called when the elements should
109  * unblock any blocking operations they perform in the
110  * #GstBaseSinkClass.render() method. This is mostly useful when the
111  * #GstBaseSinkClass.render() method performs a blocking write on a file
112  * descriptor, for example.
113  *
114  * The #GstBaseSink:max-lateness property affects how the sink deals with
115  * buffers that arrive too late in the sink. A buffer arrives too late in the
116  * sink when the presentation time (as a combination of the last segment, buffer
117  * timestamp and element base_time) plus the duration is before the current
118  * time of the clock.
119  * If the frame is later than max-lateness, the sink will drop the buffer
120  * without calling the render method.
121  * This feature is disabled if sync is disabled, the
122  * #GstBaseSinkClass.get_times() method does not return a valid start time or
123  * max-lateness is set to -1 (the default).
124  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
125  * max-lateness value.
126  *
127  * The #GstBaseSink:qos property will enable the quality-of-service features of
128  * the basesink which gather statistics about the real-time performance of the
129  * clock synchronisation. For each buffer received in the sink, statistics are
130  * gathered and a QOS event is sent upstream with these numbers. This
131  * information can then be used by upstream elements to reduce their processing
132  * rate, for example.
133  *
134  * Since 0.10.15 the #GstBaseSink:async property can be used to instruct the
135  * sink to never perform an ASYNC state change. This feature is mostly usable
136  * when dealing with non-synchronized streams or sparse streams.
137  *
138  * Last reviewed on 2007-08-29 (0.10.15)
139  */
140
141 #ifdef HAVE_CONFIG_H
142 #  include "config.h"
143 #endif
144
145 /* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
146  * with newer GLib versions (>= 2.31.0) */
147 #define GLIB_DISABLE_DEPRECATION_WARNINGS
148 #include <gst/gst_private.h>
149
150 #include "gstbasesink.h"
151 #include <gst/gstmarshal.h>
152 #include <gst/gst-i18n-lib.h>
153
154 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
155 #define GST_CAT_DEFAULT gst_base_sink_debug
156
157 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
158    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
159
160 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
161
162 typedef struct
163 {
164   gboolean valid;               /* if this info is valid */
165   guint32 seqnum;               /* the seqnum of the STEP event */
166   GstFormat format;             /* the format of the amount */
167   guint64 amount;               /* the total amount of data to skip */
168   guint64 position;             /* the position in the stepped data */
169   guint64 duration;             /* the duration in time of the skipped data */
170   guint64 start;                /* running_time of the start */
171   gdouble rate;                 /* rate of skipping */
172   gdouble start_rate;           /* rate before skipping */
173   guint64 start_start;          /* start position skipping */
174   guint64 start_stop;           /* stop position skipping */
175   gboolean flush;               /* if this was a flushing step */
176   gboolean intermediate;        /* if this is an intermediate step */
177   gboolean need_preroll;        /* if we need preroll after this step */
178 } GstStepInfo;
179
180 struct _GstBaseSinkPrivate
181 {
182   gint qos_enabled;             /* ATOMIC */
183   gboolean async_enabled;
184   GstClockTimeDiff ts_offset;
185   GstClockTime render_delay;
186
187   /* start, stop of current buffer, stream time, used to report position */
188   GstClockTime current_sstart;
189   GstClockTime current_sstop;
190
191   /* start, stop and jitter of current buffer, running time */
192   GstClockTime current_rstart;
193   GstClockTime current_rstop;
194   GstClockTimeDiff current_jitter;
195   /* the running time of the previous buffer */
196   GstClockTime prev_rstart;
197
198   /* EOS sync time in running time */
199   GstClockTime eos_rtime;
200
201   /* last buffer that arrived in time, running time */
202   GstClockTime last_render_time;
203   /* when the last buffer left the sink, running time */
204   GstClockTime last_left;
205
206   /* running averages go here these are done on running time */
207   GstClockTime avg_pt;
208   GstClockTime avg_duration;
209   gdouble avg_rate;
210   GstClockTime avg_in_diff;
211
212   /* these are done on system time. avg_jitter and avg_render are
213    * compared to eachother to see if the rendering time takes a
214    * huge amount of the processing, If so we are flooded with
215    * buffers. */
216   GstClockTime last_left_systime;
217   GstClockTime avg_jitter;
218   GstClockTime start, stop;
219   GstClockTime avg_render;
220
221   /* number of rendered and dropped frames */
222   guint64 rendered;
223   guint64 dropped;
224
225   /* latency stuff */
226   GstClockTime latency;
227
228   /* if we already commited the state */
229   gboolean commited;
230   /* state change to playing ongoing */
231   gboolean to_playing;
232
233   /* when we received EOS */
234   gboolean received_eos;
235
236   /* when we are prerolled and able to report latency */
237   gboolean have_latency;
238
239   /* the last buffer we prerolled or rendered. Useful for making snapshots */
240   gint enable_last_sample;      /* atomic */
241   GstBuffer *last_buffer;
242   GstCaps *last_caps;
243
244   /* negotiated caps */
245   GstCaps *caps;
246
247   /* blocksize for pulling */
248   guint blocksize;
249
250   gboolean discont;
251
252   /* seqnum of the stream */
253   guint32 seqnum;
254
255   gboolean call_preroll;
256   gboolean step_unlock;
257
258   /* we have a pending and a current step operation */
259   GstStepInfo current_step;
260   GstStepInfo pending_step;
261
262   /* Cached GstClockID */
263   GstClockID cached_clock_id;
264
265   /* for throttling and QoS */
266   GstClockTime earliest_in_time;
267   GstClockTime throttle_time;
268
269   gboolean reset_time;
270 };
271
272 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
273
274 /* generic running average, this has a neutral window size */
275 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
276
277 /* the windows for these running averages are experimentally obtained.
278  * possitive values get averaged more while negative values use a small
279  * window so we can react faster to badness. */
280 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
281 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
282
283 /* BaseSink properties */
284
285 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
286 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
287
288 #define DEFAULT_SYNC                TRUE
289 #define DEFAULT_MAX_LATENESS        -1
290 #define DEFAULT_QOS                 FALSE
291 #define DEFAULT_ASYNC               TRUE
292 #define DEFAULT_TS_OFFSET           0
293 #define DEFAULT_BLOCKSIZE           4096
294 #define DEFAULT_RENDER_DELAY        0
295 #define DEFAULT_ENABLE_LAST_SAMPLE  TRUE
296 #define DEFAULT_THROTTLE_TIME       0
297
298 enum
299 {
300   PROP_0,
301   PROP_SYNC,
302   PROP_MAX_LATENESS,
303   PROP_QOS,
304   PROP_ASYNC,
305   PROP_TS_OFFSET,
306   PROP_ENABLE_LAST_SAMPLE,
307   PROP_LAST_SAMPLE,
308   PROP_BLOCKSIZE,
309   PROP_RENDER_DELAY,
310   PROP_THROTTLE_TIME,
311   PROP_LAST
312 };
313
314 static GstElementClass *parent_class = NULL;
315
316 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
317 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
318 static void gst_base_sink_finalize (GObject * object);
319
320 GType
321 gst_base_sink_get_type (void)
322 {
323   static volatile gsize base_sink_type = 0;
324
325   if (g_once_init_enter (&base_sink_type)) {
326     GType _type;
327     static const GTypeInfo base_sink_info = {
328       sizeof (GstBaseSinkClass),
329       NULL,
330       NULL,
331       (GClassInitFunc) gst_base_sink_class_init,
332       NULL,
333       NULL,
334       sizeof (GstBaseSink),
335       0,
336       (GInstanceInitFunc) gst_base_sink_init,
337     };
338
339     _type = g_type_register_static (GST_TYPE_ELEMENT,
340         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
341     g_once_init_leave (&base_sink_type, _type);
342   }
343   return base_sink_type;
344 }
345
346 static void gst_base_sink_set_property (GObject * object, guint prop_id,
347     const GValue * value, GParamSpec * pspec);
348 static void gst_base_sink_get_property (GObject * object, guint prop_id,
349     GValue * value, GParamSpec * pspec);
350
351 static gboolean gst_base_sink_send_event (GstElement * element,
352     GstEvent * event);
353 static gboolean default_element_query (GstElement * element, GstQuery * query);
354
355 static GstCaps *gst_base_sink_default_get_caps (GstBaseSink * sink,
356     GstCaps * caps);
357 static gboolean gst_base_sink_default_set_caps (GstBaseSink * sink,
358     GstCaps * caps);
359 static void gst_base_sink_default_get_times (GstBaseSink * basesink,
360     GstBuffer * buffer, GstClockTime * start, GstClockTime * end);
361 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
362     GstPad * pad, gboolean flushing);
363 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
364     gboolean active);
365 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
366     GstSegment * segment);
367 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
368     GstEvent * event, GstSegment * segment);
369
370 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
371     GstStateChange transition);
372
373 static gboolean gst_base_sink_sink_query (GstPad * pad, GstObject * parent,
374     GstQuery * query);
375 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstObject * parent,
376     GstBuffer * buffer);
377 static GstFlowReturn gst_base_sink_chain_list (GstPad * pad, GstObject * parent,
378     GstBufferList * list);
379
380 static void gst_base_sink_loop (GstPad * pad);
381 static gboolean gst_base_sink_pad_activate (GstPad * pad, GstObject * parent);
382 static gboolean gst_base_sink_pad_activate_mode (GstPad * pad,
383     GstObject * parent, GstPadMode mode, gboolean active);
384 static gboolean gst_base_sink_default_event (GstBaseSink * basesink,
385     GstEvent * event);
386 static GstFlowReturn gst_base_sink_default_wait_eos (GstBaseSink * basesink,
387     GstEvent * event);
388 static gboolean gst_base_sink_event (GstPad * pad, GstObject * parent,
389     GstEvent * event);
390
391 static gboolean gst_base_sink_default_query (GstBaseSink * sink,
392     GstQuery * query);
393
394 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
395 static void gst_base_sink_default_fixate (GstBaseSink * bsink, GstCaps * caps);
396 static void gst_base_sink_fixate (GstBaseSink * bsink, GstCaps * caps);
397
398 /* check if an object was too late */
399 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
400     GstMiniObject * obj, GstClockTime rstart, GstClockTime rstop,
401     GstClockReturn status, GstClockTimeDiff jitter);
402
403 static void
404 gst_base_sink_class_init (GstBaseSinkClass * klass)
405 {
406   GObjectClass *gobject_class;
407   GstElementClass *gstelement_class;
408
409   gobject_class = G_OBJECT_CLASS (klass);
410   gstelement_class = GST_ELEMENT_CLASS (klass);
411
412   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
413       "basesink element");
414
415   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
416
417   parent_class = g_type_class_peek_parent (klass);
418
419   gobject_class->finalize = gst_base_sink_finalize;
420   gobject_class->set_property = gst_base_sink_set_property;
421   gobject_class->get_property = gst_base_sink_get_property;
422
423   g_object_class_install_property (gobject_class, PROP_SYNC,
424       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
425           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
426
427   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
428       g_param_spec_int64 ("max-lateness", "Max Lateness",
429           "Maximum number of nanoseconds that a buffer can be late before it "
430           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
431           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
432
433   g_object_class_install_property (gobject_class, PROP_QOS,
434       g_param_spec_boolean ("qos", "Qos",
435           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
436           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
437   /**
438    * GstBaseSink:async
439    *
440    * If set to #TRUE, the basesink will perform asynchronous state changes.
441    * When set to #FALSE, the sink will not signal the parent when it prerolls.
442    * Use this option when dealing with sparse streams or when synchronisation is
443    * not required.
444    *
445    * Since: 0.10.15
446    */
447   g_object_class_install_property (gobject_class, PROP_ASYNC,
448       g_param_spec_boolean ("async", "Async",
449           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
450           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
451   /**
452    * GstBaseSink:ts-offset
453    *
454    * Controls the final synchronisation, a negative value will render the buffer
455    * earlier while a positive value delays playback. This property can be
456    * used to fix synchronisation in bad files.
457    *
458    * Since: 0.10.15
459    */
460   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
461       g_param_spec_int64 ("ts-offset", "TS Offset",
462           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
463           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
464
465   /**
466    * GstBaseSink:enable-last-sample
467    *
468    * Enable the last-sample property. If FALSE, basesink doesn't keep a
469    * reference to the last buffer arrived and the last-sample property is always
470    * set to NULL. This can be useful if you need buffers to be released as soon
471    * as possible, eg. if you're using a buffer pool.
472    *
473    * Since: 0.10.30
474    */
475   g_object_class_install_property (gobject_class, PROP_ENABLE_LAST_SAMPLE,
476       g_param_spec_boolean ("enable-last-sample", "Enable Last Buffer",
477           "Enable the last-sample property", DEFAULT_ENABLE_LAST_SAMPLE,
478           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
479
480   /**
481    * GstBaseSink:last-sample
482    *
483    * The last buffer that arrived in the sink and was used for preroll or for
484    * rendering. This property can be used to generate thumbnails. This property
485    * can be NULL when the sink has not yet received a bufer.
486    *
487    * Since: 0.10.15
488    */
489   g_object_class_install_property (gobject_class, PROP_LAST_SAMPLE,
490       g_param_spec_boxed ("last-sample", "Last Sample",
491           "The last sample received in the sink", GST_TYPE_SAMPLE,
492           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
493   /**
494    * GstBaseSink:blocksize
495    *
496    * The amount of bytes to pull when operating in pull mode.
497    *
498    * Since: 0.10.22
499    */
500   /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
501   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
502       g_param_spec_uint ("blocksize", "Block size",
503           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
504           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
505   /**
506    * GstBaseSink:render-delay
507    *
508    * The additional delay between synchronisation and actual rendering of the
509    * media. This property will add additional latency to the device in order to
510    * make other sinks compensate for the delay.
511    *
512    * Since: 0.10.22
513    */
514   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
515       g_param_spec_uint64 ("render-delay", "Render Delay",
516           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
517           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
518   /**
519    * GstBaseSink:throttle-time
520    *
521    * The time to insert between buffers. This property can be used to control
522    * the maximum amount of buffers per second to render. Setting this property
523    * to a value bigger than 0 will make the sink create THROTTLE QoS events.
524    *
525    * Since: 0.10.33
526    */
527   g_object_class_install_property (gobject_class, PROP_THROTTLE_TIME,
528       g_param_spec_uint64 ("throttle-time", "Throttle time",
529           "The time to keep between rendered buffers (unused)", 0, G_MAXUINT64,
530           DEFAULT_THROTTLE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
531
532   gstelement_class->change_state =
533       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
534   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
535   gstelement_class->query = GST_DEBUG_FUNCPTR (default_element_query);
536
537   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_default_get_caps);
538   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_default_set_caps);
539   klass->fixate = GST_DEBUG_FUNCPTR (gst_base_sink_default_fixate);
540   klass->activate_pull =
541       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
542   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_default_get_times);
543   klass->query = GST_DEBUG_FUNCPTR (gst_base_sink_default_query);
544   klass->event = GST_DEBUG_FUNCPTR (gst_base_sink_default_event);
545   klass->wait_eos = GST_DEBUG_FUNCPTR (gst_base_sink_default_wait_eos);
546
547   /* Registering debug symbols for function pointers */
548   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_fixate);
549   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate);
550   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_mode);
551   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_event);
552   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain);
553   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain_list);
554   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_sink_query);
555 }
556
557 static GstCaps *
558 gst_base_sink_query_caps (GstBaseSink * bsink, GstPad * pad, GstCaps * filter)
559 {
560   GstBaseSinkClass *bclass;
561   GstCaps *caps = NULL;
562   gboolean fixed;
563
564   bclass = GST_BASE_SINK_GET_CLASS (bsink);
565   fixed = GST_PAD_IS_FIXED_CAPS (pad);
566
567   if (fixed || bsink->pad_mode == GST_PAD_MODE_PULL) {
568     /* if we are operating in pull mode or fixed caps, we only accept the
569      * currently negotiated caps */
570     caps = gst_pad_get_current_caps (pad);
571   }
572   if (caps == NULL) {
573     if (bclass->get_caps)
574       caps = bclass->get_caps (bsink, filter);
575
576     if (caps == NULL) {
577       GstPadTemplate *pad_template;
578
579       pad_template =
580           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
581           "sink");
582       if (pad_template != NULL) {
583         caps = gst_pad_template_get_caps (pad_template);
584
585         if (filter) {
586           GstCaps *intersection;
587
588           intersection =
589               gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
590           gst_caps_unref (caps);
591           caps = intersection;
592         }
593       }
594     }
595   }
596
597   return caps;
598 }
599
600 static void
601 gst_base_sink_default_fixate (GstBaseSink * bsink, GstCaps * caps)
602 {
603   GST_DEBUG_OBJECT (bsink, "using default caps fixate function");
604   gst_caps_fixate (caps);
605 }
606
607 static void
608 gst_base_sink_fixate (GstBaseSink * bsink, GstCaps * caps)
609 {
610   GstBaseSinkClass *bclass;
611
612   bclass = GST_BASE_SINK_GET_CLASS (bsink);
613
614   if (bclass->fixate)
615     bclass->fixate (bsink, caps);
616 }
617
618 static void
619 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
620 {
621   GstPadTemplate *pad_template;
622   GstBaseSinkPrivate *priv;
623
624   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
625
626   pad_template =
627       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
628   g_return_if_fail (pad_template != NULL);
629
630   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
631
632   gst_pad_set_activate_function (basesink->sinkpad, gst_base_sink_pad_activate);
633   gst_pad_set_activatemode_function (basesink->sinkpad,
634       gst_base_sink_pad_activate_mode);
635   gst_pad_set_query_function (basesink->sinkpad, gst_base_sink_sink_query);
636   gst_pad_set_event_function (basesink->sinkpad, gst_base_sink_event);
637   gst_pad_set_chain_function (basesink->sinkpad, gst_base_sink_chain);
638   gst_pad_set_chain_list_function (basesink->sinkpad, gst_base_sink_chain_list);
639   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
640
641   basesink->pad_mode = GST_PAD_MODE_NONE;
642   basesink->preroll_lock = g_mutex_new ();
643   basesink->preroll_cond = g_cond_new ();
644   priv->have_latency = FALSE;
645
646   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
647   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
648
649   basesink->sync = DEFAULT_SYNC;
650   basesink->max_lateness = DEFAULT_MAX_LATENESS;
651   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
652   priv->async_enabled = DEFAULT_ASYNC;
653   priv->ts_offset = DEFAULT_TS_OFFSET;
654   priv->render_delay = DEFAULT_RENDER_DELAY;
655   priv->blocksize = DEFAULT_BLOCKSIZE;
656   priv->cached_clock_id = NULL;
657   g_atomic_int_set (&priv->enable_last_sample, DEFAULT_ENABLE_LAST_SAMPLE);
658   priv->throttle_time = DEFAULT_THROTTLE_TIME;
659
660   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_FLAG_SINK);
661 }
662
663 static void
664 gst_base_sink_finalize (GObject * object)
665 {
666   GstBaseSink *basesink;
667
668   basesink = GST_BASE_SINK (object);
669
670   g_mutex_free (basesink->preroll_lock);
671   g_cond_free (basesink->preroll_cond);
672
673   G_OBJECT_CLASS (parent_class)->finalize (object);
674 }
675
676 /**
677  * gst_base_sink_set_sync:
678  * @sink: the sink
679  * @sync: the new sync value.
680  *
681  * Configures @sink to synchronize on the clock or not. When
682  * @sync is FALSE, incoming samples will be played as fast as
683  * possible. If @sync is TRUE, the timestamps of the incomming
684  * buffers will be used to schedule the exact render time of its
685  * contents.
686  *
687  * Since: 0.10.4
688  */
689 void
690 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
691 {
692   g_return_if_fail (GST_IS_BASE_SINK (sink));
693
694   GST_OBJECT_LOCK (sink);
695   sink->sync = sync;
696   GST_OBJECT_UNLOCK (sink);
697 }
698
699 /**
700  * gst_base_sink_get_sync:
701  * @sink: the sink
702  *
703  * Checks if @sink is currently configured to synchronize against the
704  * clock.
705  *
706  * Returns: TRUE if the sink is configured to synchronize against the clock.
707  *
708  * Since: 0.10.4
709  */
710 gboolean
711 gst_base_sink_get_sync (GstBaseSink * sink)
712 {
713   gboolean res;
714
715   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
716
717   GST_OBJECT_LOCK (sink);
718   res = sink->sync;
719   GST_OBJECT_UNLOCK (sink);
720
721   return res;
722 }
723
724 /**
725  * gst_base_sink_set_max_lateness:
726  * @sink: the sink
727  * @max_lateness: the new max lateness value.
728  *
729  * Sets the new max lateness value to @max_lateness. This value is
730  * used to decide if a buffer should be dropped or not based on the
731  * buffer timestamp and the current clock time. A value of -1 means
732  * an unlimited time.
733  *
734  * Since: 0.10.4
735  */
736 void
737 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
738 {
739   g_return_if_fail (GST_IS_BASE_SINK (sink));
740
741   GST_OBJECT_LOCK (sink);
742   sink->max_lateness = max_lateness;
743   GST_OBJECT_UNLOCK (sink);
744 }
745
746 /**
747  * gst_base_sink_get_max_lateness:
748  * @sink: the sink
749  *
750  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
751  * more details.
752  *
753  * Returns: The maximum time in nanoseconds that a buffer can be late
754  * before it is dropped and not rendered. A value of -1 means an
755  * unlimited time.
756  *
757  * Since: 0.10.4
758  */
759 gint64
760 gst_base_sink_get_max_lateness (GstBaseSink * sink)
761 {
762   gint64 res;
763
764   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
765
766   GST_OBJECT_LOCK (sink);
767   res = sink->max_lateness;
768   GST_OBJECT_UNLOCK (sink);
769
770   return res;
771 }
772
773 /**
774  * gst_base_sink_set_qos_enabled:
775  * @sink: the sink
776  * @enabled: the new qos value.
777  *
778  * Configures @sink to send Quality-of-Service events upstream.
779  *
780  * Since: 0.10.5
781  */
782 void
783 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
784 {
785   g_return_if_fail (GST_IS_BASE_SINK (sink));
786
787   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
788 }
789
790 /**
791  * gst_base_sink_is_qos_enabled:
792  * @sink: the sink
793  *
794  * Checks if @sink is currently configured to send Quality-of-Service events
795  * upstream.
796  *
797  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
798  *
799  * Since: 0.10.5
800  */
801 gboolean
802 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
803 {
804   gboolean res;
805
806   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
807
808   res = g_atomic_int_get (&sink->priv->qos_enabled);
809
810   return res;
811 }
812
813 /**
814  * gst_base_sink_set_async_enabled:
815  * @sink: the sink
816  * @enabled: the new async value.
817  *
818  * Configures @sink to perform all state changes asynchronusly. When async is
819  * disabled, the sink will immediately go to PAUSED instead of waiting for a
820  * preroll buffer. This feature is useful if the sink does not synchronize
821  * against the clock or when it is dealing with sparse streams.
822  *
823  * Since: 0.10.15
824  */
825 void
826 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
827 {
828   g_return_if_fail (GST_IS_BASE_SINK (sink));
829
830   GST_BASE_SINK_PREROLL_LOCK (sink);
831   g_atomic_int_set (&sink->priv->async_enabled, enabled);
832   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
833   GST_BASE_SINK_PREROLL_UNLOCK (sink);
834 }
835
836 /**
837  * gst_base_sink_is_async_enabled:
838  * @sink: the sink
839  *
840  * Checks if @sink is currently configured to perform asynchronous state
841  * changes to PAUSED.
842  *
843  * Returns: TRUE if the sink is configured to perform asynchronous state
844  * changes.
845  *
846  * Since: 0.10.15
847  */
848 gboolean
849 gst_base_sink_is_async_enabled (GstBaseSink * sink)
850 {
851   gboolean res;
852
853   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
854
855   res = g_atomic_int_get (&sink->priv->async_enabled);
856
857   return res;
858 }
859
860 /**
861  * gst_base_sink_set_ts_offset:
862  * @sink: the sink
863  * @offset: the new offset
864  *
865  * Adjust the synchronisation of @sink with @offset. A negative value will
866  * render buffers earlier than their timestamp. A positive value will delay
867  * rendering. This function can be used to fix playback of badly timestamped
868  * buffers.
869  *
870  * Since: 0.10.15
871  */
872 void
873 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
874 {
875   g_return_if_fail (GST_IS_BASE_SINK (sink));
876
877   GST_OBJECT_LOCK (sink);
878   sink->priv->ts_offset = offset;
879   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
880   GST_OBJECT_UNLOCK (sink);
881 }
882
883 /**
884  * gst_base_sink_get_ts_offset:
885  * @sink: the sink
886  *
887  * Get the synchronisation offset of @sink.
888  *
889  * Returns: The synchronisation offset.
890  *
891  * Since: 0.10.15
892  */
893 GstClockTimeDiff
894 gst_base_sink_get_ts_offset (GstBaseSink * sink)
895 {
896   GstClockTimeDiff res;
897
898   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
899
900   GST_OBJECT_LOCK (sink);
901   res = sink->priv->ts_offset;
902   GST_OBJECT_UNLOCK (sink);
903
904   return res;
905 }
906
907 /**
908  * gst_base_sink_get_last_sample:
909  * @sink: the sink
910  *
911  * Get the last sample that arrived in the sink and was used for preroll or for
912  * rendering. This property can be used to generate thumbnails.
913  *
914  * The #GstCaps on the sample can be used to determine the type of the buffer.
915  *
916  * Free-function: gst_sample_unref
917  *
918  * Returns: (transfer full): a #GstSample. gst_sample_unref() after usage.
919  *     This function returns NULL when no buffer has arrived in the sink yet
920  *     or when the sink is not in PAUSED or PLAYING.
921  *
922  * Since: 0.10.15
923  */
924 GstSample *
925 gst_base_sink_get_last_sample (GstBaseSink * sink)
926 {
927   GstSample *res = NULL;
928
929   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
930
931   GST_OBJECT_LOCK (sink);
932   if (sink->priv->last_buffer) {
933     res = gst_sample_new (sink->priv->last_buffer,
934         sink->priv->last_caps, &sink->segment, NULL);
935   }
936   GST_OBJECT_UNLOCK (sink);
937
938   return res;
939 }
940
941 /* with OBJECT_LOCK */
942 static void
943 gst_base_sink_set_last_buffer_unlocked (GstBaseSink * sink, GstBuffer * buffer)
944 {
945   GstBuffer *old;
946
947   old = sink->priv->last_buffer;
948   if (G_LIKELY (old != buffer)) {
949     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
950     if (G_LIKELY (buffer))
951       gst_buffer_ref (buffer);
952     sink->priv->last_buffer = buffer;
953     if (buffer)
954       /* copy over the caps */
955       gst_caps_replace (&sink->priv->last_caps, sink->priv->caps);
956     else
957       gst_caps_replace (&sink->priv->last_caps, NULL);
958   } else {
959     old = NULL;
960   }
961   /* avoid unreffing with the lock because cleanup code might want to take the
962    * lock too */
963   if (G_LIKELY (old)) {
964     GST_OBJECT_UNLOCK (sink);
965     gst_buffer_unref (old);
966     GST_OBJECT_LOCK (sink);
967   }
968 }
969
970 static void
971 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
972 {
973   if (!g_atomic_int_get (&sink->priv->enable_last_sample))
974     return;
975
976   GST_OBJECT_LOCK (sink);
977   gst_base_sink_set_last_buffer_unlocked (sink, buffer);
978   GST_OBJECT_UNLOCK (sink);
979 }
980
981 /**
982  * gst_base_sink_set_last_sample_enabled:
983  * @sink: the sink
984  * @enabled: the new enable-last-sample value.
985  *
986  * Configures @sink to store the last received sample in the last-sample
987  * property.
988  *
989  * Since: 0.10.30
990  */
991 void
992 gst_base_sink_set_last_sample_enabled (GstBaseSink * sink, gboolean enabled)
993 {
994   g_return_if_fail (GST_IS_BASE_SINK (sink));
995
996   /* Only take lock if we change the value */
997   if (g_atomic_int_compare_and_exchange (&sink->priv->enable_last_sample,
998           !enabled, enabled) && !enabled) {
999     GST_OBJECT_LOCK (sink);
1000     gst_base_sink_set_last_buffer_unlocked (sink, NULL);
1001     GST_OBJECT_UNLOCK (sink);
1002   }
1003 }
1004
1005 /**
1006  * gst_base_sink_is_last_sample_enabled:
1007  * @sink: the sink
1008  *
1009  * Checks if @sink is currently configured to store the last received sample in
1010  * the last-sample property.
1011  *
1012  * Returns: TRUE if the sink is configured to store the last received sample.
1013  *
1014  * Since: 0.10.30
1015  */
1016 gboolean
1017 gst_base_sink_is_last_sample_enabled (GstBaseSink * sink)
1018 {
1019   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
1020
1021   return g_atomic_int_get (&sink->priv->enable_last_sample);
1022 }
1023
1024 /**
1025  * gst_base_sink_get_latency:
1026  * @sink: the sink
1027  *
1028  * Get the currently configured latency.
1029  *
1030  * Returns: The configured latency.
1031  *
1032  * Since: 0.10.12
1033  */
1034 GstClockTime
1035 gst_base_sink_get_latency (GstBaseSink * sink)
1036 {
1037   GstClockTime res;
1038
1039   GST_OBJECT_LOCK (sink);
1040   res = sink->priv->latency;
1041   GST_OBJECT_UNLOCK (sink);
1042
1043   return res;
1044 }
1045
1046 /**
1047  * gst_base_sink_query_latency:
1048  * @sink: the sink
1049  * @live: (out) (allow-none): if the sink is live
1050  * @upstream_live: (out) (allow-none): if an upstream element is live
1051  * @min_latency: (out) (allow-none): the min latency of the upstream elements
1052  * @max_latency: (out) (allow-none): the max latency of the upstream elements
1053  *
1054  * Query the sink for the latency parameters. The latency will be queried from
1055  * the upstream elements. @live will be TRUE if @sink is configured to
1056  * synchronize against the clock. @upstream_live will be TRUE if an upstream
1057  * element is live.
1058  *
1059  * If both @live and @upstream_live are TRUE, the sink will want to compensate
1060  * for the latency introduced by the upstream elements by setting the
1061  * @min_latency to a strictly possitive value.
1062  *
1063  * This function is mostly used by subclasses.
1064  *
1065  * Returns: TRUE if the query succeeded.
1066  *
1067  * Since: 0.10.12
1068  */
1069 gboolean
1070 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
1071     gboolean * upstream_live, GstClockTime * min_latency,
1072     GstClockTime * max_latency)
1073 {
1074   gboolean l, us_live, res, have_latency;
1075   GstClockTime min, max, render_delay;
1076   GstQuery *query;
1077   GstClockTime us_min, us_max;
1078
1079   /* we are live when we sync to the clock */
1080   GST_OBJECT_LOCK (sink);
1081   l = sink->sync;
1082   have_latency = sink->priv->have_latency;
1083   render_delay = sink->priv->render_delay;
1084   GST_OBJECT_UNLOCK (sink);
1085
1086   /* assume no latency */
1087   min = 0;
1088   max = -1;
1089   us_live = FALSE;
1090
1091   if (have_latency) {
1092     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1093     /* we are ready for a latency query this is when we preroll or when we are
1094      * not async. */
1095     query = gst_query_new_latency ();
1096
1097     /* ask the peer for the latency */
1098     if ((res = gst_pad_peer_query (sink->sinkpad, query))) {
1099       /* get upstream min and max latency */
1100       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1101
1102       if (us_live) {
1103         /* upstream live, use its latency, subclasses should use these
1104          * values to create the complete latency. */
1105         min = us_min;
1106         max = us_max;
1107       }
1108       if (l) {
1109         /* we need to add the render delay if we are live */
1110         if (min != -1)
1111           min += render_delay;
1112         if (max != -1)
1113           max += render_delay;
1114       }
1115     }
1116     gst_query_unref (query);
1117   } else {
1118     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1119     res = FALSE;
1120   }
1121
1122   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1123   if (!res) {
1124     if (!l) {
1125       res = TRUE;
1126       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1127     } else {
1128       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1129     }
1130   }
1131
1132   if (res) {
1133     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1134         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1135         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1136
1137     if (live)
1138       *live = l;
1139     if (upstream_live)
1140       *upstream_live = us_live;
1141     if (min_latency)
1142       *min_latency = min;
1143     if (max_latency)
1144       *max_latency = max;
1145   }
1146   return res;
1147 }
1148
1149 /**
1150  * gst_base_sink_set_render_delay:
1151  * @sink: a #GstBaseSink
1152  * @delay: the new delay
1153  *
1154  * Set the render delay in @sink to @delay. The render delay is the time
1155  * between actual rendering of a buffer and its synchronisation time. Some
1156  * devices might delay media rendering which can be compensated for with this
1157  * function.
1158  *
1159  * After calling this function, this sink will report additional latency and
1160  * other sinks will adjust their latency to delay the rendering of their media.
1161  *
1162  * This function is usually called by subclasses.
1163  *
1164  * Since: 0.10.21
1165  */
1166 void
1167 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1168 {
1169   GstClockTime old_render_delay;
1170
1171   g_return_if_fail (GST_IS_BASE_SINK (sink));
1172
1173   GST_OBJECT_LOCK (sink);
1174   old_render_delay = sink->priv->render_delay;
1175   sink->priv->render_delay = delay;
1176   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1177       GST_TIME_ARGS (delay));
1178   GST_OBJECT_UNLOCK (sink);
1179
1180   if (delay != old_render_delay) {
1181     GST_DEBUG_OBJECT (sink, "posting latency changed");
1182     gst_element_post_message (GST_ELEMENT_CAST (sink),
1183         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1184   }
1185 }
1186
1187 /**
1188  * gst_base_sink_get_render_delay:
1189  * @sink: a #GstBaseSink
1190  *
1191  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1192  * information about the render delay.
1193  *
1194  * Returns: the render delay of @sink.
1195  *
1196  * Since: 0.10.21
1197  */
1198 GstClockTime
1199 gst_base_sink_get_render_delay (GstBaseSink * sink)
1200 {
1201   GstClockTimeDiff res;
1202
1203   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1204
1205   GST_OBJECT_LOCK (sink);
1206   res = sink->priv->render_delay;
1207   GST_OBJECT_UNLOCK (sink);
1208
1209   return res;
1210 }
1211
1212 /**
1213  * gst_base_sink_set_blocksize:
1214  * @sink: a #GstBaseSink
1215  * @blocksize: the blocksize in bytes
1216  *
1217  * Set the number of bytes that the sink will pull when it is operating in pull
1218  * mode.
1219  *
1220  * Since: 0.10.22
1221  */
1222 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1223 void
1224 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1225 {
1226   g_return_if_fail (GST_IS_BASE_SINK (sink));
1227
1228   GST_OBJECT_LOCK (sink);
1229   sink->priv->blocksize = blocksize;
1230   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1231   GST_OBJECT_UNLOCK (sink);
1232 }
1233
1234 /**
1235  * gst_base_sink_get_blocksize:
1236  * @sink: a #GstBaseSink
1237  *
1238  * Get the number of bytes that the sink will pull when it is operating in pull
1239  * mode.
1240  *
1241  * Returns: the number of bytes @sink will pull in pull mode.
1242  *
1243  * Since: 0.10.22
1244  */
1245 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1246 guint
1247 gst_base_sink_get_blocksize (GstBaseSink * sink)
1248 {
1249   guint res;
1250
1251   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1252
1253   GST_OBJECT_LOCK (sink);
1254   res = sink->priv->blocksize;
1255   GST_OBJECT_UNLOCK (sink);
1256
1257   return res;
1258 }
1259
1260 /**
1261  * gst_base_sink_set_throttle_time:
1262  * @sink: a #GstBaseSink
1263  * @throttle: the throttle time in nanoseconds
1264  *
1265  * Set the time that will be inserted between rendered buffers. This
1266  * can be used to control the maximum buffers per second that the sink
1267  * will render. 
1268  *
1269  * Since: 0.10.33
1270  */
1271 void
1272 gst_base_sink_set_throttle_time (GstBaseSink * sink, guint64 throttle)
1273 {
1274   g_return_if_fail (GST_IS_BASE_SINK (sink));
1275
1276   GST_OBJECT_LOCK (sink);
1277   sink->priv->throttle_time = throttle;
1278   GST_LOG_OBJECT (sink, "set throttle_time to %" G_GUINT64_FORMAT, throttle);
1279   GST_OBJECT_UNLOCK (sink);
1280 }
1281
1282 /**
1283  * gst_base_sink_get_throttle_time:
1284  * @sink: a #GstBaseSink
1285  *
1286  * Get the time that will be inserted between frames to control the 
1287  * maximum buffers per second.
1288  *
1289  * Returns: the number of nanoseconds @sink will put between frames.
1290  *
1291  * Since: 0.10.33
1292  */
1293 guint64
1294 gst_base_sink_get_throttle_time (GstBaseSink * sink)
1295 {
1296   guint64 res;
1297
1298   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1299
1300   GST_OBJECT_LOCK (sink);
1301   res = sink->priv->throttle_time;
1302   GST_OBJECT_UNLOCK (sink);
1303
1304   return res;
1305 }
1306
1307 static void
1308 gst_base_sink_set_property (GObject * object, guint prop_id,
1309     const GValue * value, GParamSpec * pspec)
1310 {
1311   GstBaseSink *sink = GST_BASE_SINK (object);
1312
1313   switch (prop_id) {
1314     case PROP_SYNC:
1315       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1316       break;
1317     case PROP_MAX_LATENESS:
1318       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1319       break;
1320     case PROP_QOS:
1321       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1322       break;
1323     case PROP_ASYNC:
1324       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1325       break;
1326     case PROP_TS_OFFSET:
1327       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1328       break;
1329     case PROP_BLOCKSIZE:
1330       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1331       break;
1332     case PROP_RENDER_DELAY:
1333       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1334       break;
1335     case PROP_ENABLE_LAST_SAMPLE:
1336       gst_base_sink_set_last_sample_enabled (sink, g_value_get_boolean (value));
1337       break;
1338     case PROP_THROTTLE_TIME:
1339       gst_base_sink_set_throttle_time (sink, g_value_get_uint64 (value));
1340       break;
1341     default:
1342       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1343       break;
1344   }
1345 }
1346
1347 static void
1348 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1349     GParamSpec * pspec)
1350 {
1351   GstBaseSink *sink = GST_BASE_SINK (object);
1352
1353   switch (prop_id) {
1354     case PROP_SYNC:
1355       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1356       break;
1357     case PROP_MAX_LATENESS:
1358       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1359       break;
1360     case PROP_QOS:
1361       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1362       break;
1363     case PROP_ASYNC:
1364       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1365       break;
1366     case PROP_TS_OFFSET:
1367       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1368       break;
1369     case PROP_LAST_SAMPLE:
1370       gst_value_take_buffer (value, gst_base_sink_get_last_sample (sink));
1371       break;
1372     case PROP_ENABLE_LAST_SAMPLE:
1373       g_value_set_boolean (value, gst_base_sink_is_last_sample_enabled (sink));
1374       break;
1375     case PROP_BLOCKSIZE:
1376       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1377       break;
1378     case PROP_RENDER_DELAY:
1379       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1380       break;
1381     case PROP_THROTTLE_TIME:
1382       g_value_set_uint64 (value, gst_base_sink_get_throttle_time (sink));
1383       break;
1384     default:
1385       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1386       break;
1387   }
1388 }
1389
1390
1391 static GstCaps *
1392 gst_base_sink_default_get_caps (GstBaseSink * sink, GstCaps * filter)
1393 {
1394   return NULL;
1395 }
1396
1397 static gboolean
1398 gst_base_sink_default_set_caps (GstBaseSink * sink, GstCaps * caps)
1399 {
1400   return TRUE;
1401 }
1402
1403 /* with PREROLL_LOCK, STREAM_LOCK */
1404 static gboolean
1405 gst_base_sink_commit_state (GstBaseSink * basesink)
1406 {
1407   /* commit state and proceed to next pending state */
1408   GstState current, next, pending, post_pending;
1409   gboolean post_paused = FALSE;
1410   gboolean post_async_done = FALSE;
1411   gboolean post_playing = FALSE;
1412   gboolean reset_time;
1413
1414   /* we are certainly not playing async anymore now */
1415   basesink->playing_async = FALSE;
1416
1417   GST_OBJECT_LOCK (basesink);
1418   current = GST_STATE (basesink);
1419   next = GST_STATE_NEXT (basesink);
1420   pending = GST_STATE_PENDING (basesink);
1421   post_pending = pending;
1422   reset_time = basesink->priv->reset_time;
1423   basesink->priv->reset_time = FALSE;
1424
1425   switch (pending) {
1426     case GST_STATE_PLAYING:
1427     {
1428       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1429
1430       basesink->need_preroll = FALSE;
1431       post_async_done = TRUE;
1432       basesink->priv->commited = TRUE;
1433       post_playing = TRUE;
1434       /* post PAUSED too when we were READY */
1435       if (current == GST_STATE_READY) {
1436         post_paused = TRUE;
1437       }
1438       break;
1439     }
1440     case GST_STATE_PAUSED:
1441       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1442       post_paused = TRUE;
1443       post_async_done = TRUE;
1444       basesink->priv->commited = TRUE;
1445       post_pending = GST_STATE_VOID_PENDING;
1446       break;
1447     case GST_STATE_READY:
1448     case GST_STATE_NULL:
1449       goto stopping;
1450     case GST_STATE_VOID_PENDING:
1451       goto nothing_pending;
1452     default:
1453       break;
1454   }
1455
1456   /* we can report latency queries now */
1457   basesink->priv->have_latency = TRUE;
1458
1459   GST_STATE (basesink) = pending;
1460   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1461   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1462   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1463   GST_OBJECT_UNLOCK (basesink);
1464
1465   if (post_paused) {
1466     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1467     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1468         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1469             current, next, post_pending));
1470   }
1471   if (post_async_done) {
1472     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1473     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1474         gst_message_new_async_done (GST_OBJECT_CAST (basesink), reset_time));
1475   }
1476   if (post_playing) {
1477     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1478     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1479         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1480             next, pending, GST_STATE_VOID_PENDING));
1481   }
1482
1483   GST_STATE_BROADCAST (basesink);
1484
1485   return TRUE;
1486
1487 nothing_pending:
1488   {
1489     /* Depending on the state, set our vars. We get in this situation when the
1490      * state change function got a change to update the state vars before the
1491      * streaming thread did. This is fine but we need to make sure that we
1492      * update the need_preroll var since it was TRUE when we got here and might
1493      * become FALSE if we got to PLAYING. */
1494     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1495         gst_element_state_get_name (current));
1496     switch (current) {
1497       case GST_STATE_PLAYING:
1498         basesink->need_preroll = FALSE;
1499         break;
1500       case GST_STATE_PAUSED:
1501         basesink->need_preroll = TRUE;
1502         break;
1503       default:
1504         basesink->need_preroll = FALSE;
1505         basesink->flushing = TRUE;
1506         break;
1507     }
1508     /* we can report latency queries now */
1509     basesink->priv->have_latency = TRUE;
1510     GST_OBJECT_UNLOCK (basesink);
1511     return TRUE;
1512   }
1513 stopping:
1514   {
1515     /* app is going to READY */
1516     GST_DEBUG_OBJECT (basesink, "stopping");
1517     basesink->need_preroll = FALSE;
1518     basesink->flushing = TRUE;
1519     GST_OBJECT_UNLOCK (basesink);
1520     return FALSE;
1521   }
1522 }
1523
1524 static void
1525 start_stepping (GstBaseSink * sink, GstSegment * segment,
1526     GstStepInfo * pending, GstStepInfo * current)
1527 {
1528   gint64 end;
1529   GstMessage *message;
1530
1531   GST_DEBUG_OBJECT (sink, "update pending step");
1532
1533   GST_OBJECT_LOCK (sink);
1534   memcpy (current, pending, sizeof (GstStepInfo));
1535   pending->valid = FALSE;
1536   GST_OBJECT_UNLOCK (sink);
1537
1538   /* post message first */
1539   message =
1540       gst_message_new_step_start (GST_OBJECT (sink), TRUE, current->format,
1541       current->amount, current->rate, current->flush, current->intermediate);
1542   gst_message_set_seqnum (message, current->seqnum);
1543   gst_element_post_message (GST_ELEMENT (sink), message);
1544
1545   /* get the running time of where we paused and remember it */
1546   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1547   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1548
1549   /* set the new rate for the remainder of the segment */
1550   current->start_rate = segment->rate;
1551   segment->rate *= current->rate;
1552
1553   /* save values */
1554   if (segment->rate > 0.0)
1555     current->start_stop = segment->stop;
1556   else
1557     current->start_start = segment->start;
1558
1559   if (current->format == GST_FORMAT_TIME) {
1560     end = current->start + current->amount;
1561     if (!current->flush) {
1562       /* update the segment clipping regions for non-flushing seeks */
1563       if (segment->rate > 0.0) {
1564         segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1565         segment->position = segment->stop;
1566       } else {
1567         gint64 position;
1568
1569         position = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1570         segment->time = position;
1571         segment->start = position;
1572         segment->position = position;
1573       }
1574     }
1575   }
1576
1577   GST_DEBUG_OBJECT (sink, "segment now %" GST_SEGMENT_FORMAT, segment);
1578   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1579       GST_TIME_ARGS (current->start));
1580
1581   if (current->amount == -1) {
1582     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1583     current->valid = FALSE;
1584   } else {
1585     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1586         "rate: %f", current->amount, gst_format_get_name (current->format),
1587         current->rate);
1588   }
1589 }
1590
1591 static void
1592 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1593     GstStepInfo * current, gint64 rstart, gint64 rstop, gboolean eos)
1594 {
1595   gint64 stop, position;
1596   GstMessage *message;
1597
1598   GST_DEBUG_OBJECT (sink, "step complete");
1599
1600   if (segment->rate > 0.0)
1601     stop = rstart;
1602   else
1603     stop = rstop;
1604
1605   GST_DEBUG_OBJECT (sink,
1606       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1607
1608   if (stop == -1)
1609     current->duration = current->position;
1610   else
1611     current->duration = stop - current->start;
1612
1613   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1614       GST_TIME_ARGS (current->duration));
1615
1616   position = current->start + current->duration;
1617
1618   /* now move the segment to the new running time */
1619   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1620
1621   if (current->flush) {
1622     /* and remove the time we flushed, start time did not change */
1623     segment->base = current->start;
1624   } else {
1625     /* start time is now the stepped position */
1626     gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1627   }
1628
1629   /* restore the previous rate */
1630   segment->rate = current->start_rate;
1631
1632   if (segment->rate > 0.0)
1633     segment->stop = current->start_stop;
1634   else
1635     segment->start = current->start_start;
1636
1637   /* post the step done when we know the stepped duration in TIME */
1638   message =
1639       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1640       current->amount, current->rate, current->flush, current->intermediate,
1641       current->duration, eos);
1642   gst_message_set_seqnum (message, current->seqnum);
1643   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1644
1645   if (!current->intermediate)
1646     sink->need_preroll = current->need_preroll;
1647
1648   /* and the current step info finished and becomes invalid */
1649   current->valid = FALSE;
1650 }
1651
1652 static gboolean
1653 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1654     GstStepInfo * current, guint64 * cstart, guint64 * cstop, guint64 * rstart,
1655     guint64 * rstop)
1656 {
1657   gboolean step_end = FALSE;
1658
1659   /* see if we need to skip this buffer because of stepping */
1660   switch (current->format) {
1661     case GST_FORMAT_TIME:
1662     {
1663       guint64 end;
1664       guint64 first, last;
1665       gdouble abs_rate;
1666
1667       if (segment->rate > 0.0) {
1668         if (segment->stop == *cstop)
1669           *rstop = *rstart + current->amount;
1670
1671         first = *rstart;
1672         last = *rstop;
1673       } else {
1674         if (segment->start == *cstart)
1675           *rstart = *rstop + current->amount;
1676
1677         first = *rstop;
1678         last = *rstart;
1679       }
1680
1681       end = current->start + current->amount;
1682       current->position = first - current->start;
1683
1684       abs_rate = ABS (segment->rate);
1685       if (G_UNLIKELY (abs_rate != 1.0))
1686         current->position /= abs_rate;
1687
1688       GST_DEBUG_OBJECT (sink,
1689           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1690           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1691       GST_DEBUG_OBJECT (sink,
1692           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1693           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1694           GST_TIME_ARGS (last - current->start),
1695           GST_TIME_ARGS (current->amount));
1696
1697       if ((current->flush && current->position >= current->amount)
1698           || last >= end) {
1699         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1700         step_end = TRUE;
1701         if (segment->rate > 0.0) {
1702           *rstart = end;
1703           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1704         } else {
1705           *rstop = end;
1706           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1707         }
1708       }
1709       GST_DEBUG_OBJECT (sink,
1710           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1711           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1712       GST_DEBUG_OBJECT (sink,
1713           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1714           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1715       break;
1716     }
1717     case GST_FORMAT_BUFFERS:
1718       GST_DEBUG_OBJECT (sink,
1719           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1720           current->position, current->amount);
1721
1722       if (current->position < current->amount) {
1723         current->position++;
1724       } else {
1725         step_end = TRUE;
1726       }
1727       break;
1728     case GST_FORMAT_DEFAULT:
1729     default:
1730       GST_DEBUG_OBJECT (sink,
1731           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1732           current->position, current->amount);
1733       break;
1734   }
1735   return step_end;
1736 }
1737
1738 /* with STREAM_LOCK, PREROLL_LOCK
1739  *
1740  * Returns TRUE if the object needs synchronisation and takes therefore
1741  * part in prerolling.
1742  *
1743  * rsstart/rsstop contain the start/stop in stream time.
1744  * rrstart/rrstop contain the start/stop in running time.
1745  */
1746 static gboolean
1747 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1748     GstClockTime * rsstart, GstClockTime * rsstop,
1749     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1750     gboolean * stepped, GstStepInfo * step, gboolean * step_end)
1751 {
1752   GstBaseSinkClass *bclass;
1753   GstBuffer *buffer;
1754   GstClockTime start, stop;     /* raw start/stop timestamps */
1755   guint64 cstart, cstop;        /* clipped raw timestamps */
1756   guint64 rstart, rstop;        /* clipped timestamps converted to running time */
1757   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1758   GstFormat format;
1759   GstBaseSinkPrivate *priv;
1760   GstSegment *segment;
1761   gboolean eos;
1762
1763   priv = basesink->priv;
1764   segment = &basesink->segment;
1765
1766   /* start with nothing */
1767   start = stop = GST_CLOCK_TIME_NONE;
1768
1769   if (G_UNLIKELY (GST_IS_EVENT (obj))) {
1770     GstEvent *event = GST_EVENT_CAST (obj);
1771
1772     switch (GST_EVENT_TYPE (event)) {
1773         /* EOS event needs syncing */
1774       case GST_EVENT_EOS:
1775       {
1776         if (segment->rate >= 0.0) {
1777           sstart = sstop = priv->current_sstop;
1778           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1779             /* we have not seen a buffer yet, use the segment values */
1780             sstart = sstop = gst_segment_to_stream_time (segment,
1781                 segment->format, segment->stop);
1782           }
1783         } else {
1784           sstart = sstop = priv->current_sstart;
1785           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1786             /* we have not seen a buffer yet, use the segment values */
1787             sstart = sstop = gst_segment_to_stream_time (segment,
1788                 segment->format, segment->start);
1789           }
1790         }
1791
1792         rstart = rstop = priv->eos_rtime;
1793         *do_sync = rstart != -1;
1794         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1795             GST_TIME_ARGS (rstart));
1796         /* if we are stepping, we end now */
1797         *step_end = step->valid;
1798         eos = TRUE;
1799         goto eos_done;
1800       }
1801       default:
1802         /* other events do not need syncing */
1803         return FALSE;
1804     }
1805   }
1806
1807   eos = FALSE;
1808
1809 again:
1810   /* else do buffer sync code */
1811   buffer = GST_BUFFER_CAST (obj);
1812
1813   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1814
1815   /* just get the times to see if we need syncing, if the start returns -1 we
1816    * don't sync. */
1817   if (bclass->get_times)
1818     bclass->get_times (basesink, buffer, &start, &stop);
1819
1820   if (!GST_CLOCK_TIME_IS_VALID (start)) {
1821     /* we don't need to sync but we still want to get the timestamps for
1822      * tracking the position */
1823     gst_base_sink_default_get_times (basesink, buffer, &start, &stop);
1824     *do_sync = FALSE;
1825   } else {
1826     *do_sync = TRUE;
1827   }
1828
1829   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1830       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1831       GST_TIME_ARGS (stop), *do_sync);
1832
1833   /* collect segment and format for code clarity */
1834   format = segment->format;
1835
1836   /* clip */
1837   if (G_UNLIKELY (!gst_segment_clip (segment, format,
1838               start, stop, &cstart, &cstop))) {
1839     if (step->valid) {
1840       GST_DEBUG_OBJECT (basesink, "step out of segment");
1841       /* when we are stepping, pretend we're at the end of the segment */
1842       if (segment->rate > 0.0) {
1843         cstart = segment->stop;
1844         cstop = segment->stop;
1845       } else {
1846         cstart = segment->start;
1847         cstop = segment->start;
1848       }
1849       goto do_times;
1850     }
1851     goto out_of_segment;
1852   }
1853
1854   if (G_UNLIKELY (start != cstart || stop != cstop)) {
1855     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
1856         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
1857         GST_TIME_ARGS (cstop));
1858   }
1859
1860   /* set last stop position */
1861   if (G_LIKELY (stop != GST_CLOCK_TIME_NONE && cstop != GST_CLOCK_TIME_NONE))
1862     segment->position = cstop;
1863   else
1864     segment->position = cstart;
1865
1866 do_times:
1867   rstart = gst_segment_to_running_time (segment, format, cstart);
1868   rstop = gst_segment_to_running_time (segment, format, cstop);
1869
1870   if (G_UNLIKELY (step->valid)) {
1871     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
1872                 &rstart, &rstop))) {
1873       /* step is still busy, we discard data when we are flushing */
1874       *stepped = step->flush;
1875       GST_DEBUG_OBJECT (basesink, "stepping busy");
1876     }
1877   }
1878   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
1879    * upstream is behaving very badly */
1880   sstart = gst_segment_to_stream_time (segment, format, cstart);
1881   sstop = gst_segment_to_stream_time (segment, format, cstop);
1882
1883 eos_done:
1884   /* eos_done label only called when doing EOS, we also stop stepping then */
1885   if (*step_end && step->flush) {
1886     GST_DEBUG_OBJECT (basesink, "flushing step ended");
1887     stop_stepping (basesink, segment, step, rstart, rstop, eos);
1888     *step_end = FALSE;
1889     /* re-determine running start times for adjusted segment
1890      * (which has a flushed amount of running/accumulated time removed) */
1891     if (!GST_IS_EVENT (obj)) {
1892       GST_DEBUG_OBJECT (basesink, "refresh sync times");
1893       goto again;
1894     }
1895   }
1896
1897   /* save times */
1898   *rsstart = sstart;
1899   *rsstop = sstop;
1900   *rrstart = rstart;
1901   *rrstop = rstop;
1902
1903   /* buffers and EOS always need syncing and preroll */
1904   return TRUE;
1905
1906   /* special cases */
1907 out_of_segment:
1908   {
1909     /* we usually clip in the chain function already but stepping could cause
1910      * the segment to be updated later. we return FALSE so that we don't try
1911      * to sync on it. */
1912     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
1913     return FALSE;
1914   }
1915 }
1916
1917 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
1918  * adjust a timestamp with the latency and timestamp offset. This function does
1919  * not adjust for the render delay. */
1920 static GstClockTime
1921 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
1922 {
1923   GstClockTimeDiff ts_offset;
1924
1925   /* don't do anything funny with invalid timestamps */
1926   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1927     return time;
1928
1929   time += basesink->priv->latency;
1930
1931   /* apply offset, be carefull for underflows */
1932   ts_offset = basesink->priv->ts_offset;
1933   if (ts_offset < 0) {
1934     ts_offset = -ts_offset;
1935     if (ts_offset < time)
1936       time -= ts_offset;
1937     else
1938       time = 0;
1939   } else
1940     time += ts_offset;
1941
1942   /* subtract the render delay again, which was included in the latency */
1943   if (time > basesink->priv->render_delay)
1944     time -= basesink->priv->render_delay;
1945   else
1946     time = 0;
1947
1948   return time;
1949 }
1950
1951 /**
1952  * gst_base_sink_wait_clock:
1953  * @sink: the sink
1954  * @time: the running_time to be reached
1955  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
1956  *
1957  * This function will block until @time is reached. It is usually called by
1958  * subclasses that use their own internal synchronisation.
1959  *
1960  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
1961  * returned. Likewise, if synchronisation is disabled in the element or there
1962  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
1963  *
1964  * This function should only be called with the PREROLL_LOCK held, like when
1965  * receiving an EOS event in the #GstBaseSinkClass.event() vmethod or when
1966  * receiving a buffer in
1967  * the #GstBaseSinkClass.render() vmethod.
1968  *
1969  * The @time argument should be the running_time of when this method should
1970  * return and is not adjusted with any latency or offset configured in the
1971  * sink.
1972  *
1973  * Since: 0.10.20
1974  *
1975  * Returns: #GstClockReturn
1976  */
1977 GstClockReturn
1978 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
1979     GstClockTimeDiff * jitter)
1980 {
1981   GstClockReturn ret;
1982   GstClock *clock;
1983   GstClockTime base_time;
1984
1985   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1986     goto invalid_time;
1987
1988   GST_OBJECT_LOCK (sink);
1989   if (G_UNLIKELY (!sink->sync))
1990     goto no_sync;
1991
1992   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
1993     goto no_clock;
1994
1995   base_time = GST_ELEMENT_CAST (sink)->base_time;
1996   GST_LOG_OBJECT (sink,
1997       "time %" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT,
1998       GST_TIME_ARGS (time), GST_TIME_ARGS (base_time));
1999
2000   /* add base_time to running_time to get the time against the clock */
2001   time += base_time;
2002
2003   /* Re-use existing clockid if available */
2004   /* FIXME: Casting to GstClockEntry only works because the types
2005    * are the same */
2006   if (G_LIKELY (sink->priv->cached_clock_id != NULL
2007           && GST_CLOCK_ENTRY_CLOCK ((GstClockEntry *) sink->
2008               priv->cached_clock_id) == clock)) {
2009     if (!gst_clock_single_shot_id_reinit (clock, sink->priv->cached_clock_id,
2010             time)) {
2011       gst_clock_id_unref (sink->priv->cached_clock_id);
2012       sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2013     }
2014   } else {
2015     if (sink->priv->cached_clock_id != NULL)
2016       gst_clock_id_unref (sink->priv->cached_clock_id);
2017     sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2018   }
2019   GST_OBJECT_UNLOCK (sink);
2020
2021   /* A blocking wait is performed on the clock. We save the ClockID
2022    * so we can unlock the entry at any time. While we are blocking, we
2023    * release the PREROLL_LOCK so that other threads can interrupt the
2024    * entry. */
2025   sink->clock_id = sink->priv->cached_clock_id;
2026   /* release the preroll lock while waiting */
2027   GST_BASE_SINK_PREROLL_UNLOCK (sink);
2028
2029   ret = gst_clock_id_wait (sink->priv->cached_clock_id, jitter);
2030
2031   GST_BASE_SINK_PREROLL_LOCK (sink);
2032   sink->clock_id = NULL;
2033
2034   return ret;
2035
2036   /* no syncing needed */
2037 invalid_time:
2038   {
2039     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
2040     return GST_CLOCK_BADTIME;
2041   }
2042 no_sync:
2043   {
2044     GST_DEBUG_OBJECT (sink, "sync disabled");
2045     GST_OBJECT_UNLOCK (sink);
2046     return GST_CLOCK_BADTIME;
2047   }
2048 no_clock:
2049   {
2050     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
2051     GST_OBJECT_UNLOCK (sink);
2052     return GST_CLOCK_BADTIME;
2053   }
2054 }
2055
2056 /**
2057  * gst_base_sink_wait_preroll:
2058  * @sink: the sink
2059  *
2060  * If the #GstBaseSinkClass.render() method performs its own synchronisation
2061  * against the clock it must unblock when going from PLAYING to the PAUSED state
2062  * and call this method before continuing to render the remaining data.
2063  *
2064  * This function will block until a state change to PLAYING happens (in which
2065  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
2066  * to a state change to READY or a FLUSH event (in which case this function
2067  * returns #GST_FLOW_WRONG_STATE).
2068  *
2069  * This function should only be called with the PREROLL_LOCK held, like in the
2070  * render function.
2071  *
2072  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2073  * continue. Any other return value should be returned from the render vmethod.
2074  *
2075  * Since: 0.10.11
2076  */
2077 GstFlowReturn
2078 gst_base_sink_wait_preroll (GstBaseSink * sink)
2079 {
2080   sink->have_preroll = TRUE;
2081   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
2082   /* block until the state changes, or we get a flush, or something */
2083   GST_BASE_SINK_PREROLL_WAIT (sink);
2084   sink->have_preroll = FALSE;
2085   if (G_UNLIKELY (sink->flushing))
2086     goto stopping;
2087   if (G_UNLIKELY (sink->priv->step_unlock))
2088     goto step_unlocked;
2089   GST_DEBUG_OBJECT (sink, "continue after preroll");
2090
2091   return GST_FLOW_OK;
2092
2093   /* ERRORS */
2094 stopping:
2095   {
2096     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
2097     return GST_FLOW_WRONG_STATE;
2098   }
2099 step_unlocked:
2100   {
2101     sink->priv->step_unlock = FALSE;
2102     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
2103     return GST_FLOW_STEP;
2104   }
2105 }
2106
2107 /**
2108  * gst_base_sink_do_preroll:
2109  * @sink: the sink
2110  * @obj: (transfer none): the mini object that caused the preroll
2111  *
2112  * If the @sink spawns its own thread for pulling buffers from upstream it
2113  * should call this method after it has pulled a buffer. If the element needed
2114  * to preroll, this function will perform the preroll and will then block
2115  * until the element state is changed.
2116  *
2117  * This function should be called with the PREROLL_LOCK held.
2118  *
2119  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2120  * continue. Any other return value should be returned from the render vmethod.
2121  *
2122  * Since: 0.10.22
2123  */
2124 GstFlowReturn
2125 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2126 {
2127   GstFlowReturn ret;
2128
2129   while (G_UNLIKELY (sink->need_preroll)) {
2130     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2131
2132     /* if it's a buffer, we need to call the preroll method */
2133     if (sink->priv->call_preroll) {
2134       GstBaseSinkClass *bclass;
2135       GstBuffer *buf;
2136
2137       if (GST_IS_BUFFER_LIST (obj)) {
2138         buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0);
2139         g_assert (NULL != buf);
2140       } else if (GST_IS_BUFFER (obj)) {
2141         buf = GST_BUFFER_CAST (obj);
2142         /* For buffer lists do not set last buffer for now */
2143         gst_base_sink_set_last_buffer (sink, buf);
2144       } else
2145         buf = NULL;
2146
2147       if (buf) {
2148         GST_DEBUG_OBJECT (sink, "preroll buffer %" GST_TIME_FORMAT,
2149             GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
2150
2151         bclass = GST_BASE_SINK_GET_CLASS (sink);
2152         if (bclass->preroll)
2153           if ((ret = bclass->preroll (sink, buf)) != GST_FLOW_OK)
2154             goto preroll_canceled;
2155
2156         sink->priv->call_preroll = FALSE;
2157       }
2158     }
2159
2160     /* commit state */
2161     if (G_LIKELY (sink->playing_async)) {
2162       if (G_UNLIKELY (!gst_base_sink_commit_state (sink)))
2163         goto stopping;
2164     }
2165
2166     /* need to recheck here because the commit state could have
2167      * made us not need the preroll anymore */
2168     if (G_LIKELY (sink->need_preroll)) {
2169       /* block until the state changes, or we get a flush, or something */
2170       ret = gst_base_sink_wait_preroll (sink);
2171       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2172         goto preroll_failed;
2173     }
2174   }
2175   return GST_FLOW_OK;
2176
2177   /* ERRORS */
2178 preroll_canceled:
2179   {
2180     GST_DEBUG_OBJECT (sink, "preroll failed, abort state");
2181     gst_element_abort_state (GST_ELEMENT_CAST (sink));
2182     return ret;
2183   }
2184 stopping:
2185   {
2186     GST_DEBUG_OBJECT (sink, "stopping while commiting state");
2187     return GST_FLOW_WRONG_STATE;
2188   }
2189 preroll_failed:
2190   {
2191     GST_DEBUG_OBJECT (sink, "preroll failed: %s", gst_flow_get_name (ret));
2192     return ret;
2193   }
2194 }
2195
2196 /**
2197  * gst_base_sink_wait_eos:
2198  * @sink: the sink
2199  * @time: the running_time to be reached
2200  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2201  *
2202  * This function will block until @time is reached. It is usually called by
2203  * subclasses that use their own internal synchronisation but want to let the
2204  * EOS be handled by the base class.
2205  *
2206  * This function should only be called with the PREROLL_LOCK held, like when
2207  * receiving an EOS event in the ::event vmethod.
2208  *
2209  * The @time argument should be the running_time of when the EOS should happen
2210  * and will be adjusted with any latency and offset configured in the sink.
2211  *
2212  * Returns: #GstFlowReturn
2213  *
2214  * Since: 0.10.15
2215  */
2216 GstFlowReturn
2217 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2218     GstClockTimeDiff * jitter)
2219 {
2220   GstClockReturn status;
2221   GstFlowReturn ret;
2222
2223   do {
2224     GstClockTime stime;
2225
2226     GST_DEBUG_OBJECT (sink, "checking preroll");
2227
2228     /* first wait for the playing state before we can continue */
2229     while (G_UNLIKELY (sink->need_preroll)) {
2230       ret = gst_base_sink_wait_preroll (sink);
2231       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2232         goto flushing;
2233     }
2234
2235     /* preroll done, we can sync since we are in PLAYING now. */
2236     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2237         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2238
2239     /* compensate for latency and ts_offset. We don't adjust for render delay
2240      * because we don't interact with the device on EOS normally. */
2241     stime = gst_base_sink_adjust_time (sink, time);
2242
2243     /* wait for the clock, this can be interrupted because we got shut down or
2244      * we PAUSED. */
2245     status = gst_base_sink_wait_clock (sink, stime, jitter);
2246
2247     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2248
2249     /* invalid time, no clock or sync disabled, just continue then */
2250     if (status == GST_CLOCK_BADTIME)
2251       break;
2252
2253     /* waiting could have been interrupted and we can be flushing now */
2254     if (G_UNLIKELY (sink->flushing))
2255       goto flushing;
2256
2257     /* retry if we got unscheduled, which means we did not reach the timeout
2258      * yet. if some other error occures, we continue. */
2259   } while (status == GST_CLOCK_UNSCHEDULED);
2260
2261   GST_DEBUG_OBJECT (sink, "end of stream");
2262
2263   return GST_FLOW_OK;
2264
2265   /* ERRORS */
2266 flushing:
2267   {
2268     GST_DEBUG_OBJECT (sink, "we are flushing");
2269     return GST_FLOW_WRONG_STATE;
2270   }
2271 }
2272
2273 /* with STREAM_LOCK, PREROLL_LOCK
2274  *
2275  * Make sure we are in PLAYING and synchronize an object to the clock.
2276  *
2277  * If we need preroll, we are not in PLAYING. We try to commit the state
2278  * if needed and then block if we still are not PLAYING.
2279  *
2280  * We start waiting on the clock in PLAYING. If we got interrupted, we
2281  * immediately try to re-preroll.
2282  *
2283  * Some objects do not need synchronisation (most events) and so this function
2284  * immediately returns GST_FLOW_OK.
2285  *
2286  * for objects that arrive later than max-lateness to be synchronized to the
2287  * clock have the @late boolean set to TRUE.
2288  *
2289  * This function keeps a running average of the jitter (the diff between the
2290  * clock time and the requested sync time). The jitter is negative for
2291  * objects that arrive in time and positive for late buffers.
2292  *
2293  * does not take ownership of obj.
2294  */
2295 static GstFlowReturn
2296 gst_base_sink_do_sync (GstBaseSink * basesink,
2297     GstMiniObject * obj, gboolean * late, gboolean * step_end)
2298 {
2299   GstClockTimeDiff jitter = 0;
2300   gboolean syncable;
2301   GstClockReturn status = GST_CLOCK_OK;
2302   GstClockTime rstart, rstop, sstart, sstop, stime;
2303   gboolean do_sync;
2304   GstBaseSinkPrivate *priv;
2305   GstFlowReturn ret;
2306   GstStepInfo *current, *pending;
2307   gboolean stepped;
2308
2309   priv = basesink->priv;
2310
2311 do_step:
2312   sstart = sstop = rstart = rstop = GST_CLOCK_TIME_NONE;
2313   do_sync = TRUE;
2314   stepped = FALSE;
2315
2316   priv->current_rstart = GST_CLOCK_TIME_NONE;
2317
2318   /* get stepping info */
2319   current = &priv->current_step;
2320   pending = &priv->pending_step;
2321
2322   /* get timing information for this object against the render segment */
2323   syncable = gst_base_sink_get_sync_times (basesink, obj,
2324       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, current, step_end);
2325
2326   if (G_UNLIKELY (stepped))
2327     goto step_skipped;
2328
2329   /* a syncable object needs to participate in preroll and
2330    * clocking. All buffers and EOS are syncable. */
2331   if (G_UNLIKELY (!syncable))
2332     goto not_syncable;
2333
2334   /* store timing info for current object */
2335   priv->current_rstart = rstart;
2336   priv->current_rstop = (GST_CLOCK_TIME_IS_VALID (rstop) ? rstop : rstart);
2337
2338   /* save sync time for eos when the previous object needed sync */
2339   priv->eos_rtime = (do_sync ? priv->current_rstop : GST_CLOCK_TIME_NONE);
2340
2341   /* calculate inter frame spacing */
2342   if (G_UNLIKELY (priv->prev_rstart != -1 && priv->prev_rstart < rstart)) {
2343     GstClockTime in_diff;
2344
2345     in_diff = rstart - priv->prev_rstart;
2346
2347     if (priv->avg_in_diff == -1)
2348       priv->avg_in_diff = in_diff;
2349     else
2350       priv->avg_in_diff = UPDATE_RUNNING_AVG (priv->avg_in_diff, in_diff);
2351
2352     GST_LOG_OBJECT (basesink, "avg frame diff %" GST_TIME_FORMAT,
2353         GST_TIME_ARGS (priv->avg_in_diff));
2354
2355   }
2356   priv->prev_rstart = rstart;
2357
2358   if (G_UNLIKELY (priv->earliest_in_time != -1
2359           && rstart < priv->earliest_in_time))
2360     goto qos_dropped;
2361
2362 again:
2363   /* first do preroll, this makes sure we commit our state
2364    * to PAUSED and can continue to PLAYING. We cannot perform
2365    * any clock sync in PAUSED because there is no clock. */
2366   ret = gst_base_sink_do_preroll (basesink, obj);
2367   if (G_UNLIKELY (ret != GST_FLOW_OK))
2368     goto preroll_failed;
2369
2370   /* update the segment with a pending step if the current one is invalid and we
2371    * have a new pending one. We only accept new step updates after a preroll */
2372   if (G_UNLIKELY (pending->valid && !current->valid)) {
2373     start_stepping (basesink, &basesink->segment, pending, current);
2374     goto do_step;
2375   }
2376
2377   /* After rendering we store the position of the last buffer so that we can use
2378    * it to report the position. We need to take the lock here. */
2379   GST_OBJECT_LOCK (basesink);
2380   priv->current_sstart = sstart;
2381   priv->current_sstop = (GST_CLOCK_TIME_IS_VALID (sstop) ? sstop : sstart);
2382   GST_OBJECT_UNLOCK (basesink);
2383
2384   if (!do_sync)
2385     goto done;
2386
2387   /* adjust for latency */
2388   stime = gst_base_sink_adjust_time (basesink, rstart);
2389
2390   /* adjust for render-delay, avoid underflows */
2391   if (GST_CLOCK_TIME_IS_VALID (stime)) {
2392     if (stime > priv->render_delay)
2393       stime -= priv->render_delay;
2394     else
2395       stime = 0;
2396   }
2397
2398   /* preroll done, we can sync since we are in PLAYING now. */
2399   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2400       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2401       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2402
2403   /* This function will return immediately if start == -1, no clock
2404    * or sync is disabled with GST_CLOCK_BADTIME. */
2405   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2406
2407   GST_DEBUG_OBJECT (basesink, "clock returned %d, jitter %c%" GST_TIME_FORMAT,
2408       status, (jitter < 0 ? '-' : ' '), GST_TIME_ARGS (ABS (jitter)));
2409
2410   /* invalid time, no clock or sync disabled, just render */
2411   if (status == GST_CLOCK_BADTIME)
2412     goto done;
2413
2414   /* waiting could have been interrupted and we can be flushing now */
2415   if (G_UNLIKELY (basesink->flushing))
2416     goto flushing;
2417
2418   /* check for unlocked by a state change, we are not flushing so
2419    * we can try to preroll on the current buffer. */
2420   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2421     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2422     priv->call_preroll = TRUE;
2423     goto again;
2424   }
2425
2426   /* successful syncing done, record observation */
2427   priv->current_jitter = jitter;
2428
2429   /* check if the object should be dropped */
2430   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2431       status, jitter);
2432
2433 done:
2434   return GST_FLOW_OK;
2435
2436   /* ERRORS */
2437 step_skipped:
2438   {
2439     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2440     *late = TRUE;
2441     return GST_FLOW_OK;
2442   }
2443 not_syncable:
2444   {
2445     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2446     return GST_FLOW_OK;
2447   }
2448 qos_dropped:
2449   {
2450     GST_DEBUG_OBJECT (basesink, "dropped because of QoS %p", obj);
2451     *late = TRUE;
2452     return GST_FLOW_OK;
2453   }
2454 flushing:
2455   {
2456     GST_DEBUG_OBJECT (basesink, "we are flushing");
2457     return GST_FLOW_WRONG_STATE;
2458   }
2459 preroll_failed:
2460   {
2461     GST_DEBUG_OBJECT (basesink, "preroll failed");
2462     *step_end = FALSE;
2463     return ret;
2464   }
2465 }
2466
2467 static gboolean
2468 gst_base_sink_send_qos (GstBaseSink * basesink, GstQOSType type,
2469     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2470 {
2471   GstEvent *event;
2472   gboolean res;
2473
2474   /* generate Quality-of-Service event */
2475   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2476       "qos: type %d, proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2477       GST_TIME_FORMAT, type, proportion, diff, GST_TIME_ARGS (time));
2478
2479   event = gst_event_new_qos (type, proportion, diff, time);
2480
2481   /* send upstream */
2482   res = gst_pad_push_event (basesink->sinkpad, event);
2483
2484   return res;
2485 }
2486
2487 static void
2488 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2489 {
2490   GstBaseSinkPrivate *priv;
2491   GstClockTime start, stop;
2492   GstClockTimeDiff jitter;
2493   GstClockTime pt, entered, left;
2494   GstClockTime duration;
2495   gdouble rate;
2496
2497   priv = sink->priv;
2498
2499   start = priv->current_rstart;
2500
2501   if (priv->current_step.valid)
2502     return;
2503
2504   /* if Quality-of-Service disabled, do nothing */
2505   if (!g_atomic_int_get (&priv->qos_enabled) ||
2506       !GST_CLOCK_TIME_IS_VALID (start))
2507     return;
2508
2509   stop = priv->current_rstop;
2510   jitter = priv->current_jitter;
2511
2512   if (jitter < 0) {
2513     /* this is the time the buffer entered the sink */
2514     if (start < -jitter)
2515       entered = 0;
2516     else
2517       entered = start + jitter;
2518     left = start;
2519   } else {
2520     /* this is the time the buffer entered the sink */
2521     entered = start + jitter;
2522     /* this is the time the buffer left the sink */
2523     left = start + jitter;
2524   }
2525
2526   /* calculate duration of the buffer */
2527   if (GST_CLOCK_TIME_IS_VALID (stop) && stop != start)
2528     duration = stop - start;
2529   else
2530     duration = priv->avg_in_diff;
2531
2532   /* if we have the time when the last buffer left us, calculate
2533    * processing time */
2534   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2535     if (entered > priv->last_left) {
2536       pt = entered - priv->last_left;
2537     } else {
2538       pt = 0;
2539     }
2540   } else {
2541     pt = priv->avg_pt;
2542   }
2543
2544   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2545       ", stop %" GST_TIME_FORMAT ", entered %" GST_TIME_FORMAT ", left %"
2546       GST_TIME_FORMAT ", pt: %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
2547       ",jitter %" G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
2548       GST_TIME_ARGS (entered), GST_TIME_ARGS (left), GST_TIME_ARGS (pt),
2549       GST_TIME_ARGS (duration), jitter);
2550
2551   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2552       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2553       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2554       priv->avg_rate);
2555
2556   /* collect running averages. for first observations, we copy the
2557    * values */
2558   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_duration))
2559     priv->avg_duration = duration;
2560   else
2561     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2562
2563   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_pt))
2564     priv->avg_pt = pt;
2565   else
2566     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2567
2568   if (priv->avg_duration != 0)
2569     rate =
2570         gst_guint64_to_gdouble (priv->avg_pt) /
2571         gst_guint64_to_gdouble (priv->avg_duration);
2572   else
2573     rate = 1.0;
2574
2575   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2576     if (dropped || priv->avg_rate < 0.0) {
2577       priv->avg_rate = rate;
2578     } else {
2579       if (rate > 1.0)
2580         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2581       else
2582         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2583     }
2584   }
2585
2586   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2587       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2588       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2589       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2590
2591
2592   if (priv->avg_rate >= 0.0) {
2593     GstQOSType type;
2594     GstClockTimeDiff diff;
2595
2596     /* if we have a valid rate, start sending QoS messages */
2597     if (priv->current_jitter < 0) {
2598       /* make sure we never go below 0 when adding the jitter to the
2599        * timestamp. */
2600       if (priv->current_rstart < -priv->current_jitter)
2601         priv->current_jitter = -priv->current_rstart;
2602     }
2603
2604     if (priv->throttle_time > 0) {
2605       diff = priv->throttle_time;
2606       type = GST_QOS_TYPE_THROTTLE;
2607     } else {
2608       diff = priv->current_jitter;
2609       if (diff <= 0)
2610         type = GST_QOS_TYPE_OVERFLOW;
2611       else
2612         type = GST_QOS_TYPE_UNDERFLOW;
2613     }
2614
2615     gst_base_sink_send_qos (sink, type, priv->avg_rate, priv->current_rstart,
2616         diff);
2617   }
2618
2619   /* record when this buffer will leave us */
2620   priv->last_left = left;
2621 }
2622
2623 /* reset all qos measuring */
2624 static void
2625 gst_base_sink_reset_qos (GstBaseSink * sink)
2626 {
2627   GstBaseSinkPrivate *priv;
2628
2629   priv = sink->priv;
2630
2631   priv->last_render_time = GST_CLOCK_TIME_NONE;
2632   priv->prev_rstart = GST_CLOCK_TIME_NONE;
2633   priv->earliest_in_time = GST_CLOCK_TIME_NONE;
2634   priv->last_left = GST_CLOCK_TIME_NONE;
2635   priv->avg_duration = GST_CLOCK_TIME_NONE;
2636   priv->avg_pt = GST_CLOCK_TIME_NONE;
2637   priv->avg_rate = -1.0;
2638   priv->avg_render = GST_CLOCK_TIME_NONE;
2639   priv->avg_in_diff = GST_CLOCK_TIME_NONE;
2640   priv->rendered = 0;
2641   priv->dropped = 0;
2642
2643 }
2644
2645 /* Checks if the object was scheduled too late.
2646  *
2647  * rstart/rstop contain the running_time start and stop values
2648  * of the object.
2649  *
2650  * status and jitter contain the return values from the clock wait.
2651  *
2652  * returns TRUE if the buffer was too late.
2653  */
2654 static gboolean
2655 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2656     GstClockTime rstart, GstClockTime rstop,
2657     GstClockReturn status, GstClockTimeDiff jitter)
2658 {
2659   gboolean late;
2660   guint64 max_lateness;
2661   GstBaseSinkPrivate *priv;
2662
2663   priv = basesink->priv;
2664
2665   late = FALSE;
2666
2667   /* only for objects that were too late */
2668   if (G_LIKELY (status != GST_CLOCK_EARLY))
2669     goto in_time;
2670
2671   max_lateness = basesink->max_lateness;
2672
2673   /* check if frame dropping is enabled */
2674   if (max_lateness == -1)
2675     goto no_drop;
2676
2677   /* only check for buffers */
2678   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2679     goto not_buffer;
2680
2681   /* can't do check if we don't have a timestamp */
2682   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (rstart)))
2683     goto no_timestamp;
2684
2685   /* we can add a valid stop time */
2686   if (GST_CLOCK_TIME_IS_VALID (rstop))
2687     max_lateness += rstop;
2688   else {
2689     max_lateness += rstart;
2690     /* no stop time, use avg frame diff */
2691     if (priv->avg_in_diff != -1)
2692       max_lateness += priv->avg_in_diff;
2693   }
2694
2695   /* if the jitter bigger than duration and lateness we are too late */
2696   if ((late = rstart + jitter > max_lateness)) {
2697     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2698         "buffer is too late %" GST_TIME_FORMAT
2699         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (rstart + jitter),
2700         GST_TIME_ARGS (max_lateness));
2701     /* !!emergency!!, if we did not receive anything valid for more than a
2702      * second, render it anyway so the user sees something */
2703     if (GST_CLOCK_TIME_IS_VALID (priv->last_render_time) &&
2704         rstart - priv->last_render_time > GST_SECOND) {
2705       late = FALSE;
2706       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2707           (_("A lot of buffers are being dropped.")),
2708           ("There may be a timestamping problem, or this computer is too slow."));
2709       GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2710           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2711           GST_TIME_ARGS (priv->last_render_time));
2712     }
2713   }
2714
2715 done:
2716   if (!late || !GST_CLOCK_TIME_IS_VALID (priv->last_render_time)) {
2717     priv->last_render_time = rstart;
2718     /* the next allowed input timestamp */
2719     if (priv->throttle_time > 0)
2720       priv->earliest_in_time = rstart + priv->throttle_time;
2721   }
2722   return late;
2723
2724   /* all is fine */
2725 in_time:
2726   {
2727     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2728     goto done;
2729   }
2730 no_drop:
2731   {
2732     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2733     goto done;
2734   }
2735 not_buffer:
2736   {
2737     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2738     return FALSE;
2739   }
2740 no_timestamp:
2741   {
2742     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2743     return FALSE;
2744   }
2745 }
2746
2747 /* called before and after calling the render vmethod. It keeps track of how
2748  * much time was spent in the render method and is used to check if we are
2749  * flooded */
2750 static void
2751 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2752 {
2753   GstBaseSinkPrivate *priv;
2754
2755   priv = basesink->priv;
2756
2757   if (start) {
2758     priv->start = gst_util_get_timestamp ();
2759   } else {
2760     GstClockTime elapsed;
2761
2762     priv->stop = gst_util_get_timestamp ();
2763
2764     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2765
2766     if (!GST_CLOCK_TIME_IS_VALID (priv->avg_render))
2767       priv->avg_render = elapsed;
2768     else
2769       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2770
2771     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2772         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2773   }
2774 }
2775
2776 static void
2777 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
2778 {
2779   /* make sure we are not blocked on the clock also clear any pending
2780    * eos state. */
2781   gst_base_sink_set_flushing (basesink, pad, TRUE);
2782
2783   /* we grab the stream lock but that is not needed since setting the
2784    * sink to flushing would make sure no state commit is being done
2785    * anymore */
2786   GST_PAD_STREAM_LOCK (pad);
2787   gst_base_sink_reset_qos (basesink);
2788   /* and we need to commit our state again on the next
2789    * prerolled buffer */
2790   basesink->playing_async = TRUE;
2791   if (basesink->priv->async_enabled) {
2792     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
2793   } else {
2794     /* start time reset in above case as well;
2795      * arranges for a.o. proper position reporting when flushing in PAUSED */
2796     gst_element_set_start_time (GST_ELEMENT_CAST (basesink), 0);
2797     basesink->priv->have_latency = TRUE;
2798   }
2799   gst_base_sink_set_last_buffer (basesink, NULL);
2800   GST_PAD_STREAM_UNLOCK (pad);
2801 }
2802
2803 static void
2804 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad,
2805     gboolean reset_time)
2806 {
2807   /* unset flushing so we can accept new data, this also flushes out any EOS
2808    * event. */
2809   gst_base_sink_set_flushing (basesink, pad, FALSE);
2810
2811   /* for position reporting */
2812   GST_OBJECT_LOCK (basesink);
2813   basesink->priv->current_sstart = GST_CLOCK_TIME_NONE;
2814   basesink->priv->current_sstop = GST_CLOCK_TIME_NONE;
2815   basesink->priv->eos_rtime = GST_CLOCK_TIME_NONE;
2816   basesink->priv->call_preroll = TRUE;
2817   basesink->priv->current_step.valid = FALSE;
2818   basesink->priv->pending_step.valid = FALSE;
2819   if (basesink->pad_mode == GST_PAD_MODE_PUSH) {
2820     /* we need new segment info after the flush. */
2821     basesink->have_newsegment = FALSE;
2822     if (reset_time) {
2823       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
2824     }
2825   }
2826   basesink->priv->reset_time = reset_time;
2827   GST_OBJECT_UNLOCK (basesink);
2828 }
2829
2830 static GstFlowReturn
2831 gst_base_sink_default_wait_eos (GstBaseSink * basesink, GstEvent * event)
2832 {
2833   GstFlowReturn ret;
2834   gboolean late, step_end;
2835
2836   ret = gst_base_sink_do_sync (basesink, GST_MINI_OBJECT_CAST (event),
2837       &late, &step_end);
2838
2839   return ret;
2840 }
2841
2842 static gboolean
2843 gst_base_sink_default_event (GstBaseSink * basesink, GstEvent * event)
2844 {
2845   gboolean result = TRUE;
2846   GstBaseSinkClass *bclass;
2847
2848   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2849
2850   switch (GST_EVENT_TYPE (event)) {
2851     case GST_EVENT_FLUSH_START:
2852     {
2853       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
2854       gst_base_sink_flush_start (basesink, basesink->sinkpad);
2855       break;
2856     }
2857     case GST_EVENT_FLUSH_STOP:
2858     {
2859       gboolean reset_time;
2860
2861       gst_event_parse_flush_stop (event, &reset_time);
2862       GST_DEBUG_OBJECT (basesink, "flush-stop %p, reset_time: %d", event,
2863           reset_time);
2864       gst_base_sink_flush_stop (basesink, basesink->sinkpad, reset_time);
2865       break;
2866     }
2867     case GST_EVENT_EOS:
2868     {
2869       GstMessage *message;
2870       guint32 seqnum;
2871
2872       /* we set the received EOS flag here so that we can use it when testing if
2873        * we are prerolled and to refuse more buffers. */
2874       basesink->priv->received_eos = TRUE;
2875
2876       /* wait for EOS */
2877       if (G_LIKELY (bclass->wait_eos)) {
2878         GstFlowReturn ret;
2879
2880         ret = bclass->wait_eos (basesink, event);
2881         if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2882           result = FALSE;
2883           goto done;
2884         }
2885       }
2886
2887       /* the EOS event is completely handled so we mark
2888        * ourselves as being in the EOS state. eos is also
2889        * protected by the object lock so we can read it when
2890        * answering the POSITION query. */
2891       GST_OBJECT_LOCK (basesink);
2892       basesink->eos = TRUE;
2893       GST_OBJECT_UNLOCK (basesink);
2894
2895       /* ok, now we can post the message */
2896       GST_DEBUG_OBJECT (basesink, "Now posting EOS");
2897
2898       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
2899       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
2900
2901       message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
2902       gst_message_set_seqnum (message, seqnum);
2903       gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
2904       break;
2905     }
2906     case GST_EVENT_CAPS:
2907     {
2908       GstCaps *caps;
2909
2910       GST_DEBUG_OBJECT (basesink, "caps %p", event);
2911
2912       gst_event_parse_caps (event, &caps);
2913       if (bclass->set_caps)
2914         result = bclass->set_caps (basesink, caps);
2915
2916       if (result) {
2917         GST_OBJECT_LOCK (basesink);
2918         gst_caps_replace (&basesink->priv->caps, caps);
2919         GST_OBJECT_UNLOCK (basesink);
2920       }
2921       break;
2922     }
2923     case GST_EVENT_SEGMENT:
2924       /* configure the segment */
2925       /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
2926        * We protect with the OBJECT_LOCK so that we can use the values to
2927        * safely answer a POSITION query. */
2928       GST_OBJECT_LOCK (basesink);
2929       /* the newsegment event is needed to bring the buffer timestamps to the
2930        * stream time and to drop samples outside of the playback segment. */
2931       gst_event_copy_segment (event, &basesink->segment);
2932       GST_DEBUG_OBJECT (basesink, "configured SEGMENT %" GST_SEGMENT_FORMAT,
2933           &basesink->segment);
2934       basesink->have_newsegment = TRUE;
2935       GST_OBJECT_UNLOCK (basesink);
2936       break;
2937     case GST_EVENT_TAG:
2938     {
2939       GstTagList *taglist;
2940
2941       gst_event_parse_tag (event, &taglist);
2942
2943       gst_element_post_message (GST_ELEMENT_CAST (basesink),
2944           gst_message_new_tag (GST_OBJECT_CAST (basesink),
2945               gst_tag_list_copy (taglist)));
2946       break;
2947     }
2948     case GST_EVENT_SINK_MESSAGE:
2949     {
2950       GstMessage *msg = NULL;
2951
2952       gst_event_parse_sink_message (event, &msg);
2953       if (msg)
2954         gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
2955       break;
2956     }
2957     default:
2958       break;
2959   }
2960 done:
2961   gst_event_unref (event);
2962
2963   return result;
2964 }
2965
2966 static gboolean
2967 gst_base_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
2968 {
2969   GstBaseSink *basesink;
2970   gboolean result = TRUE;
2971   GstBaseSinkClass *bclass;
2972
2973   basesink = GST_BASE_SINK_CAST (parent);
2974   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2975
2976   GST_DEBUG_OBJECT (basesink, "received event %p %" GST_PTR_FORMAT, event,
2977       event);
2978
2979   switch (GST_EVENT_TYPE (event)) {
2980     case GST_EVENT_FLUSH_STOP:
2981       /* special case for this serialized event because we don't want to grab
2982        * the PREROLL lock or check if we were flushing */
2983       if (bclass->event)
2984         result = bclass->event (basesink, event);
2985       break;
2986     default:
2987       if (GST_EVENT_IS_SERIALIZED (event)) {
2988         GST_BASE_SINK_PREROLL_LOCK (basesink);
2989         if (G_UNLIKELY (basesink->flushing))
2990           goto flushing;
2991
2992         if (G_UNLIKELY (basesink->priv->received_eos))
2993           goto after_eos;
2994
2995         if (bclass->event)
2996           result = bclass->event (basesink, event);
2997
2998         GST_BASE_SINK_PREROLL_UNLOCK (basesink);
2999       } else {
3000         if (bclass->event)
3001           result = bclass->event (basesink, event);
3002       }
3003       break;
3004   }
3005 done:
3006   return result;
3007
3008   /* ERRORS */
3009 flushing:
3010   {
3011     GST_DEBUG_OBJECT (basesink, "we are flushing");
3012     GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3013     gst_event_unref (event);
3014     result = FALSE;
3015     goto done;
3016   }
3017
3018 after_eos:
3019   {
3020     GST_DEBUG_OBJECT (basesink, "Event received after EOS, dropping");
3021     GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3022     gst_event_unref (event);
3023     result = FALSE;
3024     goto done;
3025   }
3026 }
3027
3028 /* default implementation to calculate the start and end
3029  * timestamps on a buffer, subclasses can override
3030  */
3031 static void
3032 gst_base_sink_default_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3033     GstClockTime * start, GstClockTime * end)
3034 {
3035   GstClockTime timestamp, duration;
3036
3037   /* first sync on DTS, else use PTS */
3038   timestamp = GST_BUFFER_DTS (buffer);
3039   if (!GST_CLOCK_TIME_IS_VALID (timestamp))
3040     timestamp = GST_BUFFER_PTS (buffer);
3041
3042   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3043     /* get duration to calculate end time */
3044     duration = GST_BUFFER_DURATION (buffer);
3045     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3046       *end = timestamp + duration;
3047     }
3048     *start = timestamp;
3049   }
3050 }
3051
3052 /* must be called with PREROLL_LOCK */
3053 static gboolean
3054 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3055 {
3056   gboolean is_prerolled, res;
3057
3058   /* we have 2 cases where the PREROLL_LOCK is released:
3059    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3060    *  2) we are syncing on the clock
3061    */
3062   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3063   res = !is_prerolled;
3064
3065   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3066       basesink->have_preroll, basesink->priv->received_eos, res);
3067
3068   return res;
3069 }
3070
3071 /* with STREAM_LOCK, PREROLL_LOCK
3072  *
3073  * Takes a buffer and compare the timestamps with the last segment.
3074  * If the buffer falls outside of the segment boundaries, drop it.
3075  * Else send the buffer for preroll and rendering.
3076  *
3077  * This function takes ownership of the buffer.
3078  */
3079 static GstFlowReturn
3080 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3081     gpointer obj)
3082 {
3083   GstBaseSinkClass *bclass;
3084   GstBaseSinkPrivate *priv = basesink->priv;
3085   GstFlowReturn ret;
3086   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3087   GstSegment *segment;
3088   GstBuffer *sync_buf;
3089   gint do_qos;
3090   gboolean late, step_end;
3091
3092   if (G_UNLIKELY (basesink->flushing))
3093     goto flushing;
3094
3095   if (G_UNLIKELY (priv->received_eos))
3096     goto was_eos;
3097
3098   if (GST_IS_BUFFER_LIST (obj)) {
3099     sync_buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0);
3100     g_assert (NULL != sync_buf);
3101   } else {
3102     sync_buf = GST_BUFFER_CAST (obj);
3103   }
3104
3105   /* for code clarity */
3106   segment = &basesink->segment;
3107
3108   if (G_UNLIKELY (!basesink->have_newsegment)) {
3109     gboolean sync;
3110
3111     sync = gst_base_sink_get_sync (basesink);
3112     if (sync) {
3113       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3114           (_("Internal data flow problem.")),
3115           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3116     }
3117
3118     /* this means this sink will assume timestamps start from 0 */
3119     GST_OBJECT_LOCK (basesink);
3120     segment->start = 0;
3121     segment->stop = -1;
3122     basesink->segment.start = 0;
3123     basesink->segment.stop = -1;
3124     basesink->have_newsegment = TRUE;
3125     GST_OBJECT_UNLOCK (basesink);
3126   }
3127
3128   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3129
3130   /* check if the buffer needs to be dropped, we first ask the subclass for the
3131    * start and end */
3132   if (bclass->get_times)
3133     bclass->get_times (basesink, sync_buf, &start, &end);
3134
3135   if (!GST_CLOCK_TIME_IS_VALID (start)) {
3136     /* if the subclass does not want sync, we use our own values so that we at
3137      * least clip the buffer to the segment */
3138     gst_base_sink_default_get_times (basesink, sync_buf, &start, &end);
3139   }
3140
3141   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3142       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3143
3144   /* a dropped buffer does not participate in anything */
3145   if (GST_CLOCK_TIME_IS_VALID (start) && (segment->format == GST_FORMAT_TIME)) {
3146     if (G_UNLIKELY (!gst_segment_clip (segment,
3147                 GST_FORMAT_TIME, start, end, NULL, NULL)))
3148       goto out_of_segment;
3149   }
3150
3151 again:
3152   late = FALSE;
3153   step_end = FALSE;
3154
3155   /* synchronize this object, non syncable objects return OK
3156    * immediately. */
3157   ret = gst_base_sink_do_sync (basesink, GST_MINI_OBJECT_CAST (sync_buf),
3158       &late, &step_end);
3159   if (G_UNLIKELY (ret != GST_FLOW_OK))
3160     goto sync_failed;
3161
3162   /* drop late buffers unconditionally, let's hope it's unlikely */
3163   if (G_UNLIKELY (late))
3164     goto dropped;
3165
3166   /* read once, to get same value before and after */
3167   do_qos = g_atomic_int_get (&priv->qos_enabled);
3168
3169   GST_DEBUG_OBJECT (basesink, "rendering object %p", obj);
3170
3171   /* record rendering time for QoS and stats */
3172   if (do_qos)
3173     gst_base_sink_do_render_stats (basesink, TRUE);
3174
3175   if (!GST_IS_BUFFER_LIST (obj)) {
3176     /* For buffer lists do not set last buffer for now. */
3177     gst_base_sink_set_last_buffer (basesink, GST_BUFFER_CAST (obj));
3178
3179     if (bclass->render)
3180       ret = bclass->render (basesink, GST_BUFFER_CAST (obj));
3181   } else {
3182     if (bclass->render_list)
3183       ret = bclass->render_list (basesink, GST_BUFFER_LIST_CAST (obj));
3184   }
3185
3186   if (do_qos)
3187     gst_base_sink_do_render_stats (basesink, FALSE);
3188
3189   if (ret == GST_FLOW_STEP)
3190     goto again;
3191
3192   if (G_UNLIKELY (basesink->flushing))
3193     goto flushing;
3194
3195   priv->rendered++;
3196
3197 done:
3198   if (step_end) {
3199     /* the step ended, check if we need to activate a new step */
3200     GST_DEBUG_OBJECT (basesink, "step ended");
3201     stop_stepping (basesink, &basesink->segment, &priv->current_step,
3202         priv->current_rstart, priv->current_rstop, basesink->eos);
3203     goto again;
3204   }
3205
3206   gst_base_sink_perform_qos (basesink, late);
3207
3208   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
3209   gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3210
3211   return ret;
3212
3213   /* ERRORS */
3214 flushing:
3215   {
3216     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3217     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3218     return GST_FLOW_WRONG_STATE;
3219   }
3220 was_eos:
3221   {
3222     GST_DEBUG_OBJECT (basesink, "we are EOS, dropping object, return EOS");
3223     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3224     return GST_FLOW_EOS;
3225   }
3226 out_of_segment:
3227   {
3228     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3229     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3230     return GST_FLOW_OK;
3231   }
3232 sync_failed:
3233   {
3234     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
3235     goto done;
3236   }
3237 dropped:
3238   {
3239     priv->dropped++;
3240     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
3241
3242     if (g_atomic_int_get (&priv->qos_enabled)) {
3243       GstMessage *qos_msg;
3244       GstClockTime timestamp, duration;
3245
3246       timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (sync_buf));
3247       duration = GST_BUFFER_DURATION (GST_BUFFER_CAST (sync_buf));
3248
3249       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3250           "qos: dropped buffer rt %" GST_TIME_FORMAT ", st %" GST_TIME_FORMAT
3251           ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT,
3252           GST_TIME_ARGS (priv->current_rstart),
3253           GST_TIME_ARGS (priv->current_sstart), GST_TIME_ARGS (timestamp),
3254           GST_TIME_ARGS (duration));
3255       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3256           "qos: rendered %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
3257           priv->rendered, priv->dropped);
3258
3259       qos_msg =
3260           gst_message_new_qos (GST_OBJECT_CAST (basesink), basesink->sync,
3261           priv->current_rstart, priv->current_sstart, timestamp, duration);
3262       gst_message_set_qos_values (qos_msg, priv->current_jitter, priv->avg_rate,
3263           1000000);
3264       gst_message_set_qos_stats (qos_msg, GST_FORMAT_BUFFERS, priv->rendered,
3265           priv->dropped);
3266       gst_element_post_message (GST_ELEMENT_CAST (basesink), qos_msg);
3267     }
3268     goto done;
3269   }
3270 }
3271
3272 /* with STREAM_LOCK
3273  */
3274 static GstFlowReturn
3275 gst_base_sink_chain_main (GstBaseSink * basesink, GstPad * pad, gpointer obj)
3276 {
3277   GstFlowReturn result;
3278
3279   if (G_UNLIKELY (basesink->pad_mode != GST_PAD_MODE_PUSH))
3280     goto wrong_mode;
3281
3282   GST_BASE_SINK_PREROLL_LOCK (basesink);
3283   result = gst_base_sink_chain_unlocked (basesink, pad, obj);
3284   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3285
3286 done:
3287   return result;
3288
3289   /* ERRORS */
3290 wrong_mode:
3291   {
3292     GST_OBJECT_LOCK (pad);
3293     GST_WARNING_OBJECT (basesink,
3294         "Push on pad %s:%s, but it was not activated in push mode",
3295         GST_DEBUG_PAD_NAME (pad));
3296     GST_OBJECT_UNLOCK (pad);
3297     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3298     /* we don't post an error message this will signal to the peer
3299      * pushing that EOS is reached. */
3300     result = GST_FLOW_EOS;
3301     goto done;
3302   }
3303 }
3304
3305 static GstFlowReturn
3306 gst_base_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
3307 {
3308   GstBaseSink *basesink;
3309
3310   basesink = GST_BASE_SINK (parent);
3311
3312   return gst_base_sink_chain_main (basesink, pad, buf);
3313 }
3314
3315 static GstFlowReturn
3316 gst_base_sink_chain_list (GstPad * pad, GstObject * parent,
3317     GstBufferList * list)
3318 {
3319   GstBaseSink *basesink;
3320   GstBaseSinkClass *bclass;
3321   GstFlowReturn result;
3322
3323   basesink = GST_BASE_SINK (parent);
3324   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3325
3326   if (G_LIKELY (bclass->render_list)) {
3327     result = gst_base_sink_chain_main (basesink, pad, list);
3328   } else {
3329     guint i, len;
3330     GstBuffer *buffer;
3331
3332     GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
3333
3334     len = gst_buffer_list_length (list);
3335
3336     result = GST_FLOW_OK;
3337     for (i = 0; i < len; i++) {
3338       buffer = gst_buffer_list_get (list, 0);
3339       result = gst_base_sink_chain_main (basesink, pad,
3340           gst_buffer_ref (buffer));
3341       if (result != GST_FLOW_OK)
3342         break;
3343     }
3344     gst_buffer_list_unref (list);
3345   }
3346   return result;
3347 }
3348
3349
3350 static gboolean
3351 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3352 {
3353   gboolean res = TRUE;
3354
3355   /* update our offset if the start/stop position was updated */
3356   if (segment->format == GST_FORMAT_BYTES) {
3357     segment->time = segment->start;
3358   } else if (segment->start == 0) {
3359     /* seek to start, we can implement a default for this. */
3360     segment->time = 0;
3361   } else {
3362     res = FALSE;
3363     GST_INFO_OBJECT (sink, "Can't do a default seek");
3364   }
3365
3366   return res;
3367 }
3368
3369 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3370
3371 static gboolean
3372 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3373     GstEvent * event, GstSegment * segment)
3374 {
3375   /* By default, we try one of 2 things:
3376    *   - For absolute seek positions, convert the requested position to our
3377    *     configured processing format and place it in the output segment \
3378    *   - For relative seek positions, convert our current (input) values to the
3379    *     seek format, adjust by the relative seek offset and then convert back to
3380    *     the processing format
3381    */
3382   GstSeekType cur_type, stop_type;
3383   gint64 cur, stop;
3384   GstSeekFlags flags;
3385   GstFormat seek_format;
3386   gdouble rate;
3387   gboolean update;
3388   gboolean res = TRUE;
3389
3390   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3391       &cur_type, &cur, &stop_type, &stop);
3392
3393   if (seek_format == segment->format) {
3394     gst_segment_do_seek (segment, rate, seek_format, flags,
3395         cur_type, cur, stop_type, stop, &update);
3396     return TRUE;
3397   }
3398
3399   if (cur_type != GST_SEEK_TYPE_NONE) {
3400     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3401     res =
3402         gst_pad_query_convert (sink->sinkpad, seek_format, cur, segment->format,
3403         &cur);
3404     cur_type = GST_SEEK_TYPE_SET;
3405   }
3406
3407   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3408     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3409     res =
3410         gst_pad_query_convert (sink->sinkpad, seek_format, stop,
3411         segment->format, &stop);
3412     stop_type = GST_SEEK_TYPE_SET;
3413   }
3414
3415   /* And finally, configure our output segment in the desired format */
3416   gst_segment_do_seek (segment, rate, segment->format, flags, cur_type, cur,
3417       stop_type, stop, &update);
3418
3419   if (!res)
3420     goto no_format;
3421
3422   return res;
3423
3424 no_format:
3425   {
3426     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3427     return FALSE;
3428   }
3429 }
3430
3431 /* perform a seek, only executed in pull mode */
3432 static gboolean
3433 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3434 {
3435   gboolean flush;
3436   gdouble rate;
3437   GstFormat seek_format, dest_format;
3438   GstSeekFlags flags;
3439   GstSeekType cur_type, stop_type;
3440   gboolean seekseg_configured = FALSE;
3441   gint64 cur, stop;
3442   gboolean update, res = TRUE;
3443   GstSegment seeksegment;
3444
3445   dest_format = sink->segment.format;
3446
3447   if (event) {
3448     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3449     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3450         &cur_type, &cur, &stop_type, &stop);
3451
3452     flush = flags & GST_SEEK_FLAG_FLUSH;
3453   } else {
3454     GST_DEBUG_OBJECT (sink, "performing seek without event");
3455     flush = FALSE;
3456   }
3457
3458   if (flush) {
3459     GST_DEBUG_OBJECT (sink, "flushing upstream");
3460     gst_pad_push_event (pad, gst_event_new_flush_start ());
3461     gst_base_sink_flush_start (sink, pad);
3462   } else {
3463     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3464   }
3465
3466   GST_PAD_STREAM_LOCK (pad);
3467
3468   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3469    * copy the current segment info into the temp segment that we can actually
3470    * attempt the seek with. We only update the real segment if the seek succeeds. */
3471   if (!seekseg_configured) {
3472     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3473
3474     /* now configure the final seek segment */
3475     if (event) {
3476       if (sink->segment.format != seek_format) {
3477         /* OK, here's where we give the subclass a chance to convert the relative
3478          * seek into an absolute one in the processing format. We set up any
3479          * absolute seek above, before taking the stream lock. */
3480         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3481                 &seeksegment)) {
3482           GST_DEBUG_OBJECT (sink,
3483               "Preparing the seek failed after flushing. " "Aborting seek");
3484           res = FALSE;
3485         }
3486       } else {
3487         /* The seek format matches our processing format, no need to ask the
3488          * the subclass to configure the segment. */
3489         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
3490             cur_type, cur, stop_type, stop, &update);
3491       }
3492     }
3493     /* Else, no seek event passed, so we're just (re)starting the
3494        current segment. */
3495   }
3496
3497   if (res) {
3498     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3499         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3500         seeksegment.start, seeksegment.stop, seeksegment.position);
3501
3502     /* do the seek, segment.position contains the new position. */
3503     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3504   }
3505
3506
3507   if (flush) {
3508     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3509     gst_pad_push_event (pad, gst_event_new_flush_stop (TRUE));
3510     gst_base_sink_flush_stop (sink, pad, TRUE);
3511   } else if (res && sink->running) {
3512     /* we are running the current segment and doing a non-flushing seek,
3513      * close the segment first based on the position. */
3514     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3515         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.position);
3516   }
3517
3518   /* The subclass must have converted the segment to the processing format
3519    * by now */
3520   if (res && seeksegment.format != dest_format) {
3521     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3522         "in the correct format. Aborting seek.");
3523     res = FALSE;
3524   }
3525
3526   /* if successful seek, we update our real segment and push
3527    * out the new segment. */
3528   if (res) {
3529     gst_segment_copy_into (&seeksegment, &sink->segment);
3530
3531     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3532       gst_element_post_message (GST_ELEMENT (sink),
3533           gst_message_new_segment_start (GST_OBJECT (sink),
3534               sink->segment.format, sink->segment.position));
3535     }
3536   }
3537
3538   sink->priv->discont = TRUE;
3539   sink->running = TRUE;
3540
3541   GST_PAD_STREAM_UNLOCK (pad);
3542
3543   return res;
3544 }
3545
3546 static void
3547 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
3548     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
3549     gboolean flush, gboolean intermediate)
3550 {
3551   GST_OBJECT_LOCK (sink);
3552   pending->seqnum = seqnum;
3553   pending->format = format;
3554   pending->amount = amount;
3555   pending->position = 0;
3556   pending->rate = rate;
3557   pending->flush = flush;
3558   pending->intermediate = intermediate;
3559   pending->valid = TRUE;
3560   /* flush invalidates the current stepping segment */
3561   if (flush)
3562     current->valid = FALSE;
3563   GST_OBJECT_UNLOCK (sink);
3564 }
3565
3566 static gboolean
3567 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3568 {
3569   GstBaseSinkPrivate *priv;
3570   GstBaseSinkClass *bclass;
3571   gboolean flush, intermediate;
3572   gdouble rate;
3573   GstFormat format;
3574   guint64 amount;
3575   guint seqnum;
3576   GstStepInfo *pending, *current;
3577   GstMessage *message;
3578
3579   bclass = GST_BASE_SINK_GET_CLASS (sink);
3580   priv = sink->priv;
3581
3582   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
3583
3584   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
3585   seqnum = gst_event_get_seqnum (event);
3586
3587   pending = &priv->pending_step;
3588   current = &priv->current_step;
3589
3590   /* post message first */
3591   message = gst_message_new_step_start (GST_OBJECT (sink), FALSE, format,
3592       amount, rate, flush, intermediate);
3593   gst_message_set_seqnum (message, seqnum);
3594   gst_element_post_message (GST_ELEMENT (sink), message);
3595
3596   if (flush) {
3597     /* we need to call ::unlock before locking PREROLL_LOCK
3598      * since we lock it before going into ::render */
3599     if (bclass->unlock)
3600       bclass->unlock (sink);
3601
3602     GST_BASE_SINK_PREROLL_LOCK (sink);
3603     /* now that we have the PREROLL lock, clear our unlock request */
3604     if (bclass->unlock_stop)
3605       bclass->unlock_stop (sink);
3606
3607     /* update the stepinfo and make it valid */
3608     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
3609         intermediate);
3610
3611     if (sink->priv->async_enabled) {
3612       /* and we need to commit our state again on the next
3613        * prerolled buffer */
3614       sink->playing_async = TRUE;
3615       priv->pending_step.need_preroll = TRUE;
3616       sink->need_preroll = FALSE;
3617       gst_element_lost_state (GST_ELEMENT_CAST (sink));
3618     } else {
3619       sink->priv->have_latency = TRUE;
3620       sink->need_preroll = FALSE;
3621     }
3622     priv->current_sstart = GST_CLOCK_TIME_NONE;
3623     priv->current_sstop = GST_CLOCK_TIME_NONE;
3624     priv->eos_rtime = GST_CLOCK_TIME_NONE;
3625     priv->call_preroll = TRUE;
3626     gst_base_sink_set_last_buffer (sink, NULL);
3627     gst_base_sink_reset_qos (sink);
3628
3629     if (sink->clock_id) {
3630       gst_clock_id_unschedule (sink->clock_id);
3631     }
3632
3633     if (sink->have_preroll) {
3634       GST_DEBUG_OBJECT (sink, "signal waiter");
3635       priv->step_unlock = TRUE;
3636       GST_BASE_SINK_PREROLL_SIGNAL (sink);
3637     }
3638     GST_BASE_SINK_PREROLL_UNLOCK (sink);
3639   } else {
3640     /* update the stepinfo and make it valid */
3641     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
3642         intermediate);
3643   }
3644
3645   return TRUE;
3646 }
3647
3648 /* with STREAM_LOCK
3649  */
3650 static void
3651 gst_base_sink_loop (GstPad * pad)
3652 {
3653   GstObject *parent;
3654   GstBaseSink *basesink;
3655   GstBuffer *buf = NULL;
3656   GstFlowReturn result;
3657   guint blocksize;
3658   guint64 offset;
3659
3660   parent = GST_OBJECT_PARENT (pad);
3661   basesink = GST_BASE_SINK (parent);
3662
3663   g_assert (basesink->pad_mode == GST_PAD_MODE_PULL);
3664
3665   if ((blocksize = basesink->priv->blocksize) == 0)
3666     blocksize = -1;
3667
3668   offset = basesink->segment.position;
3669
3670   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
3671       offset, blocksize);
3672
3673   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
3674   if (G_UNLIKELY (result != GST_FLOW_OK))
3675     goto paused;
3676
3677   if (G_UNLIKELY (buf == NULL))
3678     goto no_buffer;
3679
3680   offset += gst_buffer_get_size (buf);
3681
3682   basesink->segment.position = offset;
3683
3684   GST_BASE_SINK_PREROLL_LOCK (basesink);
3685   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
3686   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3687   if (G_UNLIKELY (result != GST_FLOW_OK))
3688     goto paused;
3689
3690   return;
3691
3692   /* ERRORS */
3693 paused:
3694   {
3695     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
3696         gst_flow_get_name (result));
3697     gst_pad_pause_task (pad);
3698     if (result == GST_FLOW_EOS) {
3699       /* perform EOS logic */
3700       if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3701         gst_element_post_message (GST_ELEMENT_CAST (basesink),
3702             gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
3703                 basesink->segment.format, basesink->segment.position));
3704       } else {
3705         gst_base_sink_event (pad, parent, gst_event_new_eos ());
3706       }
3707     } else if (result == GST_FLOW_NOT_LINKED || result <= GST_FLOW_EOS) {
3708       /* for fatal errors we post an error message, post the error
3709        * first so the app knows about the error first. 
3710        * wrong-state is not a fatal error because it happens due to
3711        * flushing and posting an error message in that case is the
3712        * wrong thing to do, e.g. when basesrc is doing a flushing
3713        * seek. */
3714       GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3715           (_("Internal data stream error.")),
3716           ("stream stopped, reason %s", gst_flow_get_name (result)));
3717       gst_base_sink_event (pad, parent, gst_event_new_eos ());
3718     }
3719     return;
3720   }
3721 no_buffer:
3722   {
3723     GST_LOG_OBJECT (basesink, "no buffer, pausing");
3724     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3725         (_("Internal data flow error.")), ("element returned NULL buffer"));
3726     result = GST_FLOW_ERROR;
3727     goto paused;
3728   }
3729 }
3730
3731 static gboolean
3732 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
3733     gboolean flushing)
3734 {
3735   GstBaseSinkClass *bclass;
3736
3737   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3738
3739   if (flushing) {
3740     /* unlock any subclasses, we need to do this before grabbing the
3741      * PREROLL_LOCK since we hold this lock before going into ::render. */
3742     if (bclass->unlock)
3743       bclass->unlock (basesink);
3744   }
3745
3746   GST_BASE_SINK_PREROLL_LOCK (basesink);
3747   basesink->flushing = flushing;
3748   if (flushing) {
3749     /* step 1, now that we have the PREROLL lock, clear our unlock request */
3750     if (bclass->unlock_stop)
3751       bclass->unlock_stop (basesink);
3752
3753     /* set need_preroll before we unblock the clock. If the clock is unblocked
3754      * before timing out, we can reuse the buffer for preroll. */
3755     basesink->need_preroll = TRUE;
3756
3757     /* step 2, unblock clock sync (if any) or any other blocking thing */
3758     if (basesink->clock_id) {
3759       gst_clock_id_unschedule (basesink->clock_id);
3760     }
3761
3762     /* flush out the data thread if it's locked in finish_preroll, this will
3763      * also flush out the EOS state */
3764     GST_DEBUG_OBJECT (basesink,
3765         "flushing out data thread, need preroll to TRUE");
3766
3767     /* we can't have EOS anymore now */
3768     basesink->eos = FALSE;
3769     basesink->priv->received_eos = FALSE;
3770     basesink->have_preroll = FALSE;
3771     basesink->priv->step_unlock = FALSE;
3772     /* can't report latency anymore until we preroll again */
3773     if (basesink->priv->async_enabled) {
3774       GST_OBJECT_LOCK (basesink);
3775       basesink->priv->have_latency = FALSE;
3776       GST_OBJECT_UNLOCK (basesink);
3777     }
3778     /* and signal any waiters now */
3779     GST_BASE_SINK_PREROLL_SIGNAL (basesink);
3780   }
3781   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3782
3783   return TRUE;
3784 }
3785
3786 static gboolean
3787 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
3788 {
3789   gboolean result;
3790
3791   if (active) {
3792     /* start task */
3793     result = gst_pad_start_task (basesink->sinkpad,
3794         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
3795   } else {
3796     /* step 2, make sure streaming finishes */
3797     result = gst_pad_stop_task (basesink->sinkpad);
3798   }
3799
3800   return result;
3801 }
3802
3803 static gboolean
3804 gst_base_sink_pad_activate (GstPad * pad, GstObject * parent)
3805 {
3806   gboolean result = FALSE;
3807   GstBaseSink *basesink;
3808   GstQuery *query;
3809   gboolean pull_mode;
3810
3811   basesink = GST_BASE_SINK (parent);
3812
3813   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
3814
3815   gst_base_sink_set_flushing (basesink, pad, FALSE);
3816
3817   /* we need to have the pull mode enabled */
3818   if (!basesink->can_activate_pull) {
3819     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
3820     goto fallback;
3821   }
3822
3823   /* check if downstreams supports pull mode at all */
3824   query = gst_query_new_scheduling ();
3825
3826   if (!gst_pad_peer_query (pad, query)) {
3827     gst_query_unref (query);
3828     GST_DEBUG_OBJECT (basesink, "peer query faild, no pull mode");
3829     goto fallback;
3830   }
3831
3832   /* parse result of the query */
3833   pull_mode = gst_query_has_scheduling_mode (query, GST_PAD_MODE_PULL);
3834   gst_query_unref (query);
3835
3836   if (!pull_mode) {
3837     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
3838     goto fallback;
3839   }
3840
3841   /* set the pad mode before starting the task so that it's in the
3842    * correct state for the new thread. also the sink set_caps and get_caps
3843    * function checks this */
3844   basesink->pad_mode = GST_PAD_MODE_PULL;
3845
3846   /* we first try to negotiate a format so that when we try to activate
3847    * downstream, it knows about our format */
3848   if (!gst_base_sink_negotiate_pull (basesink)) {
3849     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
3850     goto fallback;
3851   }
3852
3853   /* ok activate now */
3854   if (!gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, TRUE)) {
3855     /* clear any pending caps */
3856     GST_OBJECT_LOCK (basesink);
3857     gst_caps_replace (&basesink->priv->caps, NULL);
3858     GST_OBJECT_UNLOCK (basesink);
3859     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
3860     goto fallback;
3861   }
3862
3863   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
3864   result = TRUE;
3865   goto done;
3866
3867   /* push mode fallback */
3868 fallback:
3869   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
3870   if ((result = gst_pad_activate_mode (pad, GST_PAD_MODE_PUSH, TRUE))) {
3871     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
3872   }
3873
3874 done:
3875   if (!result) {
3876     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
3877     gst_base_sink_set_flushing (basesink, pad, TRUE);
3878   }
3879
3880   return result;
3881 }
3882
3883 static gboolean
3884 gst_base_sink_pad_activate_push (GstPad * pad, GstObject * parent,
3885     gboolean active)
3886 {
3887   gboolean result;
3888   GstBaseSink *basesink;
3889
3890   basesink = GST_BASE_SINK (parent);
3891
3892   if (active) {
3893     if (!basesink->can_activate_push) {
3894       result = FALSE;
3895       basesink->pad_mode = GST_PAD_MODE_NONE;
3896     } else {
3897       result = TRUE;
3898       basesink->pad_mode = GST_PAD_MODE_PUSH;
3899     }
3900   } else {
3901     if (G_UNLIKELY (basesink->pad_mode != GST_PAD_MODE_PUSH)) {
3902       g_warning ("Internal GStreamer activation error!!!");
3903       result = FALSE;
3904     } else {
3905       gst_base_sink_set_flushing (basesink, pad, TRUE);
3906       result = TRUE;
3907       basesink->pad_mode = GST_PAD_MODE_NONE;
3908     }
3909   }
3910
3911   return result;
3912 }
3913
3914 static gboolean
3915 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
3916 {
3917   GstCaps *caps;
3918   gboolean result;
3919
3920   result = FALSE;
3921
3922   /* this returns the intersection between our caps and the peer caps. If there
3923    * is no peer, it returns NULL and we can't operate in pull mode so we can
3924    * fail the negotiation. */
3925   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
3926   if (caps == NULL || gst_caps_is_empty (caps))
3927     goto no_caps_possible;
3928
3929   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
3930
3931   if (gst_caps_is_any (caps)) {
3932     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
3933         "allowing pull()");
3934     /* neither side has template caps in this case, so they are prepared for
3935        pull() without setcaps() */
3936     result = TRUE;
3937   } else {
3938     caps = gst_caps_make_writable (caps);
3939     /* try to fixate */
3940     gst_base_sink_fixate (basesink, caps);
3941     GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
3942
3943     if (gst_caps_is_fixed (caps)) {
3944       if (!gst_pad_send_event (GST_BASE_SINK_PAD (basesink),
3945               gst_event_new_caps (caps)))
3946         goto could_not_set_caps;
3947
3948       result = TRUE;
3949     }
3950   }
3951
3952   gst_caps_unref (caps);
3953
3954   return result;
3955
3956 no_caps_possible:
3957   {
3958     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
3959     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
3960     if (caps)
3961       gst_caps_unref (caps);
3962     return FALSE;
3963   }
3964 could_not_set_caps:
3965   {
3966     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
3967     gst_caps_unref (caps);
3968     return FALSE;
3969   }
3970 }
3971
3972 /* this won't get called until we implement an activate function */
3973 static gboolean
3974 gst_base_sink_pad_activate_pull (GstPad * pad, GstObject * parent,
3975     gboolean active)
3976 {
3977   gboolean result = FALSE;
3978   GstBaseSink *basesink;
3979   GstBaseSinkClass *bclass;
3980
3981   basesink = GST_BASE_SINK (parent);
3982   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3983
3984   if (active) {
3985     gint64 duration;
3986
3987     /* we mark we have a newsegment here because pull based
3988      * mode works just fine without having a newsegment before the
3989      * first buffer */
3990     gst_segment_init (&basesink->segment, GST_FORMAT_BYTES);
3991     GST_OBJECT_LOCK (basesink);
3992     basesink->have_newsegment = TRUE;
3993     GST_OBJECT_UNLOCK (basesink);
3994
3995     /* get the peer duration in bytes */
3996     result = gst_pad_peer_query_duration (pad, GST_FORMAT_BYTES, &duration);
3997     if (result) {
3998       GST_DEBUG_OBJECT (basesink,
3999           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
4000       basesink->segment.duration = duration;
4001     } else {
4002       GST_DEBUG_OBJECT (basesink, "unknown duration");
4003     }
4004
4005     if (bclass->activate_pull)
4006       result = bclass->activate_pull (basesink, TRUE);
4007     else
4008       result = FALSE;
4009
4010     if (!result)
4011       goto activate_failed;
4012
4013   } else {
4014     if (G_UNLIKELY (basesink->pad_mode != GST_PAD_MODE_PULL)) {
4015       g_warning ("Internal GStreamer activation error!!!");
4016       result = FALSE;
4017     } else {
4018       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
4019       if (bclass->activate_pull)
4020         result &= bclass->activate_pull (basesink, FALSE);
4021       basesink->pad_mode = GST_PAD_MODE_NONE;
4022     }
4023   }
4024
4025   return result;
4026
4027   /* ERRORS */
4028 activate_failed:
4029   {
4030     /* reset, as starting the thread failed */
4031     basesink->pad_mode = GST_PAD_MODE_NONE;
4032
4033     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
4034     return FALSE;
4035   }
4036 }
4037
4038 static gboolean
4039 gst_base_sink_pad_activate_mode (GstPad * pad, GstObject * parent,
4040     GstPadMode mode, gboolean active)
4041 {
4042   gboolean res;
4043
4044   switch (mode) {
4045     case GST_PAD_MODE_PULL:
4046       res = gst_base_sink_pad_activate_pull (pad, parent, active);
4047       break;
4048     case GST_PAD_MODE_PUSH:
4049       res = gst_base_sink_pad_activate_push (pad, parent, active);
4050       break;
4051     default:
4052       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
4053       res = FALSE;
4054       break;
4055   }
4056   return res;
4057 }
4058
4059 /* send an event to our sinkpad peer. */
4060 static gboolean
4061 gst_base_sink_send_event (GstElement * element, GstEvent * event)
4062 {
4063   GstPad *pad;
4064   GstBaseSink *basesink = GST_BASE_SINK (element);
4065   gboolean forward, result = TRUE;
4066   GstPadMode mode;
4067
4068   GST_OBJECT_LOCK (element);
4069   /* get the pad and the scheduling mode */
4070   pad = gst_object_ref (basesink->sinkpad);
4071   mode = basesink->pad_mode;
4072   GST_OBJECT_UNLOCK (element);
4073
4074   /* only push UPSTREAM events upstream */
4075   forward = GST_EVENT_IS_UPSTREAM (event);
4076
4077   GST_DEBUG_OBJECT (basesink, "handling event %p %" GST_PTR_FORMAT, event,
4078       event);
4079
4080   switch (GST_EVENT_TYPE (event)) {
4081     case GST_EVENT_LATENCY:
4082     {
4083       GstClockTime latency;
4084
4085       gst_event_parse_latency (event, &latency);
4086
4087       /* store the latency. We use this to adjust the running_time before syncing
4088        * it to the clock. */
4089       GST_OBJECT_LOCK (element);
4090       basesink->priv->latency = latency;
4091       if (!basesink->priv->have_latency)
4092         forward = FALSE;
4093       GST_OBJECT_UNLOCK (element);
4094       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
4095           GST_TIME_ARGS (latency));
4096
4097       /* We forward this event so that all elements know about the global pipeline
4098        * latency. This is interesting for an element when it wants to figure out
4099        * when a particular piece of data will be rendered. */
4100       break;
4101     }
4102     case GST_EVENT_SEEK:
4103       /* in pull mode we will execute the seek */
4104       if (mode == GST_PAD_MODE_PULL)
4105         result = gst_base_sink_perform_seek (basesink, pad, event);
4106       break;
4107     case GST_EVENT_STEP:
4108       result = gst_base_sink_perform_step (basesink, pad, event);
4109       forward = FALSE;
4110       break;
4111     default:
4112       break;
4113   }
4114
4115   if (forward) {
4116     result = gst_pad_push_event (pad, event);
4117   } else {
4118     /* not forwarded, unref the event */
4119     gst_event_unref (event);
4120   }
4121
4122   gst_object_unref (pad);
4123
4124   GST_DEBUG_OBJECT (basesink, "handled event %p %" GST_PTR_FORMAT ": %d", event,
4125       event, result);
4126
4127   return result;
4128 }
4129
4130 static gboolean
4131 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4132     gint64 * cur, gboolean * upstream)
4133 {
4134   GstClock *clock = NULL;
4135   gboolean res = FALSE;
4136   GstFormat oformat;
4137   GstSegment *segment;
4138   GstClockTime now, latency;
4139   GstClockTimeDiff base_time;
4140   gint64 time, base, duration;
4141   gdouble rate;
4142   gint64 last;
4143   gboolean last_seen, with_clock, in_paused;
4144
4145   GST_OBJECT_LOCK (basesink);
4146   /* we can only get the segment when we are not NULL or READY */
4147   if (!basesink->have_newsegment)
4148     goto wrong_state;
4149
4150   in_paused = FALSE;
4151   /* when not in PLAYING or when we're busy with a state change, we
4152    * cannot read from the clock so we report time based on the
4153    * last seen timestamp. */
4154   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4155       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING) {
4156     in_paused = TRUE;
4157   }
4158
4159   segment = &basesink->segment;
4160
4161   /* get the format in the segment */
4162   oformat = segment->format;
4163
4164   /* report with last seen position when EOS */
4165   last_seen = basesink->eos;
4166
4167   /* assume we will use the clock for getting the current position */
4168   with_clock = TRUE;
4169   if (basesink->sync == FALSE)
4170     with_clock = FALSE;
4171
4172   /* and we need a clock */
4173   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4174     with_clock = FALSE;
4175   else
4176     gst_object_ref (clock);
4177
4178   /* mainloop might be querying position when going to playing async,
4179    * while (audio) rendering might be quickly advancing stream position,
4180    * so use clock asap rather than last reported position */
4181   if (in_paused && with_clock && g_atomic_int_get (&basesink->priv->to_playing)) {
4182     GST_DEBUG_OBJECT (basesink, "going to PLAYING, so not PAUSED");
4183     in_paused = FALSE;
4184   }
4185
4186   /* collect all data we need holding the lock */
4187   if (GST_CLOCK_TIME_IS_VALID (segment->time))
4188     time = segment->time;
4189   else
4190     time = 0;
4191
4192   if (GST_CLOCK_TIME_IS_VALID (segment->stop))
4193     duration = segment->stop - segment->start;
4194   else
4195     duration = 0;
4196
4197   base = segment->base;
4198   rate = segment->rate * segment->applied_rate;
4199   latency = basesink->priv->latency;
4200
4201   if (oformat == GST_FORMAT_TIME) {
4202     gint64 start, stop;
4203
4204     start = basesink->priv->current_sstart;
4205     stop = basesink->priv->current_sstop;
4206
4207     if (in_paused) {
4208       /* in paused we use the last position as a lower bound */
4209       if (stop == -1 || segment->rate > 0.0)
4210         last = start;
4211       else
4212         last = stop;
4213     } else {
4214       /* in playing, use last stop time as upper bound */
4215       if (start == -1 || segment->rate > 0.0)
4216         last = stop;
4217       else
4218         last = start;
4219     }
4220   } else {
4221     /* convert last stop to stream time */
4222     last = gst_segment_to_stream_time (segment, oformat, segment->position);
4223   }
4224
4225   if (in_paused) {
4226     /* in paused, use start_time */
4227     base_time = GST_ELEMENT_START_TIME (basesink);
4228     GST_DEBUG_OBJECT (basesink, "in paused, using start time %" GST_TIME_FORMAT,
4229         GST_TIME_ARGS (base_time));
4230   } else if (with_clock) {
4231     /* else use clock when needed */
4232     base_time = GST_ELEMENT_CAST (basesink)->base_time;
4233     GST_DEBUG_OBJECT (basesink, "using clock and base time %" GST_TIME_FORMAT,
4234         GST_TIME_ARGS (base_time));
4235   } else {
4236     /* else, no sync or clock -> no base time */
4237     GST_DEBUG_OBJECT (basesink, "no sync or no clock");
4238     base_time = -1;
4239   }
4240
4241   /* no base_time, we can't calculate running_time, use last seem timestamp to report
4242    * time */
4243   if (base_time == -1)
4244     last_seen = TRUE;
4245
4246   /* need to release the object lock before we can get the time,
4247    * a clock might take the LOCK of the provider, which could be
4248    * a basesink subclass. */
4249   GST_OBJECT_UNLOCK (basesink);
4250
4251   if (last_seen) {
4252     /* in EOS or when no valid stream_time, report the value of last seen
4253      * timestamp */
4254     if (last == -1) {
4255       /* no timestamp, we need to ask upstream */
4256       GST_DEBUG_OBJECT (basesink, "no last seen timestamp, asking upstream");
4257       res = FALSE;
4258       *upstream = TRUE;
4259       goto done;
4260     }
4261     GST_DEBUG_OBJECT (basesink, "using last seen timestamp %" GST_TIME_FORMAT,
4262         GST_TIME_ARGS (last));
4263     *cur = last;
4264   } else {
4265     if (oformat != GST_FORMAT_TIME) {
4266       /* convert base, time and duration to time */
4267       if (!gst_pad_query_convert (basesink->sinkpad, oformat, base,
4268               GST_FORMAT_TIME, &base))
4269         goto convert_failed;
4270       if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration,
4271               GST_FORMAT_TIME, &duration))
4272         goto convert_failed;
4273       if (!gst_pad_query_convert (basesink->sinkpad, oformat, time,
4274               GST_FORMAT_TIME, &time))
4275         goto convert_failed;
4276       if (!gst_pad_query_convert (basesink->sinkpad, oformat, last,
4277               GST_FORMAT_TIME, &last))
4278         goto convert_failed;
4279
4280       /* assume time format from now on */
4281       oformat = GST_FORMAT_TIME;
4282     }
4283
4284     if (!in_paused && with_clock) {
4285       now = gst_clock_get_time (clock);
4286     } else {
4287       now = base_time;
4288       base_time = 0;
4289     }
4290
4291     /* subtract base time and base time from the clock time.
4292      * Make sure we don't go negative. This is the current time in
4293      * the segment which we need to scale with the combined
4294      * rate and applied rate. */
4295     base_time += base;
4296     base_time += latency;
4297     if (GST_CLOCK_DIFF (base_time, now) < 0)
4298       base_time = now;
4299
4300     /* for negative rates we need to count back from the segment
4301      * duration. */
4302     if (rate < 0.0)
4303       time += duration;
4304
4305     *cur = time + gst_guint64_to_gdouble (now - base_time) * rate;
4306
4307     if (in_paused) {
4308       /* never report less than segment values in paused */
4309       if (last != -1)
4310         *cur = MAX (last, *cur);
4311     } else {
4312       /* never report more than last seen position in playing */
4313       if (last != -1)
4314         *cur = MIN (last, *cur);
4315     }
4316
4317     GST_DEBUG_OBJECT (basesink,
4318         "now %" GST_TIME_FORMAT " - base_time %" GST_TIME_FORMAT " - base %"
4319         GST_TIME_FORMAT " + time %" GST_TIME_FORMAT "  last %" GST_TIME_FORMAT,
4320         GST_TIME_ARGS (now), GST_TIME_ARGS (base_time), GST_TIME_ARGS (base),
4321         GST_TIME_ARGS (time), GST_TIME_ARGS (last));
4322   }
4323
4324   if (oformat != format) {
4325     /* convert to final format */
4326     if (!gst_pad_query_convert (basesink->sinkpad, oformat, *cur, format, cur))
4327       goto convert_failed;
4328   }
4329
4330   res = TRUE;
4331
4332 done:
4333   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4334       res, GST_TIME_ARGS (*cur));
4335
4336   if (clock)
4337     gst_object_unref (clock);
4338
4339   return res;
4340
4341   /* special cases */
4342 wrong_state:
4343   {
4344     /* in NULL or READY we always return FALSE and -1 */
4345     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4346     res = FALSE;
4347     *cur = -1;
4348     GST_OBJECT_UNLOCK (basesink);
4349     goto done;
4350   }
4351 convert_failed:
4352   {
4353     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4354     *upstream = TRUE;
4355     res = FALSE;
4356     goto done;
4357   }
4358 }
4359
4360 static gboolean
4361 gst_base_sink_get_duration (GstBaseSink * basesink, GstFormat format,
4362     gint64 * dur, gboolean * upstream)
4363 {
4364   gboolean res = FALSE;
4365
4366   if (basesink->pad_mode == GST_PAD_MODE_PULL) {
4367     gint64 uduration;
4368
4369     /* get the duration in bytes, in pull mode that's all we are sure to
4370      * know. We have to explicitly get this value from upstream instead of
4371      * using our cached value because it might change. Duration caching
4372      * should be done at a higher level. */
4373     res =
4374         gst_pad_peer_query_duration (basesink->sinkpad, GST_FORMAT_BYTES,
4375         &uduration);
4376     if (res) {
4377       basesink->segment.duration = uduration;
4378       if (format != GST_FORMAT_BYTES) {
4379         /* convert to the requested format */
4380         res =
4381             gst_pad_query_convert (basesink->sinkpad, GST_FORMAT_BYTES,
4382             uduration, format, dur);
4383       } else {
4384         *dur = uduration;
4385       }
4386     }
4387     *upstream = FALSE;
4388   } else {
4389     *upstream = TRUE;
4390   }
4391
4392   return res;
4393 }
4394
4395 static gboolean
4396 default_element_query (GstElement * element, GstQuery * query)
4397 {
4398   gboolean res = FALSE;
4399
4400   GstBaseSink *basesink = GST_BASE_SINK (element);
4401
4402   switch (GST_QUERY_TYPE (query)) {
4403     case GST_QUERY_POSITION:
4404     {
4405       gint64 cur = 0;
4406       GstFormat format;
4407       gboolean upstream = FALSE;
4408
4409       gst_query_parse_position (query, &format, NULL);
4410
4411       GST_DEBUG_OBJECT (basesink, "position query in format %s",
4412           gst_format_get_name (format));
4413
4414       /* first try to get the position based on the clock */
4415       if ((res =
4416               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4417         gst_query_set_position (query, format, cur);
4418       } else if (upstream) {
4419         /* fallback to peer query */
4420         res = gst_pad_peer_query (basesink->sinkpad, query);
4421       }
4422       if (!res) {
4423         /* we can handle a few things if upstream failed */
4424         if (format == GST_FORMAT_PERCENT) {
4425           gint64 dur = 0;
4426
4427           res = gst_base_sink_get_position (basesink, GST_FORMAT_TIME, &cur,
4428               &upstream);
4429           if (!res && upstream) {
4430             res =
4431                 gst_pad_peer_query_position (basesink->sinkpad, GST_FORMAT_TIME,
4432                 &cur);
4433           }
4434           if (res) {
4435             res = gst_base_sink_get_duration (basesink, GST_FORMAT_TIME, &dur,
4436                 &upstream);
4437             if (!res && upstream) {
4438               res =
4439                   gst_pad_peer_query_duration (basesink->sinkpad,
4440                   GST_FORMAT_TIME, &dur);
4441             }
4442           }
4443           if (res) {
4444             gint64 pos;
4445
4446             pos = gst_util_uint64_scale (100 * GST_FORMAT_PERCENT_SCALE, cur,
4447                 dur);
4448             gst_query_set_position (query, GST_FORMAT_PERCENT, pos);
4449           }
4450         }
4451       }
4452       break;
4453     }
4454     case GST_QUERY_DURATION:
4455     {
4456       gint64 dur = 0;
4457       GstFormat format;
4458       gboolean upstream = FALSE;
4459
4460       gst_query_parse_duration (query, &format, NULL);
4461
4462       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4463           gst_format_get_name (format));
4464
4465       if ((res =
4466               gst_base_sink_get_duration (basesink, format, &dur, &upstream))) {
4467         gst_query_set_duration (query, format, dur);
4468       } else if (upstream) {
4469         /* fallback to peer query */
4470         res = gst_pad_peer_query (basesink->sinkpad, query);
4471       }
4472       if (!res) {
4473         /* we can handle a few things if upstream failed */
4474         if (format == GST_FORMAT_PERCENT) {
4475           gst_query_set_duration (query, GST_FORMAT_PERCENT,
4476               GST_FORMAT_PERCENT_MAX);
4477           res = TRUE;
4478         }
4479       }
4480       break;
4481     }
4482     case GST_QUERY_LATENCY:
4483     {
4484       gboolean live, us_live;
4485       GstClockTime min, max;
4486
4487       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4488                   &max))) {
4489         gst_query_set_latency (query, live, min, max);
4490       }
4491       break;
4492     }
4493     case GST_QUERY_JITTER:
4494       break;
4495     case GST_QUERY_RATE:
4496       /* gst_query_set_rate (query, basesink->segment_rate); */
4497       res = TRUE;
4498       break;
4499     case GST_QUERY_SEGMENT:
4500     {
4501       if (basesink->pad_mode == GST_PAD_MODE_PULL) {
4502         gst_query_set_segment (query, basesink->segment.rate,
4503             GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4504         res = TRUE;
4505       } else {
4506         res = gst_pad_peer_query (basesink->sinkpad, query);
4507       }
4508       break;
4509     }
4510     case GST_QUERY_SEEKING:
4511     case GST_QUERY_CONVERT:
4512     case GST_QUERY_FORMATS:
4513     default:
4514       res = gst_pad_peer_query (basesink->sinkpad, query);
4515       break;
4516   }
4517   GST_DEBUG_OBJECT (basesink, "query %s returns %d",
4518       GST_QUERY_TYPE_NAME (query), res);
4519   return res;
4520 }
4521
4522
4523 static gboolean
4524 gst_base_sink_default_query (GstBaseSink * basesink, GstQuery * query)
4525 {
4526   gboolean res;
4527   GstBaseSinkClass *bclass;
4528
4529   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4530
4531   switch (GST_QUERY_TYPE (query)) {
4532     case GST_QUERY_ALLOCATION:
4533     {
4534       if (bclass->propose_allocation)
4535         res = bclass->propose_allocation (basesink, query);
4536       else
4537         res = FALSE;
4538       break;
4539     }
4540     case GST_QUERY_CAPS:
4541     {
4542       GstCaps *caps, *filter;
4543
4544       gst_query_parse_caps (query, &filter);
4545       caps = gst_base_sink_query_caps (basesink, basesink->sinkpad, filter);
4546       gst_query_set_caps_result (query, caps);
4547       gst_caps_unref (caps);
4548       res = TRUE;
4549       break;
4550     }
4551     default:
4552       res =
4553           gst_pad_query_default (basesink->sinkpad, GST_OBJECT_CAST (basesink),
4554           query);
4555       break;
4556   }
4557   return res;
4558 }
4559
4560 static gboolean
4561 gst_base_sink_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
4562 {
4563   GstBaseSink *basesink;
4564   GstBaseSinkClass *bclass;
4565   gboolean res;
4566
4567   basesink = GST_BASE_SINK_CAST (parent);
4568   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4569
4570   if (bclass->query)
4571     res = bclass->query (basesink, query);
4572   else
4573     res = FALSE;
4574
4575   return res;
4576 }
4577
4578 static GstStateChangeReturn
4579 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4580 {
4581   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4582   GstBaseSink *basesink = GST_BASE_SINK (element);
4583   GstBaseSinkClass *bclass;
4584   GstBaseSinkPrivate *priv;
4585
4586   priv = basesink->priv;
4587
4588   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4589
4590   switch (transition) {
4591     case GST_STATE_CHANGE_NULL_TO_READY:
4592       if (bclass->start)
4593         if (!bclass->start (basesink))
4594           goto start_failed;
4595       break;
4596     case GST_STATE_CHANGE_READY_TO_PAUSED:
4597       /* need to complete preroll before this state change completes, there
4598        * is no data flow in READY so we can safely assume we need to preroll. */
4599       GST_BASE_SINK_PREROLL_LOCK (basesink);
4600       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4601       basesink->have_newsegment = FALSE;
4602       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4603       basesink->offset = 0;
4604       basesink->have_preroll = FALSE;
4605       priv->step_unlock = FALSE;
4606       basesink->need_preroll = TRUE;
4607       basesink->playing_async = TRUE;
4608       basesink->priv->reset_time = FALSE;
4609       priv->current_sstart = GST_CLOCK_TIME_NONE;
4610       priv->current_sstop = GST_CLOCK_TIME_NONE;
4611       priv->eos_rtime = GST_CLOCK_TIME_NONE;
4612       priv->latency = 0;
4613       basesink->eos = FALSE;
4614       priv->received_eos = FALSE;
4615       gst_base_sink_reset_qos (basesink);
4616       priv->commited = FALSE;
4617       priv->call_preroll = TRUE;
4618       priv->current_step.valid = FALSE;
4619       priv->pending_step.valid = FALSE;
4620       if (priv->async_enabled) {
4621         GST_DEBUG_OBJECT (basesink, "doing async state change");
4622         /* when async enabled, post async-start message and return ASYNC from
4623          * the state change function */
4624         ret = GST_STATE_CHANGE_ASYNC;
4625         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4626             gst_message_new_async_start (GST_OBJECT_CAST (basesink)));
4627       } else {
4628         priv->have_latency = TRUE;
4629       }
4630       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4631       break;
4632     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4633       GST_BASE_SINK_PREROLL_LOCK (basesink);
4634       g_atomic_int_set (&basesink->priv->to_playing, TRUE);
4635       if (!gst_base_sink_needs_preroll (basesink)) {
4636         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4637         /* no preroll needed anymore now. */
4638         basesink->playing_async = FALSE;
4639         basesink->need_preroll = FALSE;
4640         if (basesink->eos) {
4641           GstMessage *message;
4642
4643           /* need to post EOS message here */
4644           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4645           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
4646           gst_message_set_seqnum (message, basesink->priv->seqnum);
4647           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
4648         } else {
4649           GST_DEBUG_OBJECT (basesink, "signal preroll");
4650           GST_BASE_SINK_PREROLL_SIGNAL (basesink);
4651         }
4652       } else {
4653         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
4654         basesink->need_preroll = TRUE;
4655         basesink->playing_async = TRUE;
4656         priv->call_preroll = TRUE;
4657         priv->commited = FALSE;
4658         if (priv->async_enabled) {
4659           GST_DEBUG_OBJECT (basesink, "doing async state change");
4660           ret = GST_STATE_CHANGE_ASYNC;
4661           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4662               gst_message_new_async_start (GST_OBJECT_CAST (basesink)));
4663         }
4664       }
4665       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4666       break;
4667     default:
4668       break;
4669   }
4670
4671   {
4672     GstStateChangeReturn bret;
4673
4674     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4675     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
4676       goto activate_failed;
4677   }
4678
4679   switch (transition) {
4680     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4681       /* completed transition, so need not be marked any longer
4682        * And it should be unmarked, since e.g. losing our position upon flush
4683        * does not really change state to PAUSED ... */
4684       g_atomic_int_set (&basesink->priv->to_playing, FALSE);
4685       break;
4686     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4687       g_atomic_int_set (&basesink->priv->to_playing, FALSE);
4688       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
4689       /* FIXME, make sure we cannot enter _render first */
4690
4691       /* we need to call ::unlock before locking PREROLL_LOCK
4692        * since we lock it before going into ::render */
4693       if (bclass->unlock)
4694         bclass->unlock (basesink);
4695
4696       GST_BASE_SINK_PREROLL_LOCK (basesink);
4697       GST_DEBUG_OBJECT (basesink, "got preroll lock");
4698       /* now that we have the PREROLL lock, clear our unlock request */
4699       if (bclass->unlock_stop)
4700         bclass->unlock_stop (basesink);
4701
4702       /* we need preroll again and we set the flag before unlocking the clockid
4703        * because if the clockid is unlocked before a current buffer expired, we
4704        * can use that buffer to preroll with */
4705       basesink->need_preroll = TRUE;
4706
4707       if (basesink->clock_id) {
4708         GST_DEBUG_OBJECT (basesink, "unschedule clock");
4709         gst_clock_id_unschedule (basesink->clock_id);
4710       }
4711
4712       /* if we don't have a preroll buffer we need to wait for a preroll and
4713        * return ASYNC. */
4714       if (!gst_base_sink_needs_preroll (basesink)) {
4715         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
4716         basesink->playing_async = FALSE;
4717       } else {
4718         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
4719           GST_DEBUG_OBJECT (basesink, "element is <= READY");
4720           ret = GST_STATE_CHANGE_SUCCESS;
4721         } else {
4722           GST_DEBUG_OBJECT (basesink,
4723               "PLAYING to PAUSED, we are not prerolled");
4724           basesink->playing_async = TRUE;
4725           priv->commited = FALSE;
4726           priv->call_preroll = TRUE;
4727           if (priv->async_enabled) {
4728             GST_DEBUG_OBJECT (basesink, "doing async state change");
4729             ret = GST_STATE_CHANGE_ASYNC;
4730             gst_element_post_message (GST_ELEMENT_CAST (basesink),
4731                 gst_message_new_async_start (GST_OBJECT_CAST (basesink)));
4732           }
4733         }
4734       }
4735       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
4736           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
4737
4738       gst_base_sink_reset_qos (basesink);
4739       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4740       break;
4741     case GST_STATE_CHANGE_PAUSED_TO_READY:
4742       GST_BASE_SINK_PREROLL_LOCK (basesink);
4743       /* start by resetting our position state with the object lock so that the
4744        * position query gets the right idea. We do this before we post the
4745        * messages so that the message handlers pick this up. */
4746       GST_OBJECT_LOCK (basesink);
4747       basesink->have_newsegment = FALSE;
4748       priv->current_sstart = GST_CLOCK_TIME_NONE;
4749       priv->current_sstop = GST_CLOCK_TIME_NONE;
4750       priv->have_latency = FALSE;
4751       if (priv->cached_clock_id) {
4752         gst_clock_id_unref (priv->cached_clock_id);
4753         priv->cached_clock_id = NULL;
4754       }
4755       gst_caps_replace (&basesink->priv->caps, NULL);
4756       GST_OBJECT_UNLOCK (basesink);
4757
4758       gst_base_sink_set_last_buffer (basesink, NULL);
4759       priv->call_preroll = FALSE;
4760
4761       if (!priv->commited) {
4762         if (priv->async_enabled) {
4763           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
4764
4765           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4766               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
4767                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
4768
4769           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4770               gst_message_new_async_done (GST_OBJECT_CAST (basesink), FALSE));
4771         }
4772         priv->commited = TRUE;
4773       } else {
4774         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
4775       }
4776       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4777       break;
4778     case GST_STATE_CHANGE_READY_TO_NULL:
4779       if (bclass->stop) {
4780         if (!bclass->stop (basesink)) {
4781           GST_WARNING_OBJECT (basesink, "failed to stop");
4782         }
4783       }
4784       gst_base_sink_set_last_buffer (basesink, NULL);
4785       priv->call_preroll = FALSE;
4786       break;
4787     default:
4788       break;
4789   }
4790
4791   return ret;
4792
4793   /* ERRORS */
4794 start_failed:
4795   {
4796     GST_DEBUG_OBJECT (basesink, "failed to start");
4797     return GST_STATE_CHANGE_FAILURE;
4798   }
4799 activate_failed:
4800   {
4801     GST_DEBUG_OBJECT (basesink,
4802         "element failed to change states -- activation problem?");
4803     return GST_STATE_CHANGE_FAILURE;
4804   }
4805 }