basesink: add property to configure a throttle-time
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSrc
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * |[
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * ]|
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSinkClass.preroll() vmethod with this preroll buffer and will then
59  * commit the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from #GstBaseSinkClass.get_times(). If this
63  * function returns #GST_CLOCK_TIME_NONE for the start time, no synchronisation
64  * will be done. Synchronisation can be disabled entirely by setting the object
65  * #GstBaseSink:sync property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSinkClass.render() will be
68  * called. Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the
71  * #GstBaseSinkClass.render() method are supported as well. These classes
72  * typically receive a buffer in the render method and can then potentially
73  * block on the clock while rendering. A typical example is an audiosink.
74  * Since 0.10.11 these subclasses can use gst_base_sink_wait_preroll() to
75  * perform the blocking wait.
76  *
77  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
78  * for the clock to reach the time indicated by the stop time of the last
79  * #GstBaseSinkClass.get_times() call before posting an EOS message. When the
80  * element receives EOS in PAUSED, preroll completes, the event is queued and an
81  * EOS message is posted when going to PLAYING.
82  *
83  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
84  * synchronisation and clipping of buffers. Buffers that fall completely outside
85  * of the current segment are dropped. Buffers that fall partially in the
86  * segment are rendered (and prerolled). Subclasses should do any subbuffer
87  * clipping themselves when needed.
88  *
89  * #GstBaseSink will by default report the current playback position in
90  * #GST_FORMAT_TIME based on the current clock time and segment information.
91  * If no clock has been set on the element, the query will be forwarded
92  * upstream.
93  *
94  * The #GstBaseSinkClass.set_caps() function will be called when the subclass
95  * should configure itself to process a specific media type.
96  *
97  * The #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() virtual methods
98  * will be called when resources should be allocated. Any 
99  * #GstBaseSinkClass.preroll(), #GstBaseSinkClass.render() and
100  * #GstBaseSinkClass.set_caps() function will be called between the
101  * #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() calls.
102  *
103  * The #GstBaseSinkClass.event() virtual method will be called when an event is
104  * received by #GstBaseSink. Normally this method should only be overriden by
105  * very specific elements (such as file sinks) which need to handle the
106  * newsegment event specially.
107  *
108  * #GstBaseSink provides an overridable #GstBaseSinkClass.buffer_alloc()
109  * function that can be used by sinks that want to do reverse negotiation or to
110  * provide custom buffers (hardware buffers for example) to upstream elements.
111  *
112  * The #GstBaseSinkClass.unlock() method is called when the elements should
113  * unblock any blocking operations they perform in the
114  * #GstBaseSinkClass.render() method. This is mostly useful when the
115  * #GstBaseSinkClass.render() method performs a blocking write on a file
116  * descriptor, for example.
117  *
118  * The #GstBaseSink:max-lateness property affects how the sink deals with
119  * buffers that arrive too late in the sink. A buffer arrives too late in the
120  * sink when the presentation time (as a combination of the last segment, buffer
121  * timestamp and element base_time) plus the duration is before the current
122  * time of the clock.
123  * If the frame is later than max-lateness, the sink will drop the buffer
124  * without calling the render method.
125  * This feature is disabled if sync is disabled, the
126  * #GstBaseSinkClass.get_times() method does not return a valid start time or
127  * max-lateness is set to -1 (the default).
128  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
129  * max-lateness value.
130  *
131  * The #GstBaseSink:qos property will enable the quality-of-service features of
132  * the basesink which gather statistics about the real-time performance of the
133  * clock synchronisation. For each buffer received in the sink, statistics are
134  * gathered and a QOS event is sent upstream with these numbers. This
135  * information can then be used by upstream elements to reduce their processing
136  * rate, for example.
137  *
138  * Since 0.10.15 the #GstBaseSink:async property can be used to instruct the
139  * sink to never perform an ASYNC state change. This feature is mostly usable
140  * when dealing with non-synchronized streams or sparse streams.
141  *
142  * Last reviewed on 2007-08-29 (0.10.15)
143  */
144
145 #ifdef HAVE_CONFIG_H
146 #  include "config.h"
147 #endif
148
149 #include <gst/gst_private.h>
150
151 #include "gstbasesink.h"
152 #include <gst/gstmarshal.h>
153 #include <gst/gst-i18n-lib.h>
154
155 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
156 #define GST_CAT_DEFAULT gst_base_sink_debug
157
158 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
159    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
160
161 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
162
163 typedef struct
164 {
165   gboolean valid;               /* if this info is valid */
166   guint32 seqnum;               /* the seqnum of the STEP event */
167   GstFormat format;             /* the format of the amount */
168   guint64 amount;               /* the total amount of data to skip */
169   guint64 position;             /* the position in the stepped data */
170   guint64 duration;             /* the duration in time of the skipped data */
171   guint64 start;                /* running_time of the start */
172   gdouble rate;                 /* rate of skipping */
173   gdouble start_rate;           /* rate before skipping */
174   guint64 start_start;          /* start position skipping */
175   guint64 start_stop;           /* stop position skipping */
176   gboolean flush;               /* if this was a flushing step */
177   gboolean intermediate;        /* if this is an intermediate step */
178   gboolean need_preroll;        /* if we need preroll after this step */
179 } GstStepInfo;
180
181 /* FIXME, some stuff in ABI.data and other in Private...
182  * Make up your mind please.
183  */
184 struct _GstBaseSinkPrivate
185 {
186   gint qos_enabled;             /* ATOMIC */
187   gboolean async_enabled;
188   GstClockTimeDiff ts_offset;
189   GstClockTime render_delay;
190
191   /* start, stop of current buffer, stream time, used to report position */
192   GstClockTime current_sstart;
193   GstClockTime current_sstop;
194
195   /* start, stop and jitter of current buffer, running time */
196   GstClockTime current_rstart;
197   GstClockTime current_rstop;
198   GstClockTimeDiff current_jitter;
199
200   /* EOS sync time in running time */
201   GstClockTime eos_rtime;
202
203   /* last buffer that arrived in time, running time */
204   GstClockTime last_in_time;
205   /* when the last buffer left the sink, running time */
206   GstClockTime last_left;
207
208   /* running averages go here these are done on running time */
209   GstClockTime avg_pt;
210   GstClockTime avg_duration;
211   gdouble avg_rate;
212
213   /* these are done on system time. avg_jitter and avg_render are
214    * compared to eachother to see if the rendering time takes a
215    * huge amount of the processing, If so we are flooded with
216    * buffers. */
217   GstClockTime last_left_systime;
218   GstClockTime avg_jitter;
219   GstClockTime start, stop;
220   GstClockTime avg_render;
221
222   /* number of rendered and dropped frames */
223   guint64 rendered;
224   guint64 dropped;
225
226   /* latency stuff */
227   GstClockTime latency;
228
229   /* if we already commited the state */
230   gboolean commited;
231
232   /* when we received EOS */
233   gboolean received_eos;
234
235   /* when we are prerolled and able to report latency */
236   gboolean have_latency;
237
238   /* the last buffer we prerolled or rendered. Useful for making snapshots */
239   gint enable_last_buffer;      /* atomic */
240   GstBuffer *last_buffer;
241
242   /* caps for pull based scheduling */
243   GstCaps *pull_caps;
244
245   /* blocksize for pulling */
246   guint blocksize;
247
248   gboolean discont;
249
250   /* seqnum of the stream */
251   guint32 seqnum;
252
253   gboolean call_preroll;
254   gboolean step_unlock;
255
256   /* we have a pending and a current step operation */
257   GstStepInfo current_step;
258   GstStepInfo pending_step;
259
260   /* Cached GstClockID */
261   GstClockID cached_clock_id;
262
263   /* for throttling */
264   GstClockTime throttle_time;
265 };
266
267 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
268
269 /* generic running average, this has a neutral window size */
270 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
271
272 /* the windows for these running averages are experimentally obtained.
273  * possitive values get averaged more while negative values use a small
274  * window so we can react faster to badness. */
275 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
276 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
277
278 enum
279 {
280   _PR_IS_NOTHING = 1 << 0,
281   _PR_IS_BUFFER = 1 << 1,
282   _PR_IS_BUFFERLIST = 1 << 2,
283   _PR_IS_EVENT = 1 << 3
284 } PrivateObjectType;
285
286 #define OBJ_IS_BUFFER(a) ((a) & _PR_IS_BUFFER)
287 #define OBJ_IS_BUFFERLIST(a) ((a) & _PR_IS_BUFFERLIST)
288 #define OBJ_IS_EVENT(a) ((a) & _PR_IS_EVENT)
289 #define OBJ_IS_BUFFERFULL(a) ((a) & (_PR_IS_BUFFER | _PR_IS_BUFFERLIST))
290
291 /* BaseSink properties */
292
293 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
294 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
295
296 #define DEFAULT_PREROLL_QUEUE_LEN   0
297 #define DEFAULT_SYNC                TRUE
298 #define DEFAULT_MAX_LATENESS        -1
299 #define DEFAULT_QOS                 FALSE
300 #define DEFAULT_ASYNC               TRUE
301 #define DEFAULT_TS_OFFSET           0
302 #define DEFAULT_BLOCKSIZE           4096
303 #define DEFAULT_RENDER_DELAY        0
304 #define DEFAULT_ENABLE_LAST_BUFFER  TRUE
305 #define DEFAULT_THROTTLE_TIME       0
306
307 enum
308 {
309   PROP_0,
310   PROP_PREROLL_QUEUE_LEN,
311   PROP_SYNC,
312   PROP_MAX_LATENESS,
313   PROP_QOS,
314   PROP_ASYNC,
315   PROP_TS_OFFSET,
316   PROP_ENABLE_LAST_BUFFER,
317   PROP_LAST_BUFFER,
318   PROP_BLOCKSIZE,
319   PROP_RENDER_DELAY,
320   PROP_THROTTLE_TIME,
321   PROP_LAST
322 };
323
324 static GstElementClass *parent_class = NULL;
325
326 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
327 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
328 static void gst_base_sink_finalize (GObject * object);
329
330 GType
331 gst_base_sink_get_type (void)
332 {
333   static volatile gsize base_sink_type = 0;
334
335   if (g_once_init_enter (&base_sink_type)) {
336     GType _type;
337     static const GTypeInfo base_sink_info = {
338       sizeof (GstBaseSinkClass),
339       NULL,
340       NULL,
341       (GClassInitFunc) gst_base_sink_class_init,
342       NULL,
343       NULL,
344       sizeof (GstBaseSink),
345       0,
346       (GInstanceInitFunc) gst_base_sink_init,
347     };
348
349     _type = g_type_register_static (GST_TYPE_ELEMENT,
350         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
351     g_once_init_leave (&base_sink_type, _type);
352   }
353   return base_sink_type;
354 }
355
356 static void gst_base_sink_set_property (GObject * object, guint prop_id,
357     const GValue * value, GParamSpec * pspec);
358 static void gst_base_sink_get_property (GObject * object, guint prop_id,
359     GValue * value, GParamSpec * pspec);
360
361 static gboolean gst_base_sink_send_event (GstElement * element,
362     GstEvent * event);
363 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
364 static const GstQueryType *gst_base_sink_get_query_types (GstElement * element);
365
366 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
367 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
368 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
369     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
370 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
371     GstClockTime * start, GstClockTime * end);
372 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
373     GstPad * pad, gboolean flushing);
374 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
375     gboolean active);
376 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
377     GstSegment * segment);
378 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
379     GstEvent * event, GstSegment * segment);
380
381 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
382     GstStateChange transition);
383
384 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
385 static GstFlowReturn gst_base_sink_chain_list (GstPad * pad,
386     GstBufferList * list);
387
388 static void gst_base_sink_loop (GstPad * pad);
389 static gboolean gst_base_sink_pad_activate (GstPad * pad);
390 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
391 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
392 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
393
394 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
395 static GstCaps *gst_base_sink_pad_getcaps (GstPad * pad);
396 static gboolean gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps);
397 static void gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps);
398 static GstFlowReturn gst_base_sink_pad_buffer_alloc (GstPad * pad,
399     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
400
401
402 /* check if an object was too late */
403 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
404     GstMiniObject * obj, GstClockTime start, GstClockTime stop,
405     GstClockReturn status, GstClockTimeDiff jitter);
406 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
407     guint8 obj_type, GstMiniObject * obj);
408
409 static void
410 gst_base_sink_class_init (GstBaseSinkClass * klass)
411 {
412   GObjectClass *gobject_class;
413   GstElementClass *gstelement_class;
414
415   gobject_class = G_OBJECT_CLASS (klass);
416   gstelement_class = GST_ELEMENT_CLASS (klass);
417
418   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
419       "basesink element");
420
421   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
422
423   parent_class = g_type_class_peek_parent (klass);
424
425   gobject_class->finalize = gst_base_sink_finalize;
426   gobject_class->set_property = gst_base_sink_set_property;
427   gobject_class->get_property = gst_base_sink_get_property;
428
429   /* FIXME, this next value should be configured using an event from the
430    * upstream element, ie, the BUFFER_SIZE event. */
431   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
432       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
433           "Number of buffers to queue during preroll", 0, G_MAXUINT,
434           DEFAULT_PREROLL_QUEUE_LEN,
435           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
436
437   g_object_class_install_property (gobject_class, PROP_SYNC,
438       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
439           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
440
441   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
442       g_param_spec_int64 ("max-lateness", "Max Lateness",
443           "Maximum number of nanoseconds that a buffer can be late before it "
444           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
445           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446
447   g_object_class_install_property (gobject_class, PROP_QOS,
448       g_param_spec_boolean ("qos", "Qos",
449           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
450           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
451   /**
452    * GstBaseSink:async
453    *
454    * If set to #TRUE, the basesink will perform asynchronous state changes.
455    * When set to #FALSE, the sink will not signal the parent when it prerolls.
456    * Use this option when dealing with sparse streams or when synchronisation is
457    * not required.
458    *
459    * Since: 0.10.15
460    */
461   g_object_class_install_property (gobject_class, PROP_ASYNC,
462       g_param_spec_boolean ("async", "Async",
463           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
464           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
465   /**
466    * GstBaseSink:ts-offset
467    *
468    * Controls the final synchronisation, a negative value will render the buffer
469    * earlier while a positive value delays playback. This property can be
470    * used to fix synchronisation in bad files.
471    *
472    * Since: 0.10.15
473    */
474   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
475       g_param_spec_int64 ("ts-offset", "TS Offset",
476           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
477           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
478
479   /**
480    * GstBaseSink:enable-last-buffer
481    *
482    * Enable the last-buffer property. If FALSE, basesink doesn't keep a
483    * reference to the last buffer arrived and the last-buffer property is always
484    * set to NULL. This can be useful if you need buffers to be released as soon
485    * as possible, eg. if you're using a buffer pool.
486    *
487    * Since: 0.10.30
488    */
489   g_object_class_install_property (gobject_class, PROP_ENABLE_LAST_BUFFER,
490       g_param_spec_boolean ("enable-last-buffer", "Enable Last Buffer",
491           "Enable the last-buffer property", DEFAULT_ENABLE_LAST_BUFFER,
492           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
493
494   /**
495    * GstBaseSink:last-buffer
496    *
497    * The last buffer that arrived in the sink and was used for preroll or for
498    * rendering. This property can be used to generate thumbnails. This property
499    * can be NULL when the sink has not yet received a bufer.
500    *
501    * Since: 0.10.15
502    */
503   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
504       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
505           "The last buffer received in the sink", GST_TYPE_BUFFER,
506           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
507   /**
508    * GstBaseSink:blocksize
509    *
510    * The amount of bytes to pull when operating in pull mode.
511    *
512    * Since: 0.10.22
513    */
514   /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
515   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
516       g_param_spec_uint ("blocksize", "Block size",
517           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
518           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
519   /**
520    * GstBaseSink:render-delay
521    *
522    * The additional delay between synchronisation and actual rendering of the
523    * media. This property will add additional latency to the device in order to
524    * make other sinks compensate for the delay.
525    *
526    * Since: 0.10.22
527    */
528   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
529       g_param_spec_uint64 ("render-delay", "Render Delay",
530           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
531           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
532   /**
533    * GstBaseSink:throttle-time
534    *
535    * The time to insert between buffers. This property can be used to control
536    * the maximum amount of buffers per second to render. Setting this property
537    * to a value bigger than 0 will make the sink create THROTTLE QoS events.
538    *
539    * Since: 0.10.33
540    */
541   g_object_class_install_property (gobject_class, PROP_THROTTLE_TIME,
542       g_param_spec_uint64 ("throttle-time", "Throttle time",
543           "The time to keep between rendered buffers (unused)", 0, G_MAXUINT64,
544           DEFAULT_THROTTLE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
545
546   gstelement_class->change_state =
547       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
548   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
549   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
550   gstelement_class->get_query_types =
551       GST_DEBUG_FUNCPTR (gst_base_sink_get_query_types);
552
553   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
554   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
555   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
556   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
557   klass->activate_pull =
558       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
559
560   /* Registering debug symbols for function pointers */
561   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_getcaps);
562   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_setcaps);
563   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_fixate);
564   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_buffer_alloc);
565   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate);
566   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_push);
567   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_pull);
568   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_event);
569   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain);
570   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain_list);
571 }
572
573 static GstCaps *
574 gst_base_sink_pad_getcaps (GstPad * pad)
575 {
576   GstBaseSinkClass *bclass;
577   GstBaseSink *bsink;
578   GstCaps *caps = NULL;
579
580   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
581   bclass = GST_BASE_SINK_GET_CLASS (bsink);
582
583   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
584     /* if we are operating in pull mode we only accept the negotiated caps */
585     GST_OBJECT_LOCK (pad);
586     if ((caps = GST_PAD_CAPS (pad)))
587       gst_caps_ref (caps);
588     GST_OBJECT_UNLOCK (pad);
589   }
590   if (caps == NULL) {
591     if (bclass->get_caps)
592       caps = bclass->get_caps (bsink);
593
594     if (caps == NULL) {
595       GstPadTemplate *pad_template;
596
597       pad_template =
598           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
599           "sink");
600       if (pad_template != NULL) {
601         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
602       }
603     }
604   }
605   gst_object_unref (bsink);
606
607   return caps;
608 }
609
610 static gboolean
611 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
612 {
613   GstBaseSinkClass *bclass;
614   GstBaseSink *bsink;
615   gboolean res = TRUE;
616
617   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
618   bclass = GST_BASE_SINK_GET_CLASS (bsink);
619
620   if (res && bclass->set_caps)
621     res = bclass->set_caps (bsink, caps);
622
623   gst_object_unref (bsink);
624
625   return res;
626 }
627
628 static void
629 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
630 {
631   GstBaseSinkClass *bclass;
632   GstBaseSink *bsink;
633
634   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
635   bclass = GST_BASE_SINK_GET_CLASS (bsink);
636
637   if (bclass->fixate)
638     bclass->fixate (bsink, caps);
639
640   gst_object_unref (bsink);
641 }
642
643 static GstFlowReturn
644 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
645     GstCaps * caps, GstBuffer ** buf)
646 {
647   GstBaseSinkClass *bclass;
648   GstBaseSink *bsink;
649   GstFlowReturn result = GST_FLOW_OK;
650
651   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
652   bclass = GST_BASE_SINK_GET_CLASS (bsink);
653
654   if (bclass->buffer_alloc)
655     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
656   else
657     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
658
659   gst_object_unref (bsink);
660
661   return result;
662 }
663
664 static void
665 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
666 {
667   GstPadTemplate *pad_template;
668   GstBaseSinkPrivate *priv;
669
670   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
671
672   pad_template =
673       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
674   g_return_if_fail (pad_template != NULL);
675
676   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
677
678   gst_pad_set_getcaps_function (basesink->sinkpad, gst_base_sink_pad_getcaps);
679   gst_pad_set_setcaps_function (basesink->sinkpad, gst_base_sink_pad_setcaps);
680   gst_pad_set_fixatecaps_function (basesink->sinkpad, gst_base_sink_pad_fixate);
681   gst_pad_set_bufferalloc_function (basesink->sinkpad,
682       gst_base_sink_pad_buffer_alloc);
683   gst_pad_set_activate_function (basesink->sinkpad, gst_base_sink_pad_activate);
684   gst_pad_set_activatepush_function (basesink->sinkpad,
685       gst_base_sink_pad_activate_push);
686   gst_pad_set_activatepull_function (basesink->sinkpad,
687       gst_base_sink_pad_activate_pull);
688   gst_pad_set_event_function (basesink->sinkpad, gst_base_sink_event);
689   gst_pad_set_chain_function (basesink->sinkpad, gst_base_sink_chain);
690   gst_pad_set_chain_list_function (basesink->sinkpad, gst_base_sink_chain_list);
691   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
692
693   basesink->pad_mode = GST_ACTIVATE_NONE;
694   basesink->preroll_queue = g_queue_new ();
695   basesink->abidata.ABI.clip_segment = gst_segment_new ();
696   priv->have_latency = FALSE;
697
698   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
699   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
700
701   basesink->sync = DEFAULT_SYNC;
702   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
703   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
704   priv->async_enabled = DEFAULT_ASYNC;
705   priv->ts_offset = DEFAULT_TS_OFFSET;
706   priv->render_delay = DEFAULT_RENDER_DELAY;
707   priv->blocksize = DEFAULT_BLOCKSIZE;
708   priv->cached_clock_id = NULL;
709   g_atomic_int_set (&priv->enable_last_buffer, DEFAULT_ENABLE_LAST_BUFFER);
710   priv->throttle_time = DEFAULT_THROTTLE_TIME;
711
712   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
713 }
714
715 static void
716 gst_base_sink_finalize (GObject * object)
717 {
718   GstBaseSink *basesink;
719
720   basesink = GST_BASE_SINK (object);
721
722   g_queue_free (basesink->preroll_queue);
723   gst_segment_free (basesink->abidata.ABI.clip_segment);
724
725   G_OBJECT_CLASS (parent_class)->finalize (object);
726 }
727
728 /**
729  * gst_base_sink_set_sync:
730  * @sink: the sink
731  * @sync: the new sync value.
732  *
733  * Configures @sink to synchronize on the clock or not. When
734  * @sync is FALSE, incomming samples will be played as fast as
735  * possible. If @sync is TRUE, the timestamps of the incomming
736  * buffers will be used to schedule the exact render time of its
737  * contents.
738  *
739  * Since: 0.10.4
740  */
741 void
742 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
743 {
744   g_return_if_fail (GST_IS_BASE_SINK (sink));
745
746   GST_OBJECT_LOCK (sink);
747   sink->sync = sync;
748   GST_OBJECT_UNLOCK (sink);
749 }
750
751 /**
752  * gst_base_sink_get_sync:
753  * @sink: the sink
754  *
755  * Checks if @sink is currently configured to synchronize against the
756  * clock.
757  *
758  * Returns: TRUE if the sink is configured to synchronize against the clock.
759  *
760  * Since: 0.10.4
761  */
762 gboolean
763 gst_base_sink_get_sync (GstBaseSink * sink)
764 {
765   gboolean res;
766
767   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
768
769   GST_OBJECT_LOCK (sink);
770   res = sink->sync;
771   GST_OBJECT_UNLOCK (sink);
772
773   return res;
774 }
775
776 /**
777  * gst_base_sink_set_max_lateness:
778  * @sink: the sink
779  * @max_lateness: the new max lateness value.
780  *
781  * Sets the new max lateness value to @max_lateness. This value is
782  * used to decide if a buffer should be dropped or not based on the
783  * buffer timestamp and the current clock time. A value of -1 means
784  * an unlimited time.
785  *
786  * Since: 0.10.4
787  */
788 void
789 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
790 {
791   g_return_if_fail (GST_IS_BASE_SINK (sink));
792
793   GST_OBJECT_LOCK (sink);
794   sink->abidata.ABI.max_lateness = max_lateness;
795   GST_OBJECT_UNLOCK (sink);
796 }
797
798 /**
799  * gst_base_sink_get_max_lateness:
800  * @sink: the sink
801  *
802  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
803  * more details.
804  *
805  * Returns: The maximum time in nanoseconds that a buffer can be late
806  * before it is dropped and not rendered. A value of -1 means an
807  * unlimited time.
808  *
809  * Since: 0.10.4
810  */
811 gint64
812 gst_base_sink_get_max_lateness (GstBaseSink * sink)
813 {
814   gint64 res;
815
816   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
817
818   GST_OBJECT_LOCK (sink);
819   res = sink->abidata.ABI.max_lateness;
820   GST_OBJECT_UNLOCK (sink);
821
822   return res;
823 }
824
825 /**
826  * gst_base_sink_set_qos_enabled:
827  * @sink: the sink
828  * @enabled: the new qos value.
829  *
830  * Configures @sink to send Quality-of-Service events upstream.
831  *
832  * Since: 0.10.5
833  */
834 void
835 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
836 {
837   g_return_if_fail (GST_IS_BASE_SINK (sink));
838
839   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
840 }
841
842 /**
843  * gst_base_sink_is_qos_enabled:
844  * @sink: the sink
845  *
846  * Checks if @sink is currently configured to send Quality-of-Service events
847  * upstream.
848  *
849  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
850  *
851  * Since: 0.10.5
852  */
853 gboolean
854 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
855 {
856   gboolean res;
857
858   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
859
860   res = g_atomic_int_get (&sink->priv->qos_enabled);
861
862   return res;
863 }
864
865 /**
866  * gst_base_sink_set_async_enabled:
867  * @sink: the sink
868  * @enabled: the new async value.
869  *
870  * Configures @sink to perform all state changes asynchronusly. When async is
871  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
872  * preroll buffer. This feature is usefull if the sink does not synchronize
873  * against the clock or when it is dealing with sparse streams.
874  *
875  * Since: 0.10.15
876  */
877 void
878 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
879 {
880   g_return_if_fail (GST_IS_BASE_SINK (sink));
881
882   GST_PAD_PREROLL_LOCK (sink->sinkpad);
883   g_atomic_int_set (&sink->priv->async_enabled, enabled);
884   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
885   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
886 }
887
888 /**
889  * gst_base_sink_is_async_enabled:
890  * @sink: the sink
891  *
892  * Checks if @sink is currently configured to perform asynchronous state
893  * changes to PAUSED.
894  *
895  * Returns: TRUE if the sink is configured to perform asynchronous state
896  * changes.
897  *
898  * Since: 0.10.15
899  */
900 gboolean
901 gst_base_sink_is_async_enabled (GstBaseSink * sink)
902 {
903   gboolean res;
904
905   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
906
907   res = g_atomic_int_get (&sink->priv->async_enabled);
908
909   return res;
910 }
911
912 /**
913  * gst_base_sink_set_ts_offset:
914  * @sink: the sink
915  * @offset: the new offset
916  *
917  * Adjust the synchronisation of @sink with @offset. A negative value will
918  * render buffers earlier than their timestamp. A positive value will delay
919  * rendering. This function can be used to fix playback of badly timestamped
920  * buffers.
921  *
922  * Since: 0.10.15
923  */
924 void
925 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
926 {
927   g_return_if_fail (GST_IS_BASE_SINK (sink));
928
929   GST_OBJECT_LOCK (sink);
930   sink->priv->ts_offset = offset;
931   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
932   GST_OBJECT_UNLOCK (sink);
933 }
934
935 /**
936  * gst_base_sink_get_ts_offset:
937  * @sink: the sink
938  *
939  * Get the synchronisation offset of @sink.
940  *
941  * Returns: The synchronisation offset.
942  *
943  * Since: 0.10.15
944  */
945 GstClockTimeDiff
946 gst_base_sink_get_ts_offset (GstBaseSink * sink)
947 {
948   GstClockTimeDiff res;
949
950   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
951
952   GST_OBJECT_LOCK (sink);
953   res = sink->priv->ts_offset;
954   GST_OBJECT_UNLOCK (sink);
955
956   return res;
957 }
958
959 /**
960  * gst_base_sink_get_last_buffer:
961  * @sink: the sink
962  *
963  * Get the last buffer that arrived in the sink and was used for preroll or for
964  * rendering. This property can be used to generate thumbnails.
965  *
966  * The #GstCaps on the buffer can be used to determine the type of the buffer.
967  *
968  * Free-function: gst_buffer_unref
969  *
970  * Returns: (transfer full): a #GstBuffer. gst_buffer_unref() after usage.
971  *     This function returns NULL when no buffer has arrived in the sink yet
972  *     or when the sink is not in PAUSED or PLAYING.
973  *
974  * Since: 0.10.15
975  */
976 GstBuffer *
977 gst_base_sink_get_last_buffer (GstBaseSink * sink)
978 {
979   GstBuffer *res;
980
981   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
982
983   GST_OBJECT_LOCK (sink);
984   if ((res = sink->priv->last_buffer))
985     gst_buffer_ref (res);
986   GST_OBJECT_UNLOCK (sink);
987
988   return res;
989 }
990
991 /* with OBJECT_LOCK */
992 static void
993 gst_base_sink_set_last_buffer_unlocked (GstBaseSink * sink, GstBuffer * buffer)
994 {
995   GstBuffer *old;
996
997   old = sink->priv->last_buffer;
998   if (G_LIKELY (old != buffer)) {
999     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
1000     if (G_LIKELY (buffer))
1001       gst_buffer_ref (buffer);
1002     sink->priv->last_buffer = buffer;
1003   } else {
1004     old = NULL;
1005   }
1006   /* avoid unreffing with the lock because cleanup code might want to take the
1007    * lock too */
1008   if (G_LIKELY (old)) {
1009     GST_OBJECT_UNLOCK (sink);
1010     gst_buffer_unref (old);
1011     GST_OBJECT_LOCK (sink);
1012   }
1013 }
1014
1015 static void
1016 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
1017 {
1018   if (!g_atomic_int_get (&sink->priv->enable_last_buffer))
1019     return;
1020
1021   GST_OBJECT_LOCK (sink);
1022   gst_base_sink_set_last_buffer_unlocked (sink, buffer);
1023   GST_OBJECT_UNLOCK (sink);
1024 }
1025
1026 /**
1027  * gst_base_sink_set_last_buffer_enabled:
1028  * @sink: the sink
1029  * @enabled: the new enable-last-buffer value.
1030  *
1031  * Configures @sink to store the last received buffer in the last-buffer
1032  * property.
1033  *
1034  * Since: 0.10.30
1035  */
1036 void
1037 gst_base_sink_set_last_buffer_enabled (GstBaseSink * sink, gboolean enabled)
1038 {
1039   g_return_if_fail (GST_IS_BASE_SINK (sink));
1040
1041   /* Only take lock if we change the value */
1042   if (g_atomic_int_compare_and_exchange (&sink->priv->enable_last_buffer,
1043           !enabled, enabled) && !enabled) {
1044     GST_OBJECT_LOCK (sink);
1045     gst_base_sink_set_last_buffer_unlocked (sink, NULL);
1046     GST_OBJECT_UNLOCK (sink);
1047   }
1048 }
1049
1050 /**
1051  * gst_base_sink_is_last_buffer_enabled:
1052  * @sink: the sink
1053  *
1054  * Checks if @sink is currently configured to store the last received buffer in
1055  * the last-buffer property.
1056  *
1057  * Returns: TRUE if the sink is configured to store the last received buffer.
1058  *
1059  * Since: 0.10.30
1060  */
1061 gboolean
1062 gst_base_sink_is_last_buffer_enabled (GstBaseSink * sink)
1063 {
1064   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
1065
1066   return g_atomic_int_get (&sink->priv->enable_last_buffer);
1067 }
1068
1069 /**
1070  * gst_base_sink_get_latency:
1071  * @sink: the sink
1072  *
1073  * Get the currently configured latency.
1074  *
1075  * Returns: The configured latency.
1076  *
1077  * Since: 0.10.12
1078  */
1079 GstClockTime
1080 gst_base_sink_get_latency (GstBaseSink * sink)
1081 {
1082   GstClockTime res;
1083
1084   GST_OBJECT_LOCK (sink);
1085   res = sink->priv->latency;
1086   GST_OBJECT_UNLOCK (sink);
1087
1088   return res;
1089 }
1090
1091 /**
1092  * gst_base_sink_query_latency:
1093  * @sink: the sink
1094  * @live: (out) (allow-none): if the sink is live
1095  * @upstream_live: (out) (allow-none): if an upstream element is live
1096  * @min_latency: (out) (allow-none): the min latency of the upstream elements
1097  * @max_latency: (out) (allow-none): the max latency of the upstream elements
1098  *
1099  * Query the sink for the latency parameters. The latency will be queried from
1100  * the upstream elements. @live will be TRUE if @sink is configured to
1101  * synchronize against the clock. @upstream_live will be TRUE if an upstream
1102  * element is live.
1103  *
1104  * If both @live and @upstream_live are TRUE, the sink will want to compensate
1105  * for the latency introduced by the upstream elements by setting the
1106  * @min_latency to a strictly possitive value.
1107  *
1108  * This function is mostly used by subclasses.
1109  *
1110  * Returns: TRUE if the query succeeded.
1111  *
1112  * Since: 0.10.12
1113  */
1114 gboolean
1115 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
1116     gboolean * upstream_live, GstClockTime * min_latency,
1117     GstClockTime * max_latency)
1118 {
1119   gboolean l, us_live, res, have_latency;
1120   GstClockTime min, max, render_delay;
1121   GstQuery *query;
1122   GstClockTime us_min, us_max;
1123
1124   /* we are live when we sync to the clock */
1125   GST_OBJECT_LOCK (sink);
1126   l = sink->sync;
1127   have_latency = sink->priv->have_latency;
1128   render_delay = sink->priv->render_delay;
1129   GST_OBJECT_UNLOCK (sink);
1130
1131   /* assume no latency */
1132   min = 0;
1133   max = -1;
1134   us_live = FALSE;
1135
1136   if (have_latency) {
1137     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1138     /* we are ready for a latency query this is when we preroll or when we are
1139      * not async. */
1140     query = gst_query_new_latency ();
1141
1142     /* ask the peer for the latency */
1143     if ((res = gst_pad_peer_query (sink->sinkpad, query))) {
1144       /* get upstream min and max latency */
1145       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1146
1147       if (us_live) {
1148         /* upstream live, use its latency, subclasses should use these
1149          * values to create the complete latency. */
1150         min = us_min;
1151         max = us_max;
1152       }
1153       if (l) {
1154         /* we need to add the render delay if we are live */
1155         if (min != -1)
1156           min += render_delay;
1157         if (max != -1)
1158           max += render_delay;
1159       }
1160     }
1161     gst_query_unref (query);
1162   } else {
1163     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1164     res = FALSE;
1165   }
1166
1167   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1168   if (!res) {
1169     if (!l) {
1170       res = TRUE;
1171       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1172     } else {
1173       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1174     }
1175   }
1176
1177   if (res) {
1178     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1179         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1180         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1181
1182     if (live)
1183       *live = l;
1184     if (upstream_live)
1185       *upstream_live = us_live;
1186     if (min_latency)
1187       *min_latency = min;
1188     if (max_latency)
1189       *max_latency = max;
1190   }
1191   return res;
1192 }
1193
1194 /**
1195  * gst_base_sink_set_render_delay:
1196  * @sink: a #GstBaseSink
1197  * @delay: the new delay
1198  *
1199  * Set the render delay in @sink to @delay. The render delay is the time
1200  * between actual rendering of a buffer and its synchronisation time. Some
1201  * devices might delay media rendering which can be compensated for with this
1202  * function.
1203  *
1204  * After calling this function, this sink will report additional latency and
1205  * other sinks will adjust their latency to delay the rendering of their media.
1206  *
1207  * This function is usually called by subclasses.
1208  *
1209  * Since: 0.10.21
1210  */
1211 void
1212 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1213 {
1214   GstClockTime old_render_delay;
1215
1216   g_return_if_fail (GST_IS_BASE_SINK (sink));
1217
1218   GST_OBJECT_LOCK (sink);
1219   old_render_delay = sink->priv->render_delay;
1220   sink->priv->render_delay = delay;
1221   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1222       GST_TIME_ARGS (delay));
1223   GST_OBJECT_UNLOCK (sink);
1224
1225   if (delay != old_render_delay) {
1226     GST_DEBUG_OBJECT (sink, "posting latency changed");
1227     gst_element_post_message (GST_ELEMENT_CAST (sink),
1228         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1229   }
1230 }
1231
1232 /**
1233  * gst_base_sink_get_render_delay:
1234  * @sink: a #GstBaseSink
1235  *
1236  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1237  * information about the render delay.
1238  *
1239  * Returns: the render delay of @sink.
1240  *
1241  * Since: 0.10.21
1242  */
1243 GstClockTime
1244 gst_base_sink_get_render_delay (GstBaseSink * sink)
1245 {
1246   GstClockTimeDiff res;
1247
1248   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1249
1250   GST_OBJECT_LOCK (sink);
1251   res = sink->priv->render_delay;
1252   GST_OBJECT_UNLOCK (sink);
1253
1254   return res;
1255 }
1256
1257 /**
1258  * gst_base_sink_set_blocksize:
1259  * @sink: a #GstBaseSink
1260  * @blocksize: the blocksize in bytes
1261  *
1262  * Set the number of bytes that the sink will pull when it is operating in pull
1263  * mode.
1264  *
1265  * Since: 0.10.22
1266  */
1267 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1268 void
1269 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1270 {
1271   g_return_if_fail (GST_IS_BASE_SINK (sink));
1272
1273   GST_OBJECT_LOCK (sink);
1274   sink->priv->blocksize = blocksize;
1275   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1276   GST_OBJECT_UNLOCK (sink);
1277 }
1278
1279 /**
1280  * gst_base_sink_get_blocksize:
1281  * @sink: a #GstBaseSink
1282  *
1283  * Get the number of bytes that the sink will pull when it is operating in pull
1284  * mode.
1285  *
1286  * Returns: the number of bytes @sink will pull in pull mode.
1287  *
1288  * Since: 0.10.22
1289  */
1290 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1291 guint
1292 gst_base_sink_get_blocksize (GstBaseSink * sink)
1293 {
1294   guint res;
1295
1296   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1297
1298   GST_OBJECT_LOCK (sink);
1299   res = sink->priv->blocksize;
1300   GST_OBJECT_UNLOCK (sink);
1301
1302   return res;
1303 }
1304
1305 /**
1306  * gst_base_sink_set_throttle_time:
1307  * @sink: a #GstBaseSink
1308  * @throttle: the throttle time in nanoseconds
1309  *
1310  * Set the time that will be inserted between rendered buffers. This
1311  * can be used to control the maximum buffers per second that the sink
1312  * will render. 
1313  *
1314  * Since: 0.10.33
1315  */
1316 void
1317 gst_base_sink_set_throttle_time (GstBaseSink * sink, guint64 throttle)
1318 {
1319   g_return_if_fail (GST_IS_BASE_SINK (sink));
1320
1321   GST_OBJECT_LOCK (sink);
1322   sink->priv->throttle_time = throttle;
1323   GST_LOG_OBJECT (sink, "set throttle_time to %" G_GUINT64_FORMAT, throttle);
1324   GST_OBJECT_UNLOCK (sink);
1325 }
1326
1327 /**
1328  * gst_base_sink_get_throttle_time:
1329  * @sink: a #GstBaseSink
1330  *
1331  * Get the time that will be inserted between frames to control the 
1332  * maximum buffers per second.
1333  *
1334  * Returns: the number of nanoseconds @sink will put between frames.
1335  *
1336  * Since: 0.10.33
1337  */
1338 guint64
1339 gst_base_sink_get_throttle_time (GstBaseSink * sink)
1340 {
1341   guint64 res;
1342
1343   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1344
1345   GST_OBJECT_LOCK (sink);
1346   res = sink->priv->throttle_time;
1347   GST_OBJECT_UNLOCK (sink);
1348
1349   return res;
1350 }
1351
1352 static void
1353 gst_base_sink_set_property (GObject * object, guint prop_id,
1354     const GValue * value, GParamSpec * pspec)
1355 {
1356   GstBaseSink *sink = GST_BASE_SINK (object);
1357
1358   switch (prop_id) {
1359     case PROP_PREROLL_QUEUE_LEN:
1360       /* preroll lock necessary to serialize with finish_preroll */
1361       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1362       g_atomic_int_set (&sink->preroll_queue_max_len, g_value_get_uint (value));
1363       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1364       break;
1365     case PROP_SYNC:
1366       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1367       break;
1368     case PROP_MAX_LATENESS:
1369       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1370       break;
1371     case PROP_QOS:
1372       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1373       break;
1374     case PROP_ASYNC:
1375       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1376       break;
1377     case PROP_TS_OFFSET:
1378       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1379       break;
1380     case PROP_BLOCKSIZE:
1381       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1382       break;
1383     case PROP_RENDER_DELAY:
1384       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1385       break;
1386     case PROP_ENABLE_LAST_BUFFER:
1387       gst_base_sink_set_last_buffer_enabled (sink, g_value_get_boolean (value));
1388       break;
1389     case PROP_THROTTLE_TIME:
1390       gst_base_sink_set_throttle_time (sink, g_value_get_uint64 (value));
1391       break;
1392     default:
1393       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1394       break;
1395   }
1396 }
1397
1398 static void
1399 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1400     GParamSpec * pspec)
1401 {
1402   GstBaseSink *sink = GST_BASE_SINK (object);
1403
1404   switch (prop_id) {
1405     case PROP_PREROLL_QUEUE_LEN:
1406       g_value_set_uint (value, g_atomic_int_get (&sink->preroll_queue_max_len));
1407       break;
1408     case PROP_SYNC:
1409       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1410       break;
1411     case PROP_MAX_LATENESS:
1412       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1413       break;
1414     case PROP_QOS:
1415       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1416       break;
1417     case PROP_ASYNC:
1418       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1419       break;
1420     case PROP_TS_OFFSET:
1421       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1422       break;
1423     case PROP_LAST_BUFFER:
1424       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1425       break;
1426     case PROP_ENABLE_LAST_BUFFER:
1427       g_value_set_boolean (value, gst_base_sink_is_last_buffer_enabled (sink));
1428       break;
1429     case PROP_BLOCKSIZE:
1430       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1431       break;
1432     case PROP_RENDER_DELAY:
1433       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1434       break;
1435     case PROP_THROTTLE_TIME:
1436       g_value_set_uint64 (value, gst_base_sink_get_throttle_time (sink));
1437       break;
1438     default:
1439       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1440       break;
1441   }
1442 }
1443
1444
1445 static GstCaps *
1446 gst_base_sink_get_caps (GstBaseSink * sink)
1447 {
1448   return NULL;
1449 }
1450
1451 static gboolean
1452 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1453 {
1454   return TRUE;
1455 }
1456
1457 static GstFlowReturn
1458 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1459     GstCaps * caps, GstBuffer ** buf)
1460 {
1461   *buf = NULL;
1462   return GST_FLOW_OK;
1463 }
1464
1465 /* with PREROLL_LOCK, STREAM_LOCK */
1466 static void
1467 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1468 {
1469   GstMiniObject *obj;
1470
1471   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1472   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1473     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1474     gst_mini_object_unref (obj);
1475   }
1476   /* we can't have EOS anymore now */
1477   basesink->eos = FALSE;
1478   basesink->priv->received_eos = FALSE;
1479   basesink->have_preroll = FALSE;
1480   basesink->priv->step_unlock = FALSE;
1481   basesink->eos_queued = FALSE;
1482   basesink->preroll_queued = 0;
1483   basesink->buffers_queued = 0;
1484   basesink->events_queued = 0;
1485   /* can't report latency anymore until we preroll again */
1486   if (basesink->priv->async_enabled) {
1487     GST_OBJECT_LOCK (basesink);
1488     basesink->priv->have_latency = FALSE;
1489     GST_OBJECT_UNLOCK (basesink);
1490   }
1491   /* and signal any waiters now */
1492   GST_PAD_PREROLL_SIGNAL (pad);
1493 }
1494
1495 /* with STREAM_LOCK, configures given segment with the event information. */
1496 static void
1497 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1498     GstEvent * event, GstSegment * segment)
1499 {
1500   gboolean update;
1501   gdouble rate, arate;
1502   GstFormat format;
1503   gint64 start;
1504   gint64 stop;
1505   gint64 time;
1506
1507   /* the newsegment event is needed to bring the buffer timestamps to the
1508    * stream time and to drop samples outside of the playback segment. */
1509   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1510       &start, &stop, &time);
1511
1512   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1513    * We protect with the OBJECT_LOCK so that we can use the values to
1514    * safely answer a POSITION query. */
1515   GST_OBJECT_LOCK (basesink);
1516   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1517       stop, time);
1518
1519   if (format == GST_FORMAT_TIME) {
1520     GST_DEBUG_OBJECT (basesink,
1521         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1522         "format GST_FORMAT_TIME, "
1523         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1524         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1525         update, rate, arate, GST_TIME_ARGS (segment->start),
1526         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1527         GST_TIME_ARGS (segment->accum));
1528   } else {
1529     GST_DEBUG_OBJECT (basesink,
1530         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1531         "format %d, "
1532         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1533         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1534         segment->format, segment->start, segment->stop, segment->time,
1535         segment->accum);
1536   }
1537   GST_OBJECT_UNLOCK (basesink);
1538 }
1539
1540 /* with PREROLL_LOCK, STREAM_LOCK */
1541 static gboolean
1542 gst_base_sink_commit_state (GstBaseSink * basesink)
1543 {
1544   /* commit state and proceed to next pending state */
1545   GstState current, next, pending, post_pending;
1546   gboolean post_paused = FALSE;
1547   gboolean post_async_done = FALSE;
1548   gboolean post_playing = FALSE;
1549
1550   /* we are certainly not playing async anymore now */
1551   basesink->playing_async = FALSE;
1552
1553   GST_OBJECT_LOCK (basesink);
1554   current = GST_STATE (basesink);
1555   next = GST_STATE_NEXT (basesink);
1556   pending = GST_STATE_PENDING (basesink);
1557   post_pending = pending;
1558
1559   switch (pending) {
1560     case GST_STATE_PLAYING:
1561     {
1562       GstBaseSinkClass *bclass;
1563       GstStateChangeReturn ret;
1564
1565       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1566
1567       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1568
1569       basesink->need_preroll = FALSE;
1570       post_async_done = TRUE;
1571       basesink->priv->commited = TRUE;
1572       post_playing = TRUE;
1573       /* post PAUSED too when we were READY */
1574       if (current == GST_STATE_READY) {
1575         post_paused = TRUE;
1576       }
1577
1578       /* make sure we notify the subclass of async playing */
1579       if (bclass->async_play) {
1580         GST_WARNING_OBJECT (basesink, "deprecated async_play");
1581         ret = bclass->async_play (basesink);
1582         if (ret == GST_STATE_CHANGE_FAILURE)
1583           goto async_failed;
1584       }
1585       break;
1586     }
1587     case GST_STATE_PAUSED:
1588       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1589       post_paused = TRUE;
1590       post_async_done = TRUE;
1591       basesink->priv->commited = TRUE;
1592       post_pending = GST_STATE_VOID_PENDING;
1593       break;
1594     case GST_STATE_READY:
1595     case GST_STATE_NULL:
1596       goto stopping;
1597     case GST_STATE_VOID_PENDING:
1598       goto nothing_pending;
1599     default:
1600       break;
1601   }
1602
1603   /* we can report latency queries now */
1604   basesink->priv->have_latency = TRUE;
1605
1606   GST_STATE (basesink) = pending;
1607   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1608   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1609   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1610   GST_OBJECT_UNLOCK (basesink);
1611
1612   if (post_paused) {
1613     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1614     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1615         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1616             current, next, post_pending));
1617   }
1618   if (post_async_done) {
1619     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1620     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1621         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1622   }
1623   if (post_playing) {
1624     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1625     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1626         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1627             next, pending, GST_STATE_VOID_PENDING));
1628   }
1629
1630   GST_STATE_BROADCAST (basesink);
1631
1632   return TRUE;
1633
1634 nothing_pending:
1635   {
1636     /* Depending on the state, set our vars. We get in this situation when the
1637      * state change function got a change to update the state vars before the
1638      * streaming thread did. This is fine but we need to make sure that we
1639      * update the need_preroll var since it was TRUE when we got here and might
1640      * become FALSE if we got to PLAYING. */
1641     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1642         gst_element_state_get_name (current));
1643     switch (current) {
1644       case GST_STATE_PLAYING:
1645         basesink->need_preroll = FALSE;
1646         break;
1647       case GST_STATE_PAUSED:
1648         basesink->need_preroll = TRUE;
1649         break;
1650       default:
1651         basesink->need_preroll = FALSE;
1652         basesink->flushing = TRUE;
1653         break;
1654     }
1655     /* we can report latency queries now */
1656     basesink->priv->have_latency = TRUE;
1657     GST_OBJECT_UNLOCK (basesink);
1658     return TRUE;
1659   }
1660 stopping:
1661   {
1662     /* app is going to READY */
1663     GST_DEBUG_OBJECT (basesink, "stopping");
1664     basesink->need_preroll = FALSE;
1665     basesink->flushing = TRUE;
1666     GST_OBJECT_UNLOCK (basesink);
1667     return FALSE;
1668   }
1669 async_failed:
1670   {
1671     GST_DEBUG_OBJECT (basesink, "async commit failed");
1672     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1673     GST_OBJECT_UNLOCK (basesink);
1674     return FALSE;
1675   }
1676 }
1677
1678 static void
1679 start_stepping (GstBaseSink * sink, GstSegment * segment,
1680     GstStepInfo * pending, GstStepInfo * current)
1681 {
1682   gint64 end;
1683   GstMessage *message;
1684
1685   GST_DEBUG_OBJECT (sink, "update pending step");
1686
1687   GST_OBJECT_LOCK (sink);
1688   memcpy (current, pending, sizeof (GstStepInfo));
1689   pending->valid = FALSE;
1690   GST_OBJECT_UNLOCK (sink);
1691
1692   /* post message first */
1693   message =
1694       gst_message_new_step_start (GST_OBJECT (sink), TRUE, current->format,
1695       current->amount, current->rate, current->flush, current->intermediate);
1696   gst_message_set_seqnum (message, current->seqnum);
1697   gst_element_post_message (GST_ELEMENT (sink), message);
1698
1699   /* get the running time of where we paused and remember it */
1700   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1701   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1702
1703   /* set the new rate for the remainder of the segment */
1704   current->start_rate = segment->rate;
1705   segment->rate *= current->rate;
1706   segment->abs_rate = ABS (segment->rate);
1707
1708   /* save values */
1709   if (segment->rate > 0.0)
1710     current->start_stop = segment->stop;
1711   else
1712     current->start_start = segment->start;
1713
1714   if (current->format == GST_FORMAT_TIME) {
1715     end = current->start + current->amount;
1716     if (!current->flush) {
1717       /* update the segment clipping regions for non-flushing seeks */
1718       if (segment->rate > 0.0) {
1719         segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1720         segment->last_stop = segment->stop;
1721       } else {
1722         gint64 position;
1723
1724         position = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1725         segment->time = position;
1726         segment->start = position;
1727         segment->last_stop = position;
1728       }
1729     }
1730   }
1731
1732   GST_DEBUG_OBJECT (sink,
1733       "segment now rate %lf, applied rate %lf, "
1734       "format GST_FORMAT_TIME, "
1735       "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1736       ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1737       segment->rate, segment->applied_rate, GST_TIME_ARGS (segment->start),
1738       GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1739       GST_TIME_ARGS (segment->accum));
1740
1741   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1742       GST_TIME_ARGS (current->start));
1743
1744   if (current->amount == -1) {
1745     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1746     current->valid = FALSE;
1747   } else {
1748     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1749         "rate: %f", current->amount, gst_format_get_name (current->format),
1750         current->rate);
1751   }
1752 }
1753
1754 static void
1755 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1756     GstStepInfo * current, gint64 rstart, gint64 rstop, gboolean eos)
1757 {
1758   gint64 stop, position;
1759   GstMessage *message;
1760
1761   GST_DEBUG_OBJECT (sink, "step complete");
1762
1763   if (segment->rate > 0.0)
1764     stop = rstart;
1765   else
1766     stop = rstop;
1767
1768   GST_DEBUG_OBJECT (sink,
1769       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1770
1771   if (stop == -1)
1772     current->duration = current->position;
1773   else
1774     current->duration = stop - current->start;
1775
1776   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1777       GST_TIME_ARGS (current->duration));
1778
1779   position = current->start + current->duration;
1780
1781   /* now move the segment to the new running time */
1782   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1783
1784   if (current->flush) {
1785     /* and remove the accumulated time we flushed, start time did not change */
1786     segment->accum = current->start;
1787   } else {
1788     /* start time is now the stepped position */
1789     gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1790   }
1791
1792   /* restore the previous rate */
1793   segment->rate = current->start_rate;
1794   segment->abs_rate = ABS (segment->rate);
1795
1796   if (segment->rate > 0.0)
1797     segment->stop = current->start_stop;
1798   else
1799     segment->start = current->start_start;
1800
1801   /* the clip segment is used for position report in paused... */
1802   memcpy (sink->abidata.ABI.clip_segment, segment, sizeof (GstSegment));
1803
1804   /* post the step done when we know the stepped duration in TIME */
1805   message =
1806       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1807       current->amount, current->rate, current->flush, current->intermediate,
1808       current->duration, eos);
1809   gst_message_set_seqnum (message, current->seqnum);
1810   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1811
1812   if (!current->intermediate)
1813     sink->need_preroll = current->need_preroll;
1814
1815   /* and the current step info finished and becomes invalid */
1816   current->valid = FALSE;
1817 }
1818
1819 static gboolean
1820 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1821     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1822     gint64 * rstop)
1823 {
1824   gboolean step_end = FALSE;
1825
1826   /* see if we need to skip this buffer because of stepping */
1827   switch (current->format) {
1828     case GST_FORMAT_TIME:
1829     {
1830       guint64 end;
1831       gint64 first, last;
1832
1833       if (segment->rate > 0.0) {
1834         if (segment->stop == *cstop)
1835           *rstop = *rstart + current->amount;
1836
1837         first = *rstart;
1838         last = *rstop;
1839       } else {
1840         if (segment->start == *cstart)
1841           *rstart = *rstop + current->amount;
1842
1843         first = *rstop;
1844         last = *rstart;
1845       }
1846
1847       end = current->start + current->amount;
1848       current->position = first - current->start;
1849
1850       if (G_UNLIKELY (segment->abs_rate != 1.0))
1851         current->position /= segment->abs_rate;
1852
1853       GST_DEBUG_OBJECT (sink,
1854           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1855           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1856       GST_DEBUG_OBJECT (sink,
1857           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1858           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1859           GST_TIME_ARGS (last - current->start),
1860           GST_TIME_ARGS (current->amount));
1861
1862       if ((current->flush && current->position >= current->amount)
1863           || last >= end) {
1864         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1865         step_end = TRUE;
1866         if (segment->rate > 0.0) {
1867           *rstart = end;
1868           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1869         } else {
1870           *rstop = end;
1871           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1872         }
1873       }
1874       GST_DEBUG_OBJECT (sink,
1875           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1876           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1877       GST_DEBUG_OBJECT (sink,
1878           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1879           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1880       break;
1881     }
1882     case GST_FORMAT_BUFFERS:
1883       GST_DEBUG_OBJECT (sink,
1884           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1885           current->position, current->amount);
1886
1887       if (current->position < current->amount) {
1888         current->position++;
1889       } else {
1890         step_end = TRUE;
1891       }
1892       break;
1893     case GST_FORMAT_DEFAULT:
1894     default:
1895       GST_DEBUG_OBJECT (sink,
1896           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1897           current->position, current->amount);
1898       break;
1899   }
1900   return step_end;
1901 }
1902
1903 /* with STREAM_LOCK, PREROLL_LOCK
1904  *
1905  * Returns TRUE if the object needs synchronisation and takes therefore
1906  * part in prerolling.
1907  *
1908  * rsstart/rsstop contain the start/stop in stream time.
1909  * rrstart/rrstop contain the start/stop in running time.
1910  */
1911 static gboolean
1912 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1913     GstClockTime * rsstart, GstClockTime * rsstop,
1914     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1915     gboolean * stepped, GstSegment * segment, GstStepInfo * step,
1916     gboolean * step_end, guint8 obj_type)
1917 {
1918   GstBaseSinkClass *bclass;
1919   GstBuffer *buffer;
1920   GstClockTime start, stop;     /* raw start/stop timestamps */
1921   gint64 cstart, cstop;         /* clipped raw timestamps */
1922   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1923   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1924   GstFormat format;
1925   GstBaseSinkPrivate *priv;
1926   gboolean eos;
1927
1928   priv = basesink->priv;
1929
1930   /* start with nothing */
1931   start = stop = GST_CLOCK_TIME_NONE;
1932
1933   if (G_UNLIKELY (OBJ_IS_EVENT (obj_type))) {
1934     GstEvent *event = GST_EVENT_CAST (obj);
1935
1936     switch (GST_EVENT_TYPE (event)) {
1937         /* EOS event needs syncing */
1938       case GST_EVENT_EOS:
1939       {
1940         if (basesink->segment.rate >= 0.0) {
1941           sstart = sstop = priv->current_sstop;
1942           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1943             /* we have not seen a buffer yet, use the segment values */
1944             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1945                 basesink->segment.format, basesink->segment.stop);
1946           }
1947         } else {
1948           sstart = sstop = priv->current_sstart;
1949           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1950             /* we have not seen a buffer yet, use the segment values */
1951             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1952                 basesink->segment.format, basesink->segment.start);
1953           }
1954         }
1955
1956         rstart = rstop = priv->eos_rtime;
1957         *do_sync = rstart != -1;
1958         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1959             GST_TIME_ARGS (rstart));
1960         /* if we are stepping, we end now */
1961         *step_end = step->valid;
1962         eos = TRUE;
1963         goto eos_done;
1964       }
1965       default:
1966         /* other events do not need syncing */
1967         /* FIXME, maybe NEWSEGMENT might need synchronisation
1968          * since the POSITION query depends on accumulated times and
1969          * we cannot accumulate the current segment before the previous
1970          * one completed.
1971          */
1972         return FALSE;
1973     }
1974   }
1975
1976   eos = FALSE;
1977
1978 again:
1979   /* else do buffer sync code */
1980   buffer = GST_BUFFER_CAST (obj);
1981
1982   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1983
1984   /* just get the times to see if we need syncing, if the start returns -1 we
1985    * don't sync. */
1986   if (bclass->get_times)
1987     bclass->get_times (basesink, buffer, &start, &stop);
1988
1989   if (!GST_CLOCK_TIME_IS_VALID (start)) {
1990     /* we don't need to sync but we still want to get the timestamps for
1991      * tracking the position */
1992     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1993     *do_sync = FALSE;
1994   } else {
1995     *do_sync = TRUE;
1996   }
1997
1998   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1999       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
2000       GST_TIME_ARGS (stop), *do_sync);
2001
2002   /* collect segment and format for code clarity */
2003   format = segment->format;
2004
2005   /* no timestamp clipping if we did not get a TIME segment format */
2006   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
2007     cstart = start;
2008     cstop = stop;
2009     /* do running and stream time in TIME format */
2010     format = GST_FORMAT_TIME;
2011     GST_LOG_OBJECT (basesink, "not time format, don't clip");
2012     goto do_times;
2013   }
2014
2015   /* clip, only when we know about time */
2016   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
2017               (gint64) start, (gint64) stop, &cstart, &cstop))) {
2018     if (step->valid) {
2019       GST_DEBUG_OBJECT (basesink, "step out of segment");
2020       /* when we are stepping, pretend we're at the end of the segment */
2021       if (segment->rate > 0.0) {
2022         cstart = segment->stop;
2023         cstop = segment->stop;
2024       } else {
2025         cstart = segment->start;
2026         cstop = segment->start;
2027       }
2028       goto do_times;
2029     }
2030     goto out_of_segment;
2031   }
2032
2033   if (G_UNLIKELY (start != cstart || stop != cstop)) {
2034     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
2035         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
2036         GST_TIME_ARGS (cstop));
2037   }
2038
2039   /* set last stop position */
2040   if (G_LIKELY (stop != GST_CLOCK_TIME_NONE && cstop != GST_CLOCK_TIME_NONE))
2041     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
2042   else
2043     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
2044
2045 do_times:
2046   rstart = gst_segment_to_running_time (segment, format, cstart);
2047   rstop = gst_segment_to_running_time (segment, format, cstop);
2048
2049   if (G_UNLIKELY (step->valid)) {
2050     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
2051                 &rstart, &rstop))) {
2052       /* step is still busy, we discard data when we are flushing */
2053       *stepped = step->flush;
2054       GST_DEBUG_OBJECT (basesink, "stepping busy");
2055     }
2056   }
2057   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
2058    * upstream is behaving very badly */
2059   sstart = gst_segment_to_stream_time (segment, format, cstart);
2060   sstop = gst_segment_to_stream_time (segment, format, cstop);
2061
2062 eos_done:
2063   /* eos_done label only called when doing EOS, we also stop stepping then */
2064   if (*step_end && step->flush) {
2065     GST_DEBUG_OBJECT (basesink, "flushing step ended");
2066     stop_stepping (basesink, segment, step, rstart, rstop, eos);
2067     *step_end = FALSE;
2068     /* re-determine running start times for adjusted segment
2069      * (which has a flushed amount of running/accumulated time removed) */
2070     if (!GST_IS_EVENT (obj)) {
2071       GST_DEBUG_OBJECT (basesink, "refresh sync times");
2072       goto again;
2073     }
2074   }
2075
2076   /* save times */
2077   *rsstart = sstart;
2078   *rsstop = sstop;
2079   *rrstart = rstart;
2080   *rrstop = rstop;
2081
2082   /* buffers and EOS always need syncing and preroll */
2083   return TRUE;
2084
2085   /* special cases */
2086 out_of_segment:
2087   {
2088     /* we usually clip in the chain function already but stepping could cause
2089      * the segment to be updated later. we return FALSE so that we don't try
2090      * to sync on it. */
2091     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
2092     return FALSE;
2093   }
2094 }
2095
2096 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
2097  * adjust a timestamp with the latency and timestamp offset. This function does
2098  * not adjust for the render delay. */
2099 static GstClockTime
2100 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
2101 {
2102   GstClockTimeDiff ts_offset;
2103
2104   /* don't do anything funny with invalid timestamps */
2105   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2106     return time;
2107
2108   time += basesink->priv->latency;
2109
2110   /* apply offset, be carefull for underflows */
2111   ts_offset = basesink->priv->ts_offset;
2112   if (ts_offset < 0) {
2113     ts_offset = -ts_offset;
2114     if (ts_offset < time)
2115       time -= ts_offset;
2116     else
2117       time = 0;
2118   } else
2119     time += ts_offset;
2120
2121   /* subtract the render delay again, which was included in the latency */
2122   if (time > basesink->priv->render_delay)
2123     time -= basesink->priv->render_delay;
2124   else
2125     time = 0;
2126
2127   return time;
2128 }
2129
2130 /**
2131  * gst_base_sink_wait_clock:
2132  * @sink: the sink
2133  * @time: the running_time to be reached
2134  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2135  *
2136  * This function will block until @time is reached. It is usually called by
2137  * subclasses that use their own internal synchronisation.
2138  *
2139  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
2140  * returned. Likewise, if synchronisation is disabled in the element or there
2141  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
2142  *
2143  * This function should only be called with the PREROLL_LOCK held, like when
2144  * receiving an EOS event in the #GstBaseSinkClass.event() vmethod or when
2145  * receiving a buffer in
2146  * the #GstBaseSinkClass.render() vmethod.
2147  *
2148  * The @time argument should be the running_time of when this method should
2149  * return and is not adjusted with any latency or offset configured in the
2150  * sink.
2151  *
2152  * Since: 0.10.20
2153  *
2154  * Returns: #GstClockReturn
2155  */
2156 GstClockReturn
2157 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
2158     GstClockTimeDiff * jitter)
2159 {
2160   GstClockReturn ret;
2161   GstClock *clock;
2162   GstClockTime base_time;
2163
2164   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2165     goto invalid_time;
2166
2167   GST_OBJECT_LOCK (sink);
2168   if (G_UNLIKELY (!sink->sync))
2169     goto no_sync;
2170
2171   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
2172     goto no_clock;
2173
2174   base_time = GST_ELEMENT_CAST (sink)->base_time;
2175   GST_LOG_OBJECT (sink,
2176       "time %" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT,
2177       GST_TIME_ARGS (time), GST_TIME_ARGS (base_time));
2178
2179   /* add base_time to running_time to get the time against the clock */
2180   time += base_time;
2181
2182   /* Re-use existing clockid if available */
2183   if (G_LIKELY (sink->priv->cached_clock_id != NULL)) {
2184     if (!gst_clock_single_shot_id_reinit (clock, sink->priv->cached_clock_id,
2185             time)) {
2186       gst_clock_id_unref (sink->priv->cached_clock_id);
2187       sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2188     }
2189   } else
2190     sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2191   GST_OBJECT_UNLOCK (sink);
2192
2193   /* A blocking wait is performed on the clock. We save the ClockID
2194    * so we can unlock the entry at any time. While we are blocking, we
2195    * release the PREROLL_LOCK so that other threads can interrupt the
2196    * entry. */
2197   sink->clock_id = sink->priv->cached_clock_id;
2198   /* release the preroll lock while waiting */
2199   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
2200
2201   ret = gst_clock_id_wait (sink->priv->cached_clock_id, jitter);
2202
2203   GST_PAD_PREROLL_LOCK (sink->sinkpad);
2204   sink->clock_id = NULL;
2205
2206   return ret;
2207
2208   /* no syncing needed */
2209 invalid_time:
2210   {
2211     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
2212     return GST_CLOCK_BADTIME;
2213   }
2214 no_sync:
2215   {
2216     GST_DEBUG_OBJECT (sink, "sync disabled");
2217     GST_OBJECT_UNLOCK (sink);
2218     return GST_CLOCK_BADTIME;
2219   }
2220 no_clock:
2221   {
2222     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
2223     GST_OBJECT_UNLOCK (sink);
2224     return GST_CLOCK_BADTIME;
2225   }
2226 }
2227
2228 /**
2229  * gst_base_sink_wait_preroll:
2230  * @sink: the sink
2231  *
2232  * If the #GstBaseSinkClass.render() method performs its own synchronisation
2233  * against the clock it must unblock when going from PLAYING to the PAUSED state
2234  * and call this method before continuing to render the remaining data.
2235  *
2236  * This function will block until a state change to PLAYING happens (in which
2237  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
2238  * to a state change to READY or a FLUSH event (in which case this function
2239  * returns #GST_FLOW_WRONG_STATE).
2240  *
2241  * This function should only be called with the PREROLL_LOCK held, like in the
2242  * render function.
2243  *
2244  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2245  * continue. Any other return value should be returned from the render vmethod.
2246  *
2247  * Since: 0.10.11
2248  */
2249 GstFlowReturn
2250 gst_base_sink_wait_preroll (GstBaseSink * sink)
2251 {
2252   sink->have_preroll = TRUE;
2253   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
2254   /* block until the state changes, or we get a flush, or something */
2255   GST_PAD_PREROLL_WAIT (sink->sinkpad);
2256   sink->have_preroll = FALSE;
2257   if (G_UNLIKELY (sink->flushing))
2258     goto stopping;
2259   if (G_UNLIKELY (sink->priv->step_unlock))
2260     goto step_unlocked;
2261   GST_DEBUG_OBJECT (sink, "continue after preroll");
2262
2263   return GST_FLOW_OK;
2264
2265   /* ERRORS */
2266 stopping:
2267   {
2268     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
2269     return GST_FLOW_WRONG_STATE;
2270   }
2271 step_unlocked:
2272   {
2273     sink->priv->step_unlock = FALSE;
2274     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
2275     return GST_FLOW_STEP;
2276   }
2277 }
2278
2279 static inline guint8
2280 get_object_type (GstMiniObject * obj)
2281 {
2282   guint8 obj_type;
2283
2284   if (G_LIKELY (GST_IS_BUFFER (obj)))
2285     obj_type = _PR_IS_BUFFER;
2286   else if (GST_IS_EVENT (obj))
2287     obj_type = _PR_IS_EVENT;
2288   else if (GST_IS_BUFFER_LIST (obj))
2289     obj_type = _PR_IS_BUFFERLIST;
2290   else
2291     obj_type = _PR_IS_NOTHING;
2292
2293   return obj_type;
2294 }
2295
2296 /**
2297  * gst_base_sink_do_preroll:
2298  * @sink: the sink
2299  * @obj: (transfer none): the mini object that caused the preroll
2300  *
2301  * If the @sink spawns its own thread for pulling buffers from upstream it
2302  * should call this method after it has pulled a buffer. If the element needed
2303  * to preroll, this function will perform the preroll and will then block
2304  * until the element state is changed.
2305  *
2306  * This function should be called with the PREROLL_LOCK held.
2307  *
2308  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2309  * continue. Any other return value should be returned from the render vmethod.
2310  *
2311  * Since: 0.10.22
2312  */
2313 GstFlowReturn
2314 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2315 {
2316   GstFlowReturn ret;
2317
2318   while (G_UNLIKELY (sink->need_preroll)) {
2319     guint8 obj_type;
2320     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2321
2322     obj_type = get_object_type (obj);
2323
2324     ret = gst_base_sink_preroll_object (sink, obj_type, obj);
2325     if (ret != GST_FLOW_OK)
2326       goto preroll_failed;
2327
2328     /* need to recheck here because the commit state could have
2329      * made us not need the preroll anymore */
2330     if (G_LIKELY (sink->need_preroll)) {
2331       /* block until the state changes, or we get a flush, or something */
2332       ret = gst_base_sink_wait_preroll (sink);
2333       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2334         goto preroll_failed;
2335     }
2336   }
2337   return GST_FLOW_OK;
2338
2339   /* ERRORS */
2340 preroll_failed:
2341   {
2342     GST_DEBUG_OBJECT (sink, "preroll failed %d", ret);
2343     return ret;
2344   }
2345 }
2346
2347 /**
2348  * gst_base_sink_wait_eos:
2349  * @sink: the sink
2350  * @time: the running_time to be reached
2351  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2352  *
2353  * This function will block until @time is reached. It is usually called by
2354  * subclasses that use their own internal synchronisation but want to let the
2355  * EOS be handled by the base class.
2356  *
2357  * This function should only be called with the PREROLL_LOCK held, like when
2358  * receiving an EOS event in the ::event vmethod.
2359  *
2360  * The @time argument should be the running_time of when the EOS should happen
2361  * and will be adjusted with any latency and offset configured in the sink.
2362  *
2363  * Returns: #GstFlowReturn
2364  *
2365  * Since: 0.10.15
2366  */
2367 GstFlowReturn
2368 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2369     GstClockTimeDiff * jitter)
2370 {
2371   GstClockReturn status;
2372   GstFlowReturn ret;
2373
2374   do {
2375     GstClockTime stime;
2376
2377     GST_DEBUG_OBJECT (sink, "checking preroll");
2378
2379     /* first wait for the playing state before we can continue */
2380     while (G_UNLIKELY (sink->need_preroll)) {
2381       ret = gst_base_sink_wait_preroll (sink);
2382       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2383         goto flushing;
2384     }
2385
2386     /* preroll done, we can sync since we are in PLAYING now. */
2387     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2388         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2389
2390     /* compensate for latency and ts_offset. We don't adjust for render delay
2391      * because we don't interact with the device on EOS normally. */
2392     stime = gst_base_sink_adjust_time (sink, time);
2393
2394     /* wait for the clock, this can be interrupted because we got shut down or
2395      * we PAUSED. */
2396     status = gst_base_sink_wait_clock (sink, stime, jitter);
2397
2398     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2399
2400     /* invalid time, no clock or sync disabled, just continue then */
2401     if (status == GST_CLOCK_BADTIME)
2402       break;
2403
2404     /* waiting could have been interrupted and we can be flushing now */
2405     if (G_UNLIKELY (sink->flushing))
2406       goto flushing;
2407
2408     /* retry if we got unscheduled, which means we did not reach the timeout
2409      * yet. if some other error occures, we continue. */
2410   } while (status == GST_CLOCK_UNSCHEDULED);
2411
2412   GST_DEBUG_OBJECT (sink, "end of stream");
2413
2414   return GST_FLOW_OK;
2415
2416   /* ERRORS */
2417 flushing:
2418   {
2419     GST_DEBUG_OBJECT (sink, "we are flushing");
2420     return GST_FLOW_WRONG_STATE;
2421   }
2422 }
2423
2424 /* with STREAM_LOCK, PREROLL_LOCK
2425  *
2426  * Make sure we are in PLAYING and synchronize an object to the clock.
2427  *
2428  * If we need preroll, we are not in PLAYING. We try to commit the state
2429  * if needed and then block if we still are not PLAYING.
2430  *
2431  * We start waiting on the clock in PLAYING. If we got interrupted, we
2432  * immediatly try to re-preroll.
2433  *
2434  * Some objects do not need synchronisation (most events) and so this function
2435  * immediatly returns GST_FLOW_OK.
2436  *
2437  * for objects that arrive later than max-lateness to be synchronized to the
2438  * clock have the @late boolean set to TRUE.
2439  *
2440  * This function keeps a running average of the jitter (the diff between the
2441  * clock time and the requested sync time). The jitter is negative for
2442  * objects that arrive in time and positive for late buffers.
2443  *
2444  * does not take ownership of obj.
2445  */
2446 static GstFlowReturn
2447 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2448     GstMiniObject * obj, gboolean * late, gboolean * step_end, guint8 obj_type)
2449 {
2450   GstClockTimeDiff jitter = 0;
2451   gboolean syncable;
2452   GstClockReturn status = GST_CLOCK_OK;
2453   GstClockTime rstart, rstop, sstart, sstop, stime;
2454   gboolean do_sync;
2455   GstBaseSinkPrivate *priv;
2456   GstFlowReturn ret;
2457   GstStepInfo *current, *pending;
2458   gboolean stepped;
2459
2460   priv = basesink->priv;
2461
2462 do_step:
2463   sstart = sstop = rstart = rstop = GST_CLOCK_TIME_NONE;
2464   do_sync = TRUE;
2465   stepped = FALSE;
2466
2467   priv->current_rstart = GST_CLOCK_TIME_NONE;
2468
2469   /* get stepping info */
2470   current = &priv->current_step;
2471   pending = &priv->pending_step;
2472
2473   /* get timing information for this object against the render segment */
2474   syncable = gst_base_sink_get_sync_times (basesink, obj,
2475       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2476       current, step_end, obj_type);
2477
2478   if (G_UNLIKELY (stepped))
2479     goto step_skipped;
2480
2481   /* a syncable object needs to participate in preroll and
2482    * clocking. All buffers and EOS are syncable. */
2483   if (G_UNLIKELY (!syncable))
2484     goto not_syncable;
2485
2486   /* store timing info for current object */
2487   priv->current_rstart = rstart;
2488   priv->current_rstop = (GST_CLOCK_TIME_IS_VALID (rstop) ? rstop : rstart);
2489
2490   /* save sync time for eos when the previous object needed sync */
2491   priv->eos_rtime = (do_sync ? priv->current_rstop : GST_CLOCK_TIME_NONE);
2492
2493 again:
2494   /* first do preroll, this makes sure we commit our state
2495    * to PAUSED and can continue to PLAYING. We cannot perform
2496    * any clock sync in PAUSED because there is no clock. */
2497   ret = gst_base_sink_do_preroll (basesink, obj);
2498   if (G_UNLIKELY (ret != GST_FLOW_OK))
2499     goto preroll_failed;
2500
2501   /* update the segment with a pending step if the current one is invalid and we
2502    * have a new pending one. We only accept new step updates after a preroll */
2503   if (G_UNLIKELY (pending->valid && !current->valid)) {
2504     start_stepping (basesink, &basesink->segment, pending, current);
2505     goto do_step;
2506   }
2507
2508   /* After rendering we store the position of the last buffer so that we can use
2509    * it to report the position. We need to take the lock here. */
2510   GST_OBJECT_LOCK (basesink);
2511   priv->current_sstart = sstart;
2512   priv->current_sstop = (GST_CLOCK_TIME_IS_VALID (sstop) ? sstop : sstart);
2513   GST_OBJECT_UNLOCK (basesink);
2514
2515   if (!do_sync)
2516     goto done;
2517
2518   /* adjust for latency */
2519   stime = gst_base_sink_adjust_time (basesink, rstart);
2520
2521   /* adjust for render-delay, avoid underflows */
2522   if (GST_CLOCK_TIME_IS_VALID (stime)) {
2523     if (stime > priv->render_delay)
2524       stime -= priv->render_delay;
2525     else
2526       stime = 0;
2527   }
2528
2529   /* preroll done, we can sync since we are in PLAYING now. */
2530   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2531       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2532       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2533
2534   /* This function will return immediatly if start == -1, no clock
2535    * or sync is disabled with GST_CLOCK_BADTIME. */
2536   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2537
2538   GST_DEBUG_OBJECT (basesink, "clock returned %d, jitter %c%" GST_TIME_FORMAT,
2539       status, (jitter < 0 ? '-' : ' '), GST_TIME_ARGS (ABS (jitter)));
2540
2541   /* invalid time, no clock or sync disabled, just render */
2542   if (status == GST_CLOCK_BADTIME)
2543     goto done;
2544
2545   /* waiting could have been interrupted and we can be flushing now */
2546   if (G_UNLIKELY (basesink->flushing))
2547     goto flushing;
2548
2549   /* check for unlocked by a state change, we are not flushing so
2550    * we can try to preroll on the current buffer. */
2551   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2552     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2553     priv->call_preroll = TRUE;
2554     goto again;
2555   }
2556
2557   /* successful syncing done, record observation */
2558   priv->current_jitter = jitter;
2559
2560   /* check if the object should be dropped */
2561   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2562       status, jitter);
2563
2564 done:
2565   return GST_FLOW_OK;
2566
2567   /* ERRORS */
2568 step_skipped:
2569   {
2570     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2571     *late = TRUE;
2572     return GST_FLOW_OK;
2573   }
2574 not_syncable:
2575   {
2576     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2577     return GST_FLOW_OK;
2578   }
2579 flushing:
2580   {
2581     GST_DEBUG_OBJECT (basesink, "we are flushing");
2582     return GST_FLOW_WRONG_STATE;
2583   }
2584 preroll_failed:
2585   {
2586     GST_DEBUG_OBJECT (basesink, "preroll failed");
2587     *step_end = FALSE;
2588     return ret;
2589   }
2590 }
2591
2592 static gboolean
2593 gst_base_sink_send_qos (GstBaseSink * basesink,
2594     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2595 {
2596   GstEvent *event;
2597   gboolean res;
2598
2599   /* generate Quality-of-Service event */
2600   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2601       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2602       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (time));
2603
2604   event = gst_event_new_qos (proportion, diff, time);
2605
2606   /* send upstream */
2607   res = gst_pad_push_event (basesink->sinkpad, event);
2608
2609   return res;
2610 }
2611
2612 static void
2613 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2614 {
2615   GstBaseSinkPrivate *priv;
2616   GstClockTime start, stop;
2617   GstClockTimeDiff jitter;
2618   GstClockTime pt, entered, left;
2619   GstClockTime duration;
2620   gdouble rate;
2621
2622   priv = sink->priv;
2623
2624   start = priv->current_rstart;
2625
2626   if (priv->current_step.valid)
2627     return;
2628
2629   /* if Quality-of-Service disabled, do nothing */
2630   if (!g_atomic_int_get (&priv->qos_enabled) ||
2631       !GST_CLOCK_TIME_IS_VALID (start))
2632     return;
2633
2634   stop = priv->current_rstop;
2635   jitter = priv->current_jitter;
2636
2637   if (jitter < 0) {
2638     /* this is the time the buffer entered the sink */
2639     if (start < -jitter)
2640       entered = 0;
2641     else
2642       entered = start + jitter;
2643     left = start;
2644   } else {
2645     /* this is the time the buffer entered the sink */
2646     entered = start + jitter;
2647     /* this is the time the buffer left the sink */
2648     left = start + jitter;
2649   }
2650
2651   /* calculate duration of the buffer */
2652   if (GST_CLOCK_TIME_IS_VALID (stop))
2653     duration = stop - start;
2654   else
2655     duration = GST_CLOCK_TIME_NONE;
2656
2657   /* if we have the time when the last buffer left us, calculate
2658    * processing time */
2659   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2660     if (entered > priv->last_left) {
2661       pt = entered - priv->last_left;
2662     } else {
2663       pt = 0;
2664     }
2665   } else {
2666     pt = priv->avg_pt;
2667   }
2668
2669   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2670       ", entered %" GST_TIME_FORMAT ", left %" GST_TIME_FORMAT ", pt: %"
2671       GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT ",jitter %"
2672       G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (entered),
2673       GST_TIME_ARGS (left), GST_TIME_ARGS (pt), GST_TIME_ARGS (duration),
2674       jitter);
2675
2676   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2677       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2678       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2679       priv->avg_rate);
2680
2681   /* collect running averages. for first observations, we copy the
2682    * values */
2683   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_duration))
2684     priv->avg_duration = duration;
2685   else
2686     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2687
2688   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_pt))
2689     priv->avg_pt = pt;
2690   else
2691     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2692
2693   if (priv->avg_duration != 0)
2694     rate =
2695         gst_guint64_to_gdouble (priv->avg_pt) /
2696         gst_guint64_to_gdouble (priv->avg_duration);
2697   else
2698     rate = 0.0;
2699
2700   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2701     if (dropped || priv->avg_rate < 0.0) {
2702       priv->avg_rate = rate;
2703     } else {
2704       if (rate > 1.0)
2705         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2706       else
2707         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2708     }
2709   }
2710
2711   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2712       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2713       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2714       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2715
2716
2717   if (priv->avg_rate >= 0.0) {
2718     /* if we have a valid rate, start sending QoS messages */
2719     if (priv->current_jitter < 0) {
2720       /* make sure we never go below 0 when adding the jitter to the
2721        * timestamp. */
2722       if (priv->current_rstart < -priv->current_jitter)
2723         priv->current_jitter = -priv->current_rstart;
2724     }
2725     gst_base_sink_send_qos (sink, priv->avg_rate, priv->current_rstart,
2726         priv->current_jitter);
2727   }
2728
2729   /* record when this buffer will leave us */
2730   priv->last_left = left;
2731 }
2732
2733 /* reset all qos measuring */
2734 static void
2735 gst_base_sink_reset_qos (GstBaseSink * sink)
2736 {
2737   GstBaseSinkPrivate *priv;
2738
2739   priv = sink->priv;
2740
2741   priv->last_in_time = GST_CLOCK_TIME_NONE;
2742   priv->last_left = GST_CLOCK_TIME_NONE;
2743   priv->avg_duration = GST_CLOCK_TIME_NONE;
2744   priv->avg_pt = GST_CLOCK_TIME_NONE;
2745   priv->avg_rate = -1.0;
2746   priv->avg_render = GST_CLOCK_TIME_NONE;
2747   priv->rendered = 0;
2748   priv->dropped = 0;
2749
2750 }
2751
2752 /* Checks if the object was scheduled too late.
2753  *
2754  * start/stop contain the raw timestamp start and stop values
2755  * of the object.
2756  *
2757  * status and jitter contain the return values from the clock wait.
2758  *
2759  * returns TRUE if the buffer was too late.
2760  */
2761 static gboolean
2762 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2763     GstClockTime start, GstClockTime stop,
2764     GstClockReturn status, GstClockTimeDiff jitter)
2765 {
2766   gboolean late;
2767   gint64 max_lateness;
2768   GstBaseSinkPrivate *priv;
2769
2770   priv = basesink->priv;
2771
2772   late = FALSE;
2773
2774   /* only for objects that were too late */
2775   if (G_LIKELY (status != GST_CLOCK_EARLY))
2776     goto in_time;
2777
2778   max_lateness = basesink->abidata.ABI.max_lateness;
2779
2780   /* check if frame dropping is enabled */
2781   if (max_lateness == -1)
2782     goto no_drop;
2783
2784   /* only check for buffers */
2785   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2786     goto not_buffer;
2787
2788   /* can't do check if we don't have a timestamp */
2789   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (start)))
2790     goto no_timestamp;
2791
2792   /* we can add a valid stop time */
2793   if (GST_CLOCK_TIME_IS_VALID (stop))
2794     max_lateness += stop;
2795   else
2796     max_lateness += start;
2797
2798   /* if the jitter bigger than duration and lateness we are too late */
2799   if ((late = start + jitter > max_lateness)) {
2800     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2801         "buffer is too late %" GST_TIME_FORMAT
2802         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (start + jitter),
2803         GST_TIME_ARGS (max_lateness));
2804     /* !!emergency!!, if we did not receive anything valid for more than a
2805      * second, render it anyway so the user sees something */
2806     if (GST_CLOCK_TIME_IS_VALID (priv->last_in_time) &&
2807         start - priv->last_in_time > GST_SECOND) {
2808       late = FALSE;
2809       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2810           (_("A lot of buffers are being dropped.")),
2811           ("There may be a timestamping problem, or this computer is too slow."));
2812       GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2813           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2814           GST_TIME_ARGS (priv->last_in_time));
2815     }
2816   }
2817
2818 done:
2819   if (!late || !GST_CLOCK_TIME_IS_VALID (priv->last_in_time)) {
2820     priv->last_in_time = start;
2821   }
2822   return late;
2823
2824   /* all is fine */
2825 in_time:
2826   {
2827     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2828     goto done;
2829   }
2830 no_drop:
2831   {
2832     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2833     goto done;
2834   }
2835 not_buffer:
2836   {
2837     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2838     return FALSE;
2839   }
2840 no_timestamp:
2841   {
2842     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2843     return FALSE;
2844   }
2845 }
2846
2847 /* called before and after calling the render vmethod. It keeps track of how
2848  * much time was spent in the render method and is used to check if we are
2849  * flooded */
2850 static void
2851 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2852 {
2853   GstBaseSinkPrivate *priv;
2854
2855   priv = basesink->priv;
2856
2857   if (start) {
2858     priv->start = gst_util_get_timestamp ();
2859   } else {
2860     GstClockTime elapsed;
2861
2862     priv->stop = gst_util_get_timestamp ();
2863
2864     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2865
2866     if (!GST_CLOCK_TIME_IS_VALID (priv->avg_render))
2867       priv->avg_render = elapsed;
2868     else
2869       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2870
2871     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2872         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2873   }
2874 }
2875
2876 /* with STREAM_LOCK, PREROLL_LOCK,
2877  *
2878  * Synchronize the object on the clock and then render it.
2879  *
2880  * takes ownership of obj.
2881  */
2882 static GstFlowReturn
2883 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2884     guint8 obj_type, gpointer obj)
2885 {
2886   GstFlowReturn ret;
2887   GstBaseSinkClass *bclass;
2888   gboolean late, step_end;
2889   gpointer sync_obj;
2890   GstBaseSinkPrivate *priv;
2891
2892   priv = basesink->priv;
2893
2894   if (OBJ_IS_BUFFERLIST (obj_type)) {
2895     /*
2896      * If buffer list, use the first group buffer within the list
2897      * for syncing
2898      */
2899     sync_obj = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
2900     g_assert (NULL != sync_obj);
2901   } else {
2902     sync_obj = obj;
2903   }
2904
2905 again:
2906   late = FALSE;
2907   step_end = FALSE;
2908
2909   /* synchronize this object, non syncable objects return OK
2910    * immediatly. */
2911   ret =
2912       gst_base_sink_do_sync (basesink, pad, sync_obj, &late, &step_end,
2913       obj_type);
2914   if (G_UNLIKELY (ret != GST_FLOW_OK))
2915     goto sync_failed;
2916
2917   /* and now render, event or buffer/buffer list. */
2918   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type))) {
2919     /* drop late buffers unconditionally, let's hope it's unlikely */
2920     if (G_UNLIKELY (late))
2921       goto dropped;
2922
2923     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2924
2925     if (G_LIKELY ((OBJ_IS_BUFFERLIST (obj_type) && bclass->render_list) ||
2926             (!OBJ_IS_BUFFERLIST (obj_type) && bclass->render))) {
2927       gint do_qos;
2928
2929       /* read once, to get same value before and after */
2930       do_qos = g_atomic_int_get (&priv->qos_enabled);
2931
2932       GST_DEBUG_OBJECT (basesink, "rendering object %p", obj);
2933
2934       /* record rendering time for QoS and stats */
2935       if (do_qos)
2936         gst_base_sink_do_render_stats (basesink, TRUE);
2937
2938       if (!OBJ_IS_BUFFERLIST (obj_type)) {
2939         GstBuffer *buf;
2940
2941         /* For buffer lists do not set last buffer. Creating buffer
2942          * with meaningful data can be done only with memcpy which will
2943          * significantly affect performance */
2944         buf = GST_BUFFER_CAST (obj);
2945         gst_base_sink_set_last_buffer (basesink, buf);
2946
2947         ret = bclass->render (basesink, buf);
2948       } else {
2949         GstBufferList *buflist;
2950
2951         buflist = GST_BUFFER_LIST_CAST (obj);
2952
2953         ret = bclass->render_list (basesink, buflist);
2954       }
2955
2956       if (do_qos)
2957         gst_base_sink_do_render_stats (basesink, FALSE);
2958
2959       if (ret == GST_FLOW_STEP)
2960         goto again;
2961
2962       if (G_UNLIKELY (basesink->flushing))
2963         goto flushing;
2964
2965       priv->rendered++;
2966     }
2967   } else if (G_LIKELY (OBJ_IS_EVENT (obj_type))) {
2968     GstEvent *event = GST_EVENT_CAST (obj);
2969     gboolean event_res = TRUE;
2970     GstEventType type;
2971
2972     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2973
2974     type = GST_EVENT_TYPE (event);
2975
2976     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
2977         gst_event_type_get_name (type));
2978
2979     if (bclass->event)
2980       event_res = bclass->event (basesink, event);
2981
2982     /* when we get here we could be flushing again when the event handler calls
2983      * _wait_eos(). We have to ignore this object in that case. */
2984     if (G_UNLIKELY (basesink->flushing))
2985       goto flushing;
2986
2987     if (G_LIKELY (event_res)) {
2988       guint32 seqnum;
2989
2990       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
2991       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
2992
2993       switch (type) {
2994         case GST_EVENT_EOS:
2995         {
2996           GstMessage *message;
2997
2998           /* the EOS event is completely handled so we mark
2999            * ourselves as being in the EOS state. eos is also
3000            * protected by the object lock so we can read it when
3001            * answering the POSITION query. */
3002           GST_OBJECT_LOCK (basesink);
3003           basesink->eos = TRUE;
3004           GST_OBJECT_UNLOCK (basesink);
3005
3006           /* ok, now we can post the message */
3007           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
3008
3009           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
3010           gst_message_set_seqnum (message, seqnum);
3011           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
3012           break;
3013         }
3014         case GST_EVENT_NEWSEGMENT:
3015           /* configure the segment */
3016           gst_base_sink_configure_segment (basesink, pad, event,
3017               &basesink->segment);
3018           break;
3019         case GST_EVENT_SINK_MESSAGE:{
3020           GstMessage *msg = NULL;
3021
3022           gst_event_parse_sink_message (event, &msg);
3023
3024           if (msg)
3025             gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
3026         }
3027         default:
3028           break;
3029       }
3030     }
3031   } else {
3032     g_return_val_if_reached (GST_FLOW_ERROR);
3033   }
3034
3035 done:
3036   if (step_end) {
3037     /* the step ended, check if we need to activate a new step */
3038     GST_DEBUG_OBJECT (basesink, "step ended");
3039     stop_stepping (basesink, &basesink->segment, &priv->current_step,
3040         priv->current_rstart, priv->current_rstop, basesink->eos);
3041     goto again;
3042   }
3043
3044   gst_base_sink_perform_qos (basesink, late);
3045
3046   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
3047   gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3048   return ret;
3049
3050   /* ERRORS */
3051 sync_failed:
3052   {
3053     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
3054     goto done;
3055   }
3056 dropped:
3057   {
3058     priv->dropped++;
3059     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
3060
3061     if (g_atomic_int_get (&priv->qos_enabled)) {
3062       GstMessage *qos_msg;
3063       GstClockTime timestamp, duration;
3064
3065       timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (sync_obj));
3066       duration = GST_BUFFER_DURATION (GST_BUFFER_CAST (sync_obj));
3067
3068       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3069           "qos: dropped buffer rt %" GST_TIME_FORMAT ", st %" GST_TIME_FORMAT
3070           ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT,
3071           GST_TIME_ARGS (priv->current_rstart),
3072           GST_TIME_ARGS (priv->current_sstart), GST_TIME_ARGS (timestamp),
3073           GST_TIME_ARGS (duration));
3074       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3075           "qos: rendered %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
3076           priv->rendered, priv->dropped);
3077
3078       qos_msg =
3079           gst_message_new_qos (GST_OBJECT_CAST (basesink), basesink->sync,
3080           priv->current_rstart, priv->current_sstart, timestamp, duration);
3081       gst_message_set_qos_values (qos_msg, priv->current_jitter, priv->avg_rate,
3082           1000000);
3083       gst_message_set_qos_stats (qos_msg, GST_FORMAT_BUFFERS, priv->rendered,
3084           priv->dropped);
3085       gst_element_post_message (GST_ELEMENT_CAST (basesink), qos_msg);
3086     }
3087     goto done;
3088   }
3089 flushing:
3090   {
3091     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
3092     gst_mini_object_unref (obj);
3093     return GST_FLOW_WRONG_STATE;
3094   }
3095 }
3096
3097 /* with STREAM_LOCK, PREROLL_LOCK
3098  *
3099  * Perform preroll on the given object. For buffers this means
3100  * calling the preroll subclass method.
3101  * If that succeeds, the state will be commited.
3102  *
3103  * function does not take ownership of obj.
3104  */
3105 static GstFlowReturn
3106 gst_base_sink_preroll_object (GstBaseSink * basesink, guint8 obj_type,
3107     GstMiniObject * obj)
3108 {
3109   GstFlowReturn ret;
3110
3111   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
3112
3113   /* if it's a buffer, we need to call the preroll method */
3114   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type) && basesink->priv->call_preroll)) {
3115     GstBaseSinkClass *bclass;
3116     GstBuffer *buf;
3117     GstClockTime timestamp;
3118
3119     if (OBJ_IS_BUFFERLIST (obj_type)) {
3120       buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3121       g_assert (NULL != buf);
3122     } else {
3123       buf = GST_BUFFER_CAST (obj);
3124     }
3125
3126     timestamp = GST_BUFFER_TIMESTAMP (buf);
3127
3128     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
3129         GST_TIME_ARGS (timestamp));
3130
3131     /*
3132      * For buffer lists do not set last buffer. Creating buffer
3133      * with meaningful data can be done only with memcpy which will
3134      * significantly affect performance
3135      */
3136     if (!OBJ_IS_BUFFERLIST (obj_type)) {
3137       gst_base_sink_set_last_buffer (basesink, buf);
3138     }
3139
3140     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3141     if (bclass->preroll)
3142       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
3143         goto preroll_failed;
3144
3145     basesink->priv->call_preroll = FALSE;
3146   }
3147
3148   /* commit state */
3149   if (G_LIKELY (basesink->playing_async)) {
3150     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
3151       goto stopping;
3152   }
3153
3154   return GST_FLOW_OK;
3155
3156   /* ERRORS */
3157 preroll_failed:
3158   {
3159     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
3160     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
3161     return ret;
3162   }
3163 stopping:
3164   {
3165     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
3166     return GST_FLOW_WRONG_STATE;
3167   }
3168 }
3169
3170 /* with STREAM_LOCK, PREROLL_LOCK
3171  *
3172  * Queue an object for rendering.
3173  * The first prerollable object queued will complete the preroll. If the
3174  * preroll queue if filled, we render all the objects in the queue.
3175  *
3176  * This function takes ownership of the object.
3177  */
3178 static GstFlowReturn
3179 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
3180     guint8 obj_type, gpointer obj, gboolean prerollable)
3181 {
3182   GstFlowReturn ret = GST_FLOW_OK;
3183   gint length;
3184   GQueue *q;
3185
3186   if (G_UNLIKELY (basesink->need_preroll)) {
3187     if (G_LIKELY (prerollable))
3188       basesink->preroll_queued++;
3189
3190     length = basesink->preroll_queued;
3191
3192     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
3193
3194     /* first prerollable item needs to finish the preroll */
3195     if (length == 1) {
3196       ret = gst_base_sink_preroll_object (basesink, obj_type, obj);
3197       if (G_UNLIKELY (ret != GST_FLOW_OK))
3198         goto preroll_failed;
3199     }
3200     /* need to recheck if we need preroll, commmit state during preroll
3201      * could have made us not need more preroll. */
3202     if (G_UNLIKELY (basesink->need_preroll)) {
3203       /* see if we can render now, if we can't add the object to the preroll
3204        * queue. */
3205       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
3206         goto more_preroll;
3207     }
3208   }
3209   /* we can start rendering (or blocking) the queued object
3210    * if any. */
3211   q = basesink->preroll_queue;
3212   while (G_UNLIKELY (!g_queue_is_empty (q))) {
3213     GstMiniObject *o;
3214     guint8 ot;
3215
3216     o = g_queue_pop_head (q);
3217     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
3218
3219     ot = get_object_type (o);
3220
3221     /* do something with the return value */
3222     ret = gst_base_sink_render_object (basesink, pad, ot, o);
3223     if (ret != GST_FLOW_OK)
3224       goto dequeue_failed;
3225   }
3226
3227   /* now render the object */
3228   ret = gst_base_sink_render_object (basesink, pad, obj_type, obj);
3229   basesink->preroll_queued = 0;
3230
3231   return ret;
3232
3233   /* special cases */
3234 preroll_failed:
3235   {
3236     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
3237         gst_flow_get_name (ret));
3238     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3239     return ret;
3240   }
3241 more_preroll:
3242   {
3243     /* add object to the queue and return */
3244     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
3245         length, basesink->preroll_queue_max_len);
3246     g_queue_push_tail (basesink->preroll_queue, obj);
3247     return GST_FLOW_OK;
3248   }
3249 dequeue_failed:
3250   {
3251     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
3252         gst_flow_get_name (ret));
3253     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3254     return ret;
3255   }
3256 }
3257
3258 /* with STREAM_LOCK
3259  *
3260  * This function grabs the PREROLL_LOCK and adds the object to
3261  * the queue.
3262  *
3263  * This function takes ownership of obj.
3264  *
3265  * Note: Only GstEvent seem to be passed to this private method
3266  */
3267 static GstFlowReturn
3268 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
3269     GstMiniObject * obj, gboolean prerollable)
3270 {
3271   GstFlowReturn ret;
3272
3273   GST_PAD_PREROLL_LOCK (pad);
3274   if (G_UNLIKELY (basesink->flushing))
3275     goto flushing;
3276
3277   if (G_UNLIKELY (basesink->priv->received_eos))
3278     goto was_eos;
3279
3280   ret =
3281       gst_base_sink_queue_object_unlocked (basesink, pad, _PR_IS_EVENT, obj,
3282       prerollable);
3283   GST_PAD_PREROLL_UNLOCK (pad);
3284
3285   return ret;
3286
3287   /* ERRORS */
3288 flushing:
3289   {
3290     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3291     GST_PAD_PREROLL_UNLOCK (pad);
3292     gst_mini_object_unref (obj);
3293     return GST_FLOW_WRONG_STATE;
3294   }
3295 was_eos:
3296   {
3297     GST_DEBUG_OBJECT (basesink,
3298         "we are EOS, dropping object, return UNEXPECTED");
3299     GST_PAD_PREROLL_UNLOCK (pad);
3300     gst_mini_object_unref (obj);
3301     return GST_FLOW_UNEXPECTED;
3302   }
3303 }
3304
3305 static void
3306 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
3307 {
3308   /* make sure we are not blocked on the clock also clear any pending
3309    * eos state. */
3310   gst_base_sink_set_flushing (basesink, pad, TRUE);
3311
3312   /* we grab the stream lock but that is not needed since setting the
3313    * sink to flushing would make sure no state commit is being done
3314    * anymore */
3315   GST_PAD_STREAM_LOCK (pad);
3316   gst_base_sink_reset_qos (basesink);
3317   /* and we need to commit our state again on the next
3318    * prerolled buffer */
3319   basesink->playing_async = TRUE;
3320   if (basesink->priv->async_enabled) {
3321     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
3322   } else {
3323     basesink->priv->have_latency = TRUE;
3324   }
3325   gst_base_sink_set_last_buffer (basesink, NULL);
3326   GST_PAD_STREAM_UNLOCK (pad);
3327 }
3328
3329 static void
3330 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
3331 {
3332   /* unset flushing so we can accept new data, this also flushes out any EOS
3333    * event. */
3334   gst_base_sink_set_flushing (basesink, pad, FALSE);
3335
3336   /* for position reporting */
3337   GST_OBJECT_LOCK (basesink);
3338   basesink->priv->current_sstart = GST_CLOCK_TIME_NONE;
3339   basesink->priv->current_sstop = GST_CLOCK_TIME_NONE;
3340   basesink->priv->eos_rtime = GST_CLOCK_TIME_NONE;
3341   basesink->priv->call_preroll = TRUE;
3342   basesink->priv->current_step.valid = FALSE;
3343   basesink->priv->pending_step.valid = FALSE;
3344   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
3345     /* we need new segment info after the flush. */
3346     basesink->have_newsegment = FALSE;
3347     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3348     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
3349   }
3350   GST_OBJECT_UNLOCK (basesink);
3351 }
3352
3353 static gboolean
3354 gst_base_sink_event (GstPad * pad, GstEvent * event)
3355 {
3356   GstBaseSink *basesink;
3357   gboolean result = TRUE;
3358   GstBaseSinkClass *bclass;
3359
3360   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3361
3362   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3363
3364   GST_DEBUG_OBJECT (basesink, "received event %p %" GST_PTR_FORMAT, event,
3365       event);
3366
3367   switch (GST_EVENT_TYPE (event)) {
3368     case GST_EVENT_EOS:
3369     {
3370       GstFlowReturn ret;
3371
3372       GST_PAD_PREROLL_LOCK (pad);
3373       if (G_UNLIKELY (basesink->flushing))
3374         goto flushing;
3375
3376       if (G_UNLIKELY (basesink->priv->received_eos)) {
3377         /* we can't accept anything when we are EOS */
3378         result = FALSE;
3379         gst_event_unref (event);
3380       } else {
3381         /* we set the received EOS flag here so that we can use it when testing if
3382          * we are prerolled and to refuse more buffers. */
3383         basesink->priv->received_eos = TRUE;
3384
3385         /* EOS is a prerollable object, we call the unlocked version because it
3386          * does not check the received_eos flag. */
3387         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
3388             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), TRUE);
3389         if (G_UNLIKELY (ret != GST_FLOW_OK))
3390           result = FALSE;
3391       }
3392       GST_PAD_PREROLL_UNLOCK (pad);
3393       break;
3394     }
3395     case GST_EVENT_NEWSEGMENT:
3396     {
3397       GstFlowReturn ret;
3398       gboolean update;
3399
3400       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
3401
3402       GST_PAD_PREROLL_LOCK (pad);
3403       if (G_UNLIKELY (basesink->flushing))
3404         goto flushing;
3405
3406       gst_event_parse_new_segment_full (event, &update, NULL, NULL, NULL, NULL,
3407           NULL, NULL);
3408
3409       if (G_UNLIKELY (basesink->priv->received_eos && !update)) {
3410         /* we can't accept anything when we are EOS */
3411         result = FALSE;
3412         gst_event_unref (event);
3413       } else {
3414         /* the new segment is a non prerollable item and does not block anything,
3415          * we need to configure the current clipping segment and insert the event
3416          * in the queue to serialize it with the buffers for rendering. */
3417         gst_base_sink_configure_segment (basesink, pad, event,
3418             basesink->abidata.ABI.clip_segment);
3419
3420         ret =
3421             gst_base_sink_queue_object_unlocked (basesink, pad,
3422             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), FALSE);
3423         if (G_UNLIKELY (ret != GST_FLOW_OK))
3424           result = FALSE;
3425         else {
3426           GST_OBJECT_LOCK (basesink);
3427           basesink->have_newsegment = TRUE;
3428           GST_OBJECT_UNLOCK (basesink);
3429         }
3430       }
3431       GST_PAD_PREROLL_UNLOCK (pad);
3432       break;
3433     }
3434     case GST_EVENT_FLUSH_START:
3435       if (bclass->event)
3436         bclass->event (basesink, event);
3437
3438       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3439
3440       gst_base_sink_flush_start (basesink, pad);
3441
3442       gst_event_unref (event);
3443       break;
3444     case GST_EVENT_FLUSH_STOP:
3445       if (bclass->event)
3446         bclass->event (basesink, event);
3447
3448       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3449
3450       gst_base_sink_flush_stop (basesink, pad);
3451
3452       gst_event_unref (event);
3453       break;
3454     default:
3455       /* other events are sent to queue or subclass depending on if they
3456        * are serialized. */
3457       if (GST_EVENT_IS_SERIALIZED (event)) {
3458         gst_base_sink_queue_object (basesink, pad,
3459             GST_MINI_OBJECT_CAST (event), FALSE);
3460       } else {
3461         if (bclass->event)
3462           bclass->event (basesink, event);
3463         gst_event_unref (event);
3464       }
3465       break;
3466   }
3467 done:
3468   gst_object_unref (basesink);
3469
3470   return result;
3471
3472   /* ERRORS */
3473 flushing:
3474   {
3475     GST_DEBUG_OBJECT (basesink, "we are flushing");
3476     GST_PAD_PREROLL_UNLOCK (pad);
3477     result = FALSE;
3478     gst_event_unref (event);
3479     goto done;
3480   }
3481 }
3482
3483 /* default implementation to calculate the start and end
3484  * timestamps on a buffer, subclasses can override
3485  */
3486 static void
3487 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3488     GstClockTime * start, GstClockTime * end)
3489 {
3490   GstClockTime timestamp, duration;
3491
3492   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3493   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3494
3495     /* get duration to calculate end time */
3496     duration = GST_BUFFER_DURATION (buffer);
3497     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3498       *end = timestamp + duration;
3499     }
3500     *start = timestamp;
3501   }
3502 }
3503
3504 /* must be called with PREROLL_LOCK */
3505 static gboolean
3506 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3507 {
3508   gboolean is_prerolled, res;
3509
3510   /* we have 2 cases where the PREROLL_LOCK is released:
3511    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3512    *  2) we are syncing on the clock
3513    */
3514   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3515   res = !is_prerolled;
3516
3517   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3518       basesink->have_preroll, basesink->priv->received_eos, res);
3519
3520   return res;
3521 }
3522
3523 /* with STREAM_LOCK, PREROLL_LOCK
3524  *
3525  * Takes a buffer and compare the timestamps with the last segment.
3526  * If the buffer falls outside of the segment boundaries, drop it.
3527  * Else queue the buffer for preroll and rendering.
3528  *
3529  * This function takes ownership of the buffer.
3530  */
3531 static GstFlowReturn
3532 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3533     guint8 obj_type, gpointer obj)
3534 {
3535   GstBaseSinkClass *bclass;
3536   GstFlowReturn result;
3537   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3538   GstSegment *clip_segment;
3539   GstBuffer *time_buf;
3540
3541   if (G_UNLIKELY (basesink->flushing))
3542     goto flushing;
3543
3544   if (G_UNLIKELY (basesink->priv->received_eos))
3545     goto was_eos;
3546
3547   if (OBJ_IS_BUFFERLIST (obj_type)) {
3548     time_buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3549     g_assert (NULL != time_buf);
3550   } else {
3551     time_buf = GST_BUFFER_CAST (obj);
3552   }
3553
3554   /* for code clarity */
3555   clip_segment = basesink->abidata.ABI.clip_segment;
3556
3557   if (G_UNLIKELY (!basesink->have_newsegment)) {
3558     gboolean sync;
3559
3560     sync = gst_base_sink_get_sync (basesink);
3561     if (sync) {
3562       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3563           (_("Internal data flow problem.")),
3564           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3565     }
3566
3567     /* this means this sink will assume timestamps start from 0 */
3568     GST_OBJECT_LOCK (basesink);
3569     clip_segment->start = 0;
3570     clip_segment->stop = -1;
3571     basesink->segment.start = 0;
3572     basesink->segment.stop = -1;
3573     basesink->have_newsegment = TRUE;
3574     GST_OBJECT_UNLOCK (basesink);
3575   }
3576
3577   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3578
3579   /* check if the buffer needs to be dropped, we first ask the subclass for the
3580    * start and end */
3581   if (bclass->get_times)
3582     bclass->get_times (basesink, time_buf, &start, &end);
3583
3584   if (!GST_CLOCK_TIME_IS_VALID (start)) {
3585     /* if the subclass does not want sync, we use our own values so that we at
3586      * least clip the buffer to the segment */
3587     gst_base_sink_get_times (basesink, time_buf, &start, &end);
3588   }
3589
3590   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3591       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3592
3593   /* a dropped buffer does not participate in anything */
3594   if (GST_CLOCK_TIME_IS_VALID (start) &&
3595       (clip_segment->format == GST_FORMAT_TIME)) {
3596     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3597                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3598       goto out_of_segment;
3599   }
3600
3601   /* now we can process the buffer in the queue, this function takes ownership
3602    * of the buffer */
3603   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3604       obj_type, obj, TRUE);
3605   return result;
3606
3607   /* ERRORS */
3608 flushing:
3609   {
3610     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3611     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3612     return GST_FLOW_WRONG_STATE;
3613   }
3614 was_eos:
3615   {
3616     GST_DEBUG_OBJECT (basesink,
3617         "we are EOS, dropping object, return UNEXPECTED");
3618     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3619     return GST_FLOW_UNEXPECTED;
3620   }
3621 out_of_segment:
3622   {
3623     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3624     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3625     return GST_FLOW_OK;
3626   }
3627 }
3628
3629 /* with STREAM_LOCK
3630  */
3631 static GstFlowReturn
3632 gst_base_sink_chain_main (GstBaseSink * basesink, GstPad * pad,
3633     guint8 obj_type, gpointer obj)
3634 {
3635   GstFlowReturn result;
3636
3637   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3638     goto wrong_mode;
3639
3640   GST_PAD_PREROLL_LOCK (pad);
3641   result = gst_base_sink_chain_unlocked (basesink, pad, obj_type, obj);
3642   GST_PAD_PREROLL_UNLOCK (pad);
3643
3644 done:
3645   return result;
3646
3647   /* ERRORS */
3648 wrong_mode:
3649   {
3650     GST_OBJECT_LOCK (pad);
3651     GST_WARNING_OBJECT (basesink,
3652         "Push on pad %s:%s, but it was not activated in push mode",
3653         GST_DEBUG_PAD_NAME (pad));
3654     GST_OBJECT_UNLOCK (pad);
3655     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3656     /* we don't post an error message this will signal to the peer
3657      * pushing that EOS is reached. */
3658     result = GST_FLOW_UNEXPECTED;
3659     goto done;
3660   }
3661 }
3662
3663 static GstFlowReturn
3664 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3665 {
3666   GstBaseSink *basesink;
3667
3668   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3669
3670   return gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, buf);
3671 }
3672
3673 static GstFlowReturn
3674 gst_base_sink_chain_list (GstPad * pad, GstBufferList * list)
3675 {
3676   GstBaseSink *basesink;
3677   GstBaseSinkClass *bclass;
3678   GstFlowReturn result;
3679
3680   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3681   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3682
3683   if (G_LIKELY (bclass->render_list)) {
3684     result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFERLIST, list);
3685   } else {
3686     GstBufferListIterator *it;
3687     GstBuffer *group;
3688
3689     GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
3690
3691     it = gst_buffer_list_iterate (list);
3692
3693     if (gst_buffer_list_iterator_next_group (it)) {
3694       do {
3695         group = gst_buffer_list_iterator_merge_group (it);
3696         if (group == NULL) {
3697           group = gst_buffer_new ();
3698           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3699         } else {
3700           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining group");
3701         }
3702         result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, group);
3703       } while (result == GST_FLOW_OK
3704           && gst_buffer_list_iterator_next_group (it));
3705     } else {
3706       GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3707       result =
3708           gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER,
3709           gst_buffer_new ());
3710     }
3711     gst_buffer_list_iterator_free (it);
3712     gst_buffer_list_unref (list);
3713   }
3714   return result;
3715 }
3716
3717
3718 static gboolean
3719 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3720 {
3721   gboolean res = TRUE;
3722
3723   /* update our offset if the start/stop position was updated */
3724   if (segment->format == GST_FORMAT_BYTES) {
3725     segment->time = segment->start;
3726   } else if (segment->start == 0) {
3727     /* seek to start, we can implement a default for this. */
3728     segment->time = 0;
3729   } else {
3730     res = FALSE;
3731     GST_INFO_OBJECT (sink, "Can't do a default seek");
3732   }
3733
3734   return res;
3735 }
3736
3737 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3738
3739 static gboolean
3740 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3741     GstEvent * event, GstSegment * segment)
3742 {
3743   /* By default, we try one of 2 things:
3744    *   - For absolute seek positions, convert the requested position to our
3745    *     configured processing format and place it in the output segment \
3746    *   - For relative seek positions, convert our current (input) values to the
3747    *     seek format, adjust by the relative seek offset and then convert back to
3748    *     the processing format
3749    */
3750   GstSeekType cur_type, stop_type;
3751   gint64 cur, stop;
3752   GstSeekFlags flags;
3753   GstFormat seek_format, dest_format;
3754   gdouble rate;
3755   gboolean update;
3756   gboolean res = TRUE;
3757
3758   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3759       &cur_type, &cur, &stop_type, &stop);
3760   dest_format = segment->format;
3761
3762   if (seek_format == dest_format) {
3763     gst_segment_set_seek (segment, rate, seek_format, flags,
3764         cur_type, cur, stop_type, stop, &update);
3765     return TRUE;
3766   }
3767
3768   if (cur_type != GST_SEEK_TYPE_NONE) {
3769     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3770     res =
3771         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3772         &cur);
3773     cur_type = GST_SEEK_TYPE_SET;
3774   }
3775
3776   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3777     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3778     res =
3779         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3780         &stop);
3781     stop_type = GST_SEEK_TYPE_SET;
3782   }
3783
3784   /* And finally, configure our output segment in the desired format */
3785   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3786       stop_type, stop, &update);
3787
3788   if (!res)
3789     goto no_format;
3790
3791   return res;
3792
3793 no_format:
3794   {
3795     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3796     return FALSE;
3797   }
3798 }
3799
3800 /* perform a seek, only executed in pull mode */
3801 static gboolean
3802 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3803 {
3804   gboolean flush;
3805   gdouble rate;
3806   GstFormat seek_format, dest_format;
3807   GstSeekFlags flags;
3808   GstSeekType cur_type, stop_type;
3809   gboolean seekseg_configured = FALSE;
3810   gint64 cur, stop;
3811   gboolean update, res = TRUE;
3812   GstSegment seeksegment;
3813
3814   dest_format = sink->segment.format;
3815
3816   if (event) {
3817     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3818     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3819         &cur_type, &cur, &stop_type, &stop);
3820
3821     flush = flags & GST_SEEK_FLAG_FLUSH;
3822   } else {
3823     GST_DEBUG_OBJECT (sink, "performing seek without event");
3824     flush = FALSE;
3825   }
3826
3827   if (flush) {
3828     GST_DEBUG_OBJECT (sink, "flushing upstream");
3829     gst_pad_push_event (pad, gst_event_new_flush_start ());
3830     gst_base_sink_flush_start (sink, pad);
3831   } else {
3832     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3833   }
3834
3835   GST_PAD_STREAM_LOCK (pad);
3836
3837   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3838    * copy the current segment info into the temp segment that we can actually
3839    * attempt the seek with. We only update the real segment if the seek suceeds. */
3840   if (!seekseg_configured) {
3841     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3842
3843     /* now configure the final seek segment */
3844     if (event) {
3845       if (sink->segment.format != seek_format) {
3846         /* OK, here's where we give the subclass a chance to convert the relative
3847          * seek into an absolute one in the processing format. We set up any
3848          * absolute seek above, before taking the stream lock. */
3849         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3850                 &seeksegment)) {
3851           GST_DEBUG_OBJECT (sink,
3852               "Preparing the seek failed after flushing. " "Aborting seek");
3853           res = FALSE;
3854         }
3855       } else {
3856         /* The seek format matches our processing format, no need to ask the
3857          * the subclass to configure the segment. */
3858         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3859             cur_type, cur, stop_type, stop, &update);
3860       }
3861     }
3862     /* Else, no seek event passed, so we're just (re)starting the
3863        current segment. */
3864   }
3865
3866   if (res) {
3867     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3868         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3869         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3870
3871     /* do the seek, segment.last_stop contains the new position. */
3872     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3873   }
3874
3875
3876   if (flush) {
3877     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3878     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3879     gst_base_sink_flush_stop (sink, pad);
3880   } else if (res && sink->abidata.ABI.running) {
3881     /* we are running the current segment and doing a non-flushing seek,
3882      * close the segment first based on the last_stop. */
3883     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3884         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3885   }
3886
3887   /* The subclass must have converted the segment to the processing format
3888    * by now */
3889   if (res && seeksegment.format != dest_format) {
3890     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3891         "in the correct format. Aborting seek.");
3892     res = FALSE;
3893   }
3894
3895   /* if successfull seek, we update our real segment and push
3896    * out the new segment. */
3897   if (res) {
3898     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3899
3900     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3901       gst_element_post_message (GST_ELEMENT (sink),
3902           gst_message_new_segment_start (GST_OBJECT (sink),
3903               sink->segment.format, sink->segment.last_stop));
3904     }
3905   }
3906
3907   sink->priv->discont = TRUE;
3908   sink->abidata.ABI.running = TRUE;
3909
3910   GST_PAD_STREAM_UNLOCK (pad);
3911
3912   return res;
3913 }
3914
3915 static void
3916 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
3917     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
3918     gboolean flush, gboolean intermediate)
3919 {
3920   GST_OBJECT_LOCK (sink);
3921   pending->seqnum = seqnum;
3922   pending->format = format;
3923   pending->amount = amount;
3924   pending->position = 0;
3925   pending->rate = rate;
3926   pending->flush = flush;
3927   pending->intermediate = intermediate;
3928   pending->valid = TRUE;
3929   /* flush invalidates the current stepping segment */
3930   if (flush)
3931     current->valid = FALSE;
3932   GST_OBJECT_UNLOCK (sink);
3933 }
3934
3935 static gboolean
3936 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3937 {
3938   GstBaseSinkPrivate *priv;
3939   GstBaseSinkClass *bclass;
3940   gboolean flush, intermediate;
3941   gdouble rate;
3942   GstFormat format;
3943   guint64 amount;
3944   guint seqnum;
3945   GstStepInfo *pending, *current;
3946   GstMessage *message;
3947
3948   bclass = GST_BASE_SINK_GET_CLASS (sink);
3949   priv = sink->priv;
3950
3951   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
3952
3953   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
3954   seqnum = gst_event_get_seqnum (event);
3955
3956   pending = &priv->pending_step;
3957   current = &priv->current_step;
3958
3959   /* post message first */
3960   message = gst_message_new_step_start (GST_OBJECT (sink), FALSE, format,
3961       amount, rate, flush, intermediate);
3962   gst_message_set_seqnum (message, seqnum);
3963   gst_element_post_message (GST_ELEMENT (sink), message);
3964
3965   if (flush) {
3966     /* we need to call ::unlock before locking PREROLL_LOCK
3967      * since we lock it before going into ::render */
3968     if (bclass->unlock)
3969       bclass->unlock (sink);
3970
3971     GST_PAD_PREROLL_LOCK (sink->sinkpad);
3972     /* now that we have the PREROLL lock, clear our unlock request */
3973     if (bclass->unlock_stop)
3974       bclass->unlock_stop (sink);
3975
3976     /* update the stepinfo and make it valid */
3977     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
3978         intermediate);
3979
3980     if (sink->priv->async_enabled) {
3981       /* and we need to commit our state again on the next
3982        * prerolled buffer */
3983       sink->playing_async = TRUE;
3984       priv->pending_step.need_preroll = TRUE;
3985       sink->need_preroll = FALSE;
3986       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
3987     } else {
3988       sink->priv->have_latency = TRUE;
3989       sink->need_preroll = FALSE;
3990     }
3991     priv->current_sstart = GST_CLOCK_TIME_NONE;
3992     priv->current_sstop = GST_CLOCK_TIME_NONE;
3993     priv->eos_rtime = GST_CLOCK_TIME_NONE;
3994     priv->call_preroll = TRUE;
3995     gst_base_sink_set_last_buffer (sink, NULL);
3996     gst_base_sink_reset_qos (sink);
3997
3998     if (sink->clock_id) {
3999       gst_clock_id_unschedule (sink->clock_id);
4000     }
4001
4002     if (sink->have_preroll) {
4003       GST_DEBUG_OBJECT (sink, "signal waiter");
4004       priv->step_unlock = TRUE;
4005       GST_PAD_PREROLL_SIGNAL (sink->sinkpad);
4006     }
4007     GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
4008   } else {
4009     /* update the stepinfo and make it valid */
4010     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4011         intermediate);
4012   }
4013
4014   return TRUE;
4015 }
4016
4017 /* with STREAM_LOCK
4018  */
4019 static void
4020 gst_base_sink_loop (GstPad * pad)
4021 {
4022   GstBaseSink *basesink;
4023   GstBuffer *buf = NULL;
4024   GstFlowReturn result;
4025   guint blocksize;
4026   guint64 offset;
4027
4028   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
4029
4030   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
4031
4032   if ((blocksize = basesink->priv->blocksize) == 0)
4033     blocksize = -1;
4034
4035   offset = basesink->segment.last_stop;
4036
4037   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
4038       offset, blocksize);
4039
4040   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
4041   if (G_UNLIKELY (result != GST_FLOW_OK))
4042     goto paused;
4043
4044   if (G_UNLIKELY (buf == NULL))
4045     goto no_buffer;
4046
4047   offset += GST_BUFFER_SIZE (buf);
4048
4049   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
4050
4051   GST_PAD_PREROLL_LOCK (pad);
4052   result = gst_base_sink_chain_unlocked (basesink, pad, _PR_IS_BUFFER, buf);
4053   GST_PAD_PREROLL_UNLOCK (pad);
4054   if (G_UNLIKELY (result != GST_FLOW_OK))
4055     goto paused;
4056
4057   return;
4058
4059   /* ERRORS */
4060 paused:
4061   {
4062     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
4063         gst_flow_get_name (result));
4064     gst_pad_pause_task (pad);
4065     if (result == GST_FLOW_UNEXPECTED) {
4066       /* perform EOS logic */
4067       if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
4068         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4069             gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
4070                 basesink->segment.format, basesink->segment.last_stop));
4071       } else {
4072         gst_base_sink_event (pad, gst_event_new_eos ());
4073       }
4074     } else if (result == GST_FLOW_NOT_LINKED || result <= GST_FLOW_UNEXPECTED) {
4075       /* for fatal errors we post an error message, post the error
4076        * first so the app knows about the error first. 
4077        * wrong-state is not a fatal error because it happens due to
4078        * flushing and posting an error message in that case is the
4079        * wrong thing to do, e.g. when basesrc is doing a flushing
4080        * seek. */
4081       GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4082           (_("Internal data stream error.")),
4083           ("stream stopped, reason %s", gst_flow_get_name (result)));
4084       gst_base_sink_event (pad, gst_event_new_eos ());
4085     }
4086     return;
4087   }
4088 no_buffer:
4089   {
4090     GST_LOG_OBJECT (basesink, "no buffer, pausing");
4091     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4092         (_("Internal data flow error.")), ("element returned NULL buffer"));
4093     result = GST_FLOW_ERROR;
4094     goto paused;
4095   }
4096 }
4097
4098 static gboolean
4099 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
4100     gboolean flushing)
4101 {
4102   GstBaseSinkClass *bclass;
4103
4104   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4105
4106   if (flushing) {
4107     /* unlock any subclasses, we need to do this before grabbing the
4108      * PREROLL_LOCK since we hold this lock before going into ::render. */
4109     if (bclass->unlock)
4110       bclass->unlock (basesink);
4111   }
4112
4113   GST_PAD_PREROLL_LOCK (pad);
4114   basesink->flushing = flushing;
4115   if (flushing) {
4116     /* step 1, now that we have the PREROLL lock, clear our unlock request */
4117     if (bclass->unlock_stop)
4118       bclass->unlock_stop (basesink);
4119
4120     /* set need_preroll before we unblock the clock. If the clock is unblocked
4121      * before timing out, we can reuse the buffer for preroll. */
4122     basesink->need_preroll = TRUE;
4123
4124     /* step 2, unblock clock sync (if any) or any other blocking thing */
4125     if (basesink->clock_id) {
4126       gst_clock_id_unschedule (basesink->clock_id);
4127     }
4128
4129     /* flush out the data thread if it's locked in finish_preroll, this will
4130      * also flush out the EOS state */
4131     GST_DEBUG_OBJECT (basesink,
4132         "flushing out data thread, need preroll to TRUE");
4133     gst_base_sink_preroll_queue_flush (basesink, pad);
4134   }
4135   GST_PAD_PREROLL_UNLOCK (pad);
4136
4137   return TRUE;
4138 }
4139
4140 static gboolean
4141 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
4142 {
4143   gboolean result;
4144
4145   if (active) {
4146     /* start task */
4147     result = gst_pad_start_task (basesink->sinkpad,
4148         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
4149   } else {
4150     /* step 2, make sure streaming finishes */
4151     result = gst_pad_stop_task (basesink->sinkpad);
4152   }
4153
4154   return result;
4155 }
4156
4157 static gboolean
4158 gst_base_sink_pad_activate (GstPad * pad)
4159 {
4160   gboolean result = FALSE;
4161   GstBaseSink *basesink;
4162
4163   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4164
4165   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
4166
4167   gst_base_sink_set_flushing (basesink, pad, FALSE);
4168
4169   /* we need to have the pull mode enabled */
4170   if (!basesink->can_activate_pull) {
4171     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
4172     goto fallback;
4173   }
4174
4175   /* check if downstreams supports pull mode at all */
4176   if (!gst_pad_check_pull_range (pad)) {
4177     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
4178     goto fallback;
4179   }
4180
4181   /* set the pad mode before starting the task so that it's in the
4182    * correct state for the new thread. also the sink set_caps and get_caps
4183    * function checks this */
4184   basesink->pad_mode = GST_ACTIVATE_PULL;
4185
4186   /* we first try to negotiate a format so that when we try to activate
4187    * downstream, it knows about our format */
4188   if (!gst_base_sink_negotiate_pull (basesink)) {
4189     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
4190     goto fallback;
4191   }
4192
4193   /* ok activate now */
4194   if (!gst_pad_activate_pull (pad, TRUE)) {
4195     /* clear any pending caps */
4196     GST_OBJECT_LOCK (basesink);
4197     gst_caps_replace (&basesink->priv->pull_caps, NULL);
4198     GST_OBJECT_UNLOCK (basesink);
4199     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
4200     goto fallback;
4201   }
4202
4203   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
4204   result = TRUE;
4205   goto done;
4206
4207   /* push mode fallback */
4208 fallback:
4209   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
4210   if ((result = gst_pad_activate_push (pad, TRUE))) {
4211     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
4212   }
4213
4214 done:
4215   if (!result) {
4216     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
4217     gst_base_sink_set_flushing (basesink, pad, TRUE);
4218   }
4219
4220   gst_object_unref (basesink);
4221
4222   return result;
4223 }
4224
4225 static gboolean
4226 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
4227 {
4228   gboolean result;
4229   GstBaseSink *basesink;
4230
4231   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4232
4233   if (active) {
4234     if (!basesink->can_activate_push) {
4235       result = FALSE;
4236       basesink->pad_mode = GST_ACTIVATE_NONE;
4237     } else {
4238       result = TRUE;
4239       basesink->pad_mode = GST_ACTIVATE_PUSH;
4240     }
4241   } else {
4242     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
4243       g_warning ("Internal GStreamer activation error!!!");
4244       result = FALSE;
4245     } else {
4246       gst_base_sink_set_flushing (basesink, pad, TRUE);
4247       result = TRUE;
4248       basesink->pad_mode = GST_ACTIVATE_NONE;
4249     }
4250   }
4251
4252   gst_object_unref (basesink);
4253
4254   return result;
4255 }
4256
4257 static gboolean
4258 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
4259 {
4260   GstCaps *caps;
4261   gboolean result;
4262
4263   result = FALSE;
4264
4265   /* this returns the intersection between our caps and the peer caps. If there
4266    * is no peer, it returns NULL and we can't operate in pull mode so we can
4267    * fail the negotiation. */
4268   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
4269   if (caps == NULL || gst_caps_is_empty (caps))
4270     goto no_caps_possible;
4271
4272   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
4273
4274   caps = gst_caps_make_writable (caps);
4275   /* get the first (prefered) format */
4276   gst_caps_truncate (caps);
4277   /* try to fixate */
4278   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
4279
4280   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
4281
4282   if (gst_caps_is_any (caps)) {
4283     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
4284         "allowing pull()");
4285     /* neither side has template caps in this case, so they are prepared for
4286        pull() without setcaps() */
4287     result = TRUE;
4288   } else if (gst_caps_is_fixed (caps)) {
4289     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
4290       goto could_not_set_caps;
4291
4292     GST_OBJECT_LOCK (basesink);
4293     gst_caps_replace (&basesink->priv->pull_caps, caps);
4294     GST_OBJECT_UNLOCK (basesink);
4295
4296     result = TRUE;
4297   }
4298
4299   gst_caps_unref (caps);
4300
4301   return result;
4302
4303 no_caps_possible:
4304   {
4305     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
4306     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
4307     if (caps)
4308       gst_caps_unref (caps);
4309     return FALSE;
4310   }
4311 could_not_set_caps:
4312   {
4313     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
4314     gst_caps_unref (caps);
4315     return FALSE;
4316   }
4317 }
4318
4319 /* this won't get called until we implement an activate function */
4320 static gboolean
4321 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
4322 {
4323   gboolean result = FALSE;
4324   GstBaseSink *basesink;
4325   GstBaseSinkClass *bclass;
4326
4327   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4328   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4329
4330   if (active) {
4331     GstFormat format;
4332     gint64 duration;
4333
4334     /* we mark we have a newsegment here because pull based
4335      * mode works just fine without having a newsegment before the
4336      * first buffer */
4337     format = GST_FORMAT_BYTES;
4338
4339     gst_segment_init (&basesink->segment, format);
4340     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
4341     GST_OBJECT_LOCK (basesink);
4342     basesink->have_newsegment = TRUE;
4343     GST_OBJECT_UNLOCK (basesink);
4344
4345     /* get the peer duration in bytes */
4346     result = gst_pad_query_peer_duration (pad, &format, &duration);
4347     if (result) {
4348       GST_DEBUG_OBJECT (basesink,
4349           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
4350       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
4351           duration);
4352       gst_segment_set_duration (&basesink->segment, format, duration);
4353     } else {
4354       GST_DEBUG_OBJECT (basesink, "unknown duration");
4355     }
4356
4357     if (bclass->activate_pull)
4358       result = bclass->activate_pull (basesink, TRUE);
4359     else
4360       result = FALSE;
4361
4362     if (!result)
4363       goto activate_failed;
4364
4365   } else {
4366     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
4367       g_warning ("Internal GStreamer activation error!!!");
4368       result = FALSE;
4369     } else {
4370       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
4371       if (bclass->activate_pull)
4372         result &= bclass->activate_pull (basesink, FALSE);
4373       basesink->pad_mode = GST_ACTIVATE_NONE;
4374       /* clear any pending caps */
4375       GST_OBJECT_LOCK (basesink);
4376       gst_caps_replace (&basesink->priv->pull_caps, NULL);
4377       GST_OBJECT_UNLOCK (basesink);
4378     }
4379   }
4380   gst_object_unref (basesink);
4381
4382   return result;
4383
4384   /* ERRORS */
4385 activate_failed:
4386   {
4387     /* reset, as starting the thread failed */
4388     basesink->pad_mode = GST_ACTIVATE_NONE;
4389
4390     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
4391     return FALSE;
4392   }
4393 }
4394
4395 /* send an event to our sinkpad peer. */
4396 static gboolean
4397 gst_base_sink_send_event (GstElement * element, GstEvent * event)
4398 {
4399   GstPad *pad;
4400   GstBaseSink *basesink = GST_BASE_SINK (element);
4401   gboolean forward, result = TRUE;
4402   GstActivateMode mode;
4403
4404   GST_OBJECT_LOCK (element);
4405   /* get the pad and the scheduling mode */
4406   pad = gst_object_ref (basesink->sinkpad);
4407   mode = basesink->pad_mode;
4408   GST_OBJECT_UNLOCK (element);
4409
4410   /* only push UPSTREAM events upstream */
4411   forward = GST_EVENT_IS_UPSTREAM (event);
4412
4413   GST_DEBUG_OBJECT (basesink, "handling event %p %" GST_PTR_FORMAT, event,
4414       event);
4415
4416   switch (GST_EVENT_TYPE (event)) {
4417     case GST_EVENT_LATENCY:
4418     {
4419       GstClockTime latency;
4420
4421       gst_event_parse_latency (event, &latency);
4422
4423       /* store the latency. We use this to adjust the running_time before syncing
4424        * it to the clock. */
4425       GST_OBJECT_LOCK (element);
4426       basesink->priv->latency = latency;
4427       if (!basesink->priv->have_latency)
4428         forward = FALSE;
4429       GST_OBJECT_UNLOCK (element);
4430       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
4431           GST_TIME_ARGS (latency));
4432
4433       /* We forward this event so that all elements know about the global pipeline
4434        * latency. This is interesting for an element when it wants to figure out
4435        * when a particular piece of data will be rendered. */
4436       break;
4437     }
4438     case GST_EVENT_SEEK:
4439       /* in pull mode we will execute the seek */
4440       if (mode == GST_ACTIVATE_PULL)
4441         result = gst_base_sink_perform_seek (basesink, pad, event);
4442       break;
4443     case GST_EVENT_STEP:
4444       result = gst_base_sink_perform_step (basesink, pad, event);
4445       forward = FALSE;
4446       break;
4447     default:
4448       break;
4449   }
4450
4451   if (forward) {
4452     result = gst_pad_push_event (pad, event);
4453   } else {
4454     /* not forwarded, unref the event */
4455     gst_event_unref (event);
4456   }
4457
4458   gst_object_unref (pad);
4459   return result;
4460 }
4461
4462 static gboolean
4463 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4464     gint64 * cur, gboolean * upstream)
4465 {
4466   GstClock *clock = NULL;
4467   gboolean res = FALSE;
4468   GstFormat oformat, tformat;
4469   GstSegment *segment;
4470   GstClockTime now, latency;
4471   GstClockTimeDiff base;
4472   gint64 time, accum, duration;
4473   gdouble rate;
4474   gint64 last;
4475   gboolean last_seen, with_clock, in_paused;
4476
4477   GST_OBJECT_LOCK (basesink);
4478   /* we can only get the segment when we are not NULL or READY */
4479   if (!basesink->have_newsegment)
4480     goto wrong_state;
4481
4482   in_paused = FALSE;
4483   /* when not in PLAYING or when we're busy with a state change, we
4484    * cannot read from the clock so we report time based on the
4485    * last seen timestamp. */
4486   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4487       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING) {
4488     in_paused = TRUE;
4489   }
4490
4491   /* we don't use the clip segment in pull mode, when seeking we update the
4492    * main segment directly with the new segment values without it having to be
4493    * activated by the rendering after preroll */
4494   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4495     segment = basesink->abidata.ABI.clip_segment;
4496   else
4497     segment = &basesink->segment;
4498
4499   /* our intermediate time format */
4500   tformat = GST_FORMAT_TIME;
4501   /* get the format in the segment */
4502   oformat = segment->format;
4503
4504   /* report with last seen position when EOS */
4505   last_seen = basesink->eos;
4506
4507   /* assume we will use the clock for getting the current position */
4508   with_clock = TRUE;
4509   if (basesink->sync == FALSE)
4510     with_clock = FALSE;
4511
4512   /* and we need a clock */
4513   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4514     with_clock = FALSE;
4515   else
4516     gst_object_ref (clock);
4517
4518   /* collect all data we need holding the lock */
4519   if (GST_CLOCK_TIME_IS_VALID (segment->time))
4520     time = segment->time;
4521   else
4522     time = 0;
4523
4524   if (GST_CLOCK_TIME_IS_VALID (segment->stop))
4525     duration = segment->stop - segment->start;
4526   else
4527     duration = 0;
4528
4529   accum = segment->accum;
4530   rate = segment->rate * segment->applied_rate;
4531   latency = basesink->priv->latency;
4532
4533   if (oformat == GST_FORMAT_TIME) {
4534     gint64 start, stop;
4535
4536     start = basesink->priv->current_sstart;
4537     stop = basesink->priv->current_sstop;
4538
4539     if (in_paused) {
4540       /* in paused we use the last position as a lower bound */
4541       if (stop == -1 || segment->rate > 0.0)
4542         last = start;
4543       else
4544         last = stop;
4545     } else {
4546       /* in playing, use last stop time as upper bound */
4547       if (start == -1 || segment->rate > 0.0)
4548         last = stop;
4549       else
4550         last = start;
4551     }
4552   } else {
4553     /* convert last stop to stream time */
4554     last = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4555   }
4556
4557   if (in_paused) {
4558     /* in paused, use start_time */
4559     base = GST_ELEMENT_START_TIME (basesink);
4560     GST_DEBUG_OBJECT (basesink, "in paused, using start time %" GST_TIME_FORMAT,
4561         GST_TIME_ARGS (base));
4562   } else if (with_clock) {
4563     /* else use clock when needed */
4564     base = GST_ELEMENT_CAST (basesink)->base_time;
4565     GST_DEBUG_OBJECT (basesink, "using clock and base time %" GST_TIME_FORMAT,
4566         GST_TIME_ARGS (base));
4567   } else {
4568     /* else, no sync or clock -> no base time */
4569     GST_DEBUG_OBJECT (basesink, "no sync or no clock");
4570     base = -1;
4571   }
4572
4573   /* no base, we can't calculate running_time, use last seem timestamp to report
4574    * time */
4575   if (base == -1)
4576     last_seen = TRUE;
4577
4578   /* need to release the object lock before we can get the time,
4579    * a clock might take the LOCK of the provider, which could be
4580    * a basesink subclass. */
4581   GST_OBJECT_UNLOCK (basesink);
4582
4583   if (last_seen) {
4584     /* in EOS or when no valid stream_time, report the value of last seen
4585      * timestamp */
4586     if (last == -1) {
4587       /* no timestamp, we need to ask upstream */
4588       GST_DEBUG_OBJECT (basesink, "no last seen timestamp, asking upstream");
4589       res = FALSE;
4590       *upstream = TRUE;
4591       goto done;
4592     }
4593     GST_DEBUG_OBJECT (basesink, "using last seen timestamp %" GST_TIME_FORMAT,
4594         GST_TIME_ARGS (last));
4595     *cur = last;
4596   } else {
4597     if (oformat != tformat) {
4598       /* convert accum, time and duration to time */
4599       if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4600               &accum))
4601         goto convert_failed;
4602       if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration,
4603               &tformat, &duration))
4604         goto convert_failed;
4605       if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4606               &time))
4607         goto convert_failed;
4608       if (!gst_pad_query_convert (basesink->sinkpad, oformat, last, &tformat,
4609               &last))
4610         goto convert_failed;
4611
4612       /* assume time format from now on */
4613       oformat = tformat;
4614     }
4615
4616     if (!in_paused && with_clock) {
4617       now = gst_clock_get_time (clock);
4618     } else {
4619       now = base;
4620       base = 0;
4621     }
4622
4623     /* subtract base time and accumulated time from the clock time.
4624      * Make sure we don't go negative. This is the current time in
4625      * the segment which we need to scale with the combined
4626      * rate and applied rate. */
4627     base += accum;
4628     base += latency;
4629     if (GST_CLOCK_DIFF (base, now) < 0)
4630       base = now;
4631
4632     /* for negative rates we need to count back from the segment
4633      * duration. */
4634     if (rate < 0.0)
4635       time += duration;
4636
4637     *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4638
4639     if (in_paused) {
4640       /* never report less than segment values in paused */
4641       if (last != -1)
4642         *cur = MAX (last, *cur);
4643     } else {
4644       /* never report more than last seen position in playing */
4645       if (last != -1)
4646         *cur = MIN (last, *cur);
4647     }
4648
4649     GST_DEBUG_OBJECT (basesink,
4650         "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4651         GST_TIME_FORMAT " + time %" GST_TIME_FORMAT "  last %" GST_TIME_FORMAT,
4652         GST_TIME_ARGS (now), GST_TIME_ARGS (base), GST_TIME_ARGS (accum),
4653         GST_TIME_ARGS (time), GST_TIME_ARGS (last));
4654   }
4655
4656   if (oformat != format) {
4657     /* convert to final format */
4658     if (!gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur))
4659       goto convert_failed;
4660   }
4661
4662   res = TRUE;
4663
4664 done:
4665   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4666       res, GST_TIME_ARGS (*cur));
4667
4668   if (clock)
4669     gst_object_unref (clock);
4670
4671   return res;
4672
4673   /* special cases */
4674 wrong_state:
4675   {
4676     /* in NULL or READY we always return FALSE and -1 */
4677     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4678     res = FALSE;
4679     *cur = -1;
4680     GST_OBJECT_UNLOCK (basesink);
4681     goto done;
4682   }
4683 convert_failed:
4684   {
4685     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4686     *upstream = TRUE;
4687     res = FALSE;
4688     goto done;
4689   }
4690 }
4691
4692 static gboolean
4693 gst_base_sink_get_duration (GstBaseSink * basesink, GstFormat format,
4694     gint64 * dur, gboolean * upstream)
4695 {
4696   gboolean res = FALSE;
4697
4698   if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4699     GstFormat uformat = GST_FORMAT_BYTES;
4700     gint64 uduration;
4701
4702     /* get the duration in bytes, in pull mode that's all we are sure to
4703      * know. We have to explicitly get this value from upstream instead of
4704      * using our cached value because it might change. Duration caching
4705      * should be done at a higher level. */
4706     res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat, &uduration);
4707     if (res) {
4708       gst_segment_set_duration (&basesink->segment, uformat, uduration);
4709       if (format != uformat) {
4710         /* convert to the requested format */
4711         res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4712             &format, dur);
4713       } else {
4714         *dur = uduration;
4715       }
4716     }
4717     *upstream = FALSE;
4718   } else {
4719     *upstream = TRUE;
4720   }
4721
4722   return res;
4723 }
4724
4725 static const GstQueryType *
4726 gst_base_sink_get_query_types (GstElement * element)
4727 {
4728   static const GstQueryType query_types[] = {
4729     GST_QUERY_DURATION,
4730     GST_QUERY_POSITION,
4731     GST_QUERY_SEGMENT,
4732     GST_QUERY_LATENCY,
4733     0
4734   };
4735
4736   return query_types;
4737 }
4738
4739 static gboolean
4740 gst_base_sink_query (GstElement * element, GstQuery * query)
4741 {
4742   gboolean res = FALSE;
4743
4744   GstBaseSink *basesink = GST_BASE_SINK (element);
4745
4746   switch (GST_QUERY_TYPE (query)) {
4747     case GST_QUERY_POSITION:
4748     {
4749       gint64 cur = 0;
4750       GstFormat format;
4751       gboolean upstream = FALSE;
4752
4753       gst_query_parse_position (query, &format, NULL);
4754
4755       GST_DEBUG_OBJECT (basesink, "position query in format %s",
4756           gst_format_get_name (format));
4757
4758       /* first try to get the position based on the clock */
4759       if ((res =
4760               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4761         gst_query_set_position (query, format, cur);
4762       } else if (upstream) {
4763         /* fallback to peer query */
4764         res = gst_pad_peer_query (basesink->sinkpad, query);
4765       }
4766       if (!res) {
4767         /* we can handle a few things if upstream failed */
4768         if (format == GST_FORMAT_PERCENT) {
4769           gint64 dur = 0;
4770           GstFormat uformat = GST_FORMAT_TIME;
4771
4772           res = gst_base_sink_get_position (basesink, GST_FORMAT_TIME, &cur,
4773               &upstream);
4774           if (!res && upstream) {
4775             res = gst_pad_query_peer_position (basesink->sinkpad, &uformat,
4776                 &cur);
4777           }
4778           if (res) {
4779             res = gst_base_sink_get_duration (basesink, GST_FORMAT_TIME, &dur,
4780                 &upstream);
4781             if (!res && upstream) {
4782               res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4783                   &dur);
4784             }
4785           }
4786           if (res) {
4787             gint64 pos;
4788
4789             pos = gst_util_uint64_scale (100 * GST_FORMAT_PERCENT_SCALE, cur,
4790                 dur);
4791             gst_query_set_position (query, GST_FORMAT_PERCENT, pos);
4792           }
4793         }
4794       }
4795       break;
4796     }
4797     case GST_QUERY_DURATION:
4798     {
4799       gint64 dur = 0;
4800       GstFormat format;
4801       gboolean upstream = FALSE;
4802
4803       gst_query_parse_duration (query, &format, NULL);
4804
4805       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4806           gst_format_get_name (format));
4807
4808       if ((res =
4809               gst_base_sink_get_duration (basesink, format, &dur, &upstream))) {
4810         gst_query_set_duration (query, format, dur);
4811       } else if (upstream) {
4812         /* fallback to peer query */
4813         res = gst_pad_peer_query (basesink->sinkpad, query);
4814       }
4815       if (!res) {
4816         /* we can handle a few things if upstream failed */
4817         if (format == GST_FORMAT_PERCENT) {
4818           gst_query_set_duration (query, GST_FORMAT_PERCENT,
4819               GST_FORMAT_PERCENT_MAX);
4820           res = TRUE;
4821         }
4822       }
4823       break;
4824     }
4825     case GST_QUERY_LATENCY:
4826     {
4827       gboolean live, us_live;
4828       GstClockTime min, max;
4829
4830       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4831                   &max))) {
4832         gst_query_set_latency (query, live, min, max);
4833       }
4834       break;
4835     }
4836     case GST_QUERY_JITTER:
4837       break;
4838     case GST_QUERY_RATE:
4839       /* gst_query_set_rate (query, basesink->segment_rate); */
4840       res = TRUE;
4841       break;
4842     case GST_QUERY_SEGMENT:
4843     {
4844       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4845         gst_query_set_segment (query, basesink->segment.rate,
4846             GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4847         res = TRUE;
4848       } else {
4849         res = gst_pad_peer_query (basesink->sinkpad, query);
4850       }
4851       break;
4852     }
4853     case GST_QUERY_SEEKING:
4854     case GST_QUERY_CONVERT:
4855     case GST_QUERY_FORMATS:
4856     default:
4857       res = gst_pad_peer_query (basesink->sinkpad, query);
4858       break;
4859   }
4860   GST_DEBUG_OBJECT (basesink, "query %s returns %d",
4861       GST_QUERY_TYPE_NAME (query), res);
4862   return res;
4863 }
4864
4865 static GstStateChangeReturn
4866 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4867 {
4868   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4869   GstBaseSink *basesink = GST_BASE_SINK (element);
4870   GstBaseSinkClass *bclass;
4871   GstBaseSinkPrivate *priv;
4872
4873   priv = basesink->priv;
4874
4875   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4876
4877   switch (transition) {
4878     case GST_STATE_CHANGE_NULL_TO_READY:
4879       if (bclass->start)
4880         if (!bclass->start (basesink))
4881           goto start_failed;
4882       break;
4883     case GST_STATE_CHANGE_READY_TO_PAUSED:
4884       /* need to complete preroll before this state change completes, there
4885        * is no data flow in READY so we can safely assume we need to preroll. */
4886       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4887       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4888       basesink->have_newsegment = FALSE;
4889       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4890       gst_segment_init (basesink->abidata.ABI.clip_segment,
4891           GST_FORMAT_UNDEFINED);
4892       basesink->offset = 0;
4893       basesink->have_preroll = FALSE;
4894       priv->step_unlock = FALSE;
4895       basesink->need_preroll = TRUE;
4896       basesink->playing_async = TRUE;
4897       priv->current_sstart = GST_CLOCK_TIME_NONE;
4898       priv->current_sstop = GST_CLOCK_TIME_NONE;
4899       priv->eos_rtime = GST_CLOCK_TIME_NONE;
4900       priv->latency = 0;
4901       basesink->eos = FALSE;
4902       priv->received_eos = FALSE;
4903       gst_base_sink_reset_qos (basesink);
4904       priv->commited = FALSE;
4905       priv->call_preroll = TRUE;
4906       priv->current_step.valid = FALSE;
4907       priv->pending_step.valid = FALSE;
4908       if (priv->async_enabled) {
4909         GST_DEBUG_OBJECT (basesink, "doing async state change");
4910         /* when async enabled, post async-start message and return ASYNC from
4911          * the state change function */
4912         ret = GST_STATE_CHANGE_ASYNC;
4913         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4914             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4915       } else {
4916         priv->have_latency = TRUE;
4917       }
4918       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4919       break;
4920     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4921       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4922       if (!gst_base_sink_needs_preroll (basesink)) {
4923         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4924         /* no preroll needed anymore now. */
4925         basesink->playing_async = FALSE;
4926         basesink->need_preroll = FALSE;
4927         if (basesink->eos) {
4928           GstMessage *message;
4929
4930           /* need to post EOS message here */
4931           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4932           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
4933           gst_message_set_seqnum (message, basesink->priv->seqnum);
4934           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
4935         } else {
4936           GST_DEBUG_OBJECT (basesink, "signal preroll");
4937           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
4938         }
4939       } else {
4940         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
4941         basesink->need_preroll = TRUE;
4942         basesink->playing_async = TRUE;
4943         priv->call_preroll = TRUE;
4944         priv->commited = FALSE;
4945         if (priv->async_enabled) {
4946           GST_DEBUG_OBJECT (basesink, "doing async state change");
4947           ret = GST_STATE_CHANGE_ASYNC;
4948           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4949               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4950         }
4951       }
4952       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4953       break;
4954     default:
4955       break;
4956   }
4957
4958   {
4959     GstStateChangeReturn bret;
4960
4961     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4962     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
4963       goto activate_failed;
4964   }
4965
4966   switch (transition) {
4967     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4968       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
4969       /* FIXME, make sure we cannot enter _render first */
4970
4971       /* we need to call ::unlock before locking PREROLL_LOCK
4972        * since we lock it before going into ::render */
4973       if (bclass->unlock)
4974         bclass->unlock (basesink);
4975
4976       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4977       GST_DEBUG_OBJECT (basesink, "got preroll lock");
4978       /* now that we have the PREROLL lock, clear our unlock request */
4979       if (bclass->unlock_stop)
4980         bclass->unlock_stop (basesink);
4981
4982       /* we need preroll again and we set the flag before unlocking the clockid
4983        * because if the clockid is unlocked before a current buffer expired, we
4984        * can use that buffer to preroll with */
4985       basesink->need_preroll = TRUE;
4986
4987       if (basesink->clock_id) {
4988         GST_DEBUG_OBJECT (basesink, "unschedule clock");
4989         gst_clock_id_unschedule (basesink->clock_id);
4990       }
4991
4992       /* if we don't have a preroll buffer we need to wait for a preroll and
4993        * return ASYNC. */
4994       if (!gst_base_sink_needs_preroll (basesink)) {
4995         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
4996         basesink->playing_async = FALSE;
4997       } else {
4998         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
4999           GST_DEBUG_OBJECT (basesink, "element is <= READY");
5000           ret = GST_STATE_CHANGE_SUCCESS;
5001         } else {
5002           GST_DEBUG_OBJECT (basesink,
5003               "PLAYING to PAUSED, we are not prerolled");
5004           basesink->playing_async = TRUE;
5005           priv->commited = FALSE;
5006           priv->call_preroll = TRUE;
5007           if (priv->async_enabled) {
5008             GST_DEBUG_OBJECT (basesink, "doing async state change");
5009             ret = GST_STATE_CHANGE_ASYNC;
5010             gst_element_post_message (GST_ELEMENT_CAST (basesink),
5011                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
5012                     FALSE));
5013           }
5014         }
5015       }
5016       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
5017           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
5018
5019       gst_base_sink_reset_qos (basesink);
5020       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5021       break;
5022     case GST_STATE_CHANGE_PAUSED_TO_READY:
5023       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5024       /* start by reseting our position state with the object lock so that the
5025        * position query gets the right idea. We do this before we post the
5026        * messages so that the message handlers pick this up. */
5027       GST_OBJECT_LOCK (basesink);
5028       basesink->have_newsegment = FALSE;
5029       priv->current_sstart = GST_CLOCK_TIME_NONE;
5030       priv->current_sstop = GST_CLOCK_TIME_NONE;
5031       priv->have_latency = FALSE;
5032       if (priv->cached_clock_id) {
5033         gst_clock_id_unref (priv->cached_clock_id);
5034         priv->cached_clock_id = NULL;
5035       }
5036       GST_OBJECT_UNLOCK (basesink);
5037
5038       gst_base_sink_set_last_buffer (basesink, NULL);
5039       priv->call_preroll = FALSE;
5040
5041       if (!priv->commited) {
5042         if (priv->async_enabled) {
5043           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
5044
5045           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5046               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
5047                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
5048
5049           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5050               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
5051         }
5052         priv->commited = TRUE;
5053       } else {
5054         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
5055       }
5056       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5057       break;
5058     case GST_STATE_CHANGE_READY_TO_NULL:
5059       if (bclass->stop) {
5060         if (!bclass->stop (basesink)) {
5061           GST_WARNING_OBJECT (basesink, "failed to stop");
5062         }
5063       }
5064       gst_base_sink_set_last_buffer (basesink, NULL);
5065       priv->call_preroll = FALSE;
5066       break;
5067     default:
5068       break;
5069   }
5070
5071   return ret;
5072
5073   /* ERRORS */
5074 start_failed:
5075   {
5076     GST_DEBUG_OBJECT (basesink, "failed to start");
5077     return GST_STATE_CHANGE_FAILURE;
5078   }
5079 activate_failed:
5080   {
5081     GST_DEBUG_OBJECT (basesink,
5082         "element failed to change states -- activation problem?");
5083     return GST_STATE_CHANGE_FAILURE;
5084   }
5085 }