basesink: use new QoS type
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSrc
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * |[
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * ]|
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSinkClass.preroll() vmethod with this preroll buffer and will then
59  * commit the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from #GstBaseSinkClass.get_times(). If this
63  * function returns #GST_CLOCK_TIME_NONE for the start time, no synchronisation
64  * will be done. Synchronisation can be disabled entirely by setting the object
65  * #GstBaseSink:sync property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSinkClass.render() will be
68  * called. Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the
71  * #GstBaseSinkClass.render() method are supported as well. These classes
72  * typically receive a buffer in the render method and can then potentially
73  * block on the clock while rendering. A typical example is an audiosink.
74  * Since 0.10.11 these subclasses can use gst_base_sink_wait_preroll() to
75  * perform the blocking wait.
76  *
77  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
78  * for the clock to reach the time indicated by the stop time of the last
79  * #GstBaseSinkClass.get_times() call before posting an EOS message. When the
80  * element receives EOS in PAUSED, preroll completes, the event is queued and an
81  * EOS message is posted when going to PLAYING.
82  *
83  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
84  * synchronisation and clipping of buffers. Buffers that fall completely outside
85  * of the current segment are dropped. Buffers that fall partially in the
86  * segment are rendered (and prerolled). Subclasses should do any subbuffer
87  * clipping themselves when needed.
88  *
89  * #GstBaseSink will by default report the current playback position in
90  * #GST_FORMAT_TIME based on the current clock time and segment information.
91  * If no clock has been set on the element, the query will be forwarded
92  * upstream.
93  *
94  * The #GstBaseSinkClass.set_caps() function will be called when the subclass
95  * should configure itself to process a specific media type.
96  *
97  * The #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() virtual methods
98  * will be called when resources should be allocated. Any 
99  * #GstBaseSinkClass.preroll(), #GstBaseSinkClass.render() and
100  * #GstBaseSinkClass.set_caps() function will be called between the
101  * #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() calls.
102  *
103  * The #GstBaseSinkClass.event() virtual method will be called when an event is
104  * received by #GstBaseSink. Normally this method should only be overriden by
105  * very specific elements (such as file sinks) which need to handle the
106  * newsegment event specially.
107  *
108  * #GstBaseSink provides an overridable #GstBaseSinkClass.buffer_alloc()
109  * function that can be used by sinks that want to do reverse negotiation or to
110  * provide custom buffers (hardware buffers for example) to upstream elements.
111  *
112  * The #GstBaseSinkClass.unlock() method is called when the elements should
113  * unblock any blocking operations they perform in the
114  * #GstBaseSinkClass.render() method. This is mostly useful when the
115  * #GstBaseSinkClass.render() method performs a blocking write on a file
116  * descriptor, for example.
117  *
118  * The #GstBaseSink:max-lateness property affects how the sink deals with
119  * buffers that arrive too late in the sink. A buffer arrives too late in the
120  * sink when the presentation time (as a combination of the last segment, buffer
121  * timestamp and element base_time) plus the duration is before the current
122  * time of the clock.
123  * If the frame is later than max-lateness, the sink will drop the buffer
124  * without calling the render method.
125  * This feature is disabled if sync is disabled, the
126  * #GstBaseSinkClass.get_times() method does not return a valid start time or
127  * max-lateness is set to -1 (the default).
128  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
129  * max-lateness value.
130  *
131  * The #GstBaseSink:qos property will enable the quality-of-service features of
132  * the basesink which gather statistics about the real-time performance of the
133  * clock synchronisation. For each buffer received in the sink, statistics are
134  * gathered and a QOS event is sent upstream with these numbers. This
135  * information can then be used by upstream elements to reduce their processing
136  * rate, for example.
137  *
138  * Since 0.10.15 the #GstBaseSink:async property can be used to instruct the
139  * sink to never perform an ASYNC state change. This feature is mostly usable
140  * when dealing with non-synchronized streams or sparse streams.
141  *
142  * Last reviewed on 2007-08-29 (0.10.15)
143  */
144
145 #ifdef HAVE_CONFIG_H
146 #  include "config.h"
147 #endif
148
149 #include <gst/gst_private.h>
150
151 #include "gstbasesink.h"
152 #include <gst/gstmarshal.h>
153 #include <gst/gst-i18n-lib.h>
154
155 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
156 #define GST_CAT_DEFAULT gst_base_sink_debug
157
158 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
159    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
160
161 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
162
163 typedef struct
164 {
165   gboolean valid;               /* if this info is valid */
166   guint32 seqnum;               /* the seqnum of the STEP event */
167   GstFormat format;             /* the format of the amount */
168   guint64 amount;               /* the total amount of data to skip */
169   guint64 position;             /* the position in the stepped data */
170   guint64 duration;             /* the duration in time of the skipped data */
171   guint64 start;                /* running_time of the start */
172   gdouble rate;                 /* rate of skipping */
173   gdouble start_rate;           /* rate before skipping */
174   guint64 start_start;          /* start position skipping */
175   guint64 start_stop;           /* stop position skipping */
176   gboolean flush;               /* if this was a flushing step */
177   gboolean intermediate;        /* if this is an intermediate step */
178   gboolean need_preroll;        /* if we need preroll after this step */
179 } GstStepInfo;
180
181 /* FIXME, some stuff in ABI.data and other in Private...
182  * Make up your mind please.
183  */
184 struct _GstBaseSinkPrivate
185 {
186   gint qos_enabled;             /* ATOMIC */
187   gboolean async_enabled;
188   GstClockTimeDiff ts_offset;
189   GstClockTime render_delay;
190
191   /* start, stop of current buffer, stream time, used to report position */
192   GstClockTime current_sstart;
193   GstClockTime current_sstop;
194
195   /* start, stop and jitter of current buffer, running time */
196   GstClockTime current_rstart;
197   GstClockTime current_rstop;
198   GstClockTimeDiff current_jitter;
199
200   /* EOS sync time in running time */
201   GstClockTime eos_rtime;
202
203   /* last buffer that arrived in time, running time */
204   GstClockTime last_in_time;
205   /* when the last buffer left the sink, running time */
206   GstClockTime last_left;
207
208   /* running averages go here these are done on running time */
209   GstClockTime avg_pt;
210   GstClockTime avg_duration;
211   gdouble avg_rate;
212
213   /* these are done on system time. avg_jitter and avg_render are
214    * compared to eachother to see if the rendering time takes a
215    * huge amount of the processing, If so we are flooded with
216    * buffers. */
217   GstClockTime last_left_systime;
218   GstClockTime avg_jitter;
219   GstClockTime start, stop;
220   GstClockTime avg_render;
221
222   /* number of rendered and dropped frames */
223   guint64 rendered;
224   guint64 dropped;
225
226   /* latency stuff */
227   GstClockTime latency;
228
229   /* if we already commited the state */
230   gboolean commited;
231
232   /* when we received EOS */
233   gboolean received_eos;
234
235   /* when we are prerolled and able to report latency */
236   gboolean have_latency;
237
238   /* the last buffer we prerolled or rendered. Useful for making snapshots */
239   gint enable_last_buffer;      /* atomic */
240   GstBuffer *last_buffer;
241
242   /* caps for pull based scheduling */
243   GstCaps *pull_caps;
244
245   /* blocksize for pulling */
246   guint blocksize;
247
248   gboolean discont;
249
250   /* seqnum of the stream */
251   guint32 seqnum;
252
253   gboolean call_preroll;
254   gboolean step_unlock;
255
256   /* we have a pending and a current step operation */
257   GstStepInfo current_step;
258   GstStepInfo pending_step;
259
260   /* Cached GstClockID */
261   GstClockID cached_clock_id;
262
263   /* for throttling */
264   GstClockTime throttle_time;
265 };
266
267 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
268
269 /* generic running average, this has a neutral window size */
270 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
271
272 /* the windows for these running averages are experimentally obtained.
273  * possitive values get averaged more while negative values use a small
274  * window so we can react faster to badness. */
275 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
276 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
277
278 enum
279 {
280   _PR_IS_NOTHING = 1 << 0,
281   _PR_IS_BUFFER = 1 << 1,
282   _PR_IS_BUFFERLIST = 1 << 2,
283   _PR_IS_EVENT = 1 << 3
284 } PrivateObjectType;
285
286 #define OBJ_IS_BUFFER(a) ((a) & _PR_IS_BUFFER)
287 #define OBJ_IS_BUFFERLIST(a) ((a) & _PR_IS_BUFFERLIST)
288 #define OBJ_IS_EVENT(a) ((a) & _PR_IS_EVENT)
289 #define OBJ_IS_BUFFERFULL(a) ((a) & (_PR_IS_BUFFER | _PR_IS_BUFFERLIST))
290
291 /* BaseSink properties */
292
293 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
294 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
295
296 #define DEFAULT_PREROLL_QUEUE_LEN   0
297 #define DEFAULT_SYNC                TRUE
298 #define DEFAULT_MAX_LATENESS        -1
299 #define DEFAULT_QOS                 FALSE
300 #define DEFAULT_ASYNC               TRUE
301 #define DEFAULT_TS_OFFSET           0
302 #define DEFAULT_BLOCKSIZE           4096
303 #define DEFAULT_RENDER_DELAY        0
304 #define DEFAULT_ENABLE_LAST_BUFFER  TRUE
305 #define DEFAULT_THROTTLE_TIME       0
306
307 enum
308 {
309   PROP_0,
310   PROP_PREROLL_QUEUE_LEN,
311   PROP_SYNC,
312   PROP_MAX_LATENESS,
313   PROP_QOS,
314   PROP_ASYNC,
315   PROP_TS_OFFSET,
316   PROP_ENABLE_LAST_BUFFER,
317   PROP_LAST_BUFFER,
318   PROP_BLOCKSIZE,
319   PROP_RENDER_DELAY,
320   PROP_THROTTLE_TIME,
321   PROP_LAST
322 };
323
324 static GstElementClass *parent_class = NULL;
325
326 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
327 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
328 static void gst_base_sink_finalize (GObject * object);
329
330 GType
331 gst_base_sink_get_type (void)
332 {
333   static volatile gsize base_sink_type = 0;
334
335   if (g_once_init_enter (&base_sink_type)) {
336     GType _type;
337     static const GTypeInfo base_sink_info = {
338       sizeof (GstBaseSinkClass),
339       NULL,
340       NULL,
341       (GClassInitFunc) gst_base_sink_class_init,
342       NULL,
343       NULL,
344       sizeof (GstBaseSink),
345       0,
346       (GInstanceInitFunc) gst_base_sink_init,
347     };
348
349     _type = g_type_register_static (GST_TYPE_ELEMENT,
350         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
351     g_once_init_leave (&base_sink_type, _type);
352   }
353   return base_sink_type;
354 }
355
356 static void gst_base_sink_set_property (GObject * object, guint prop_id,
357     const GValue * value, GParamSpec * pspec);
358 static void gst_base_sink_get_property (GObject * object, guint prop_id,
359     GValue * value, GParamSpec * pspec);
360
361 static gboolean gst_base_sink_send_event (GstElement * element,
362     GstEvent * event);
363 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
364 static const GstQueryType *gst_base_sink_get_query_types (GstElement * element);
365
366 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
367 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
368 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
369     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
370 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
371     GstClockTime * start, GstClockTime * end);
372 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
373     GstPad * pad, gboolean flushing);
374 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
375     gboolean active);
376 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
377     GstSegment * segment);
378 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
379     GstEvent * event, GstSegment * segment);
380
381 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
382     GstStateChange transition);
383
384 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
385 static GstFlowReturn gst_base_sink_chain_list (GstPad * pad,
386     GstBufferList * list);
387
388 static void gst_base_sink_loop (GstPad * pad);
389 static gboolean gst_base_sink_pad_activate (GstPad * pad);
390 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
391 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
392 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
393
394 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
395 static GstCaps *gst_base_sink_pad_getcaps (GstPad * pad);
396 static gboolean gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps);
397 static void gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps);
398 static GstFlowReturn gst_base_sink_pad_buffer_alloc (GstPad * pad,
399     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
400
401
402 /* check if an object was too late */
403 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
404     GstMiniObject * obj, GstClockTime start, GstClockTime stop,
405     GstClockReturn status, GstClockTimeDiff jitter);
406 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
407     guint8 obj_type, GstMiniObject * obj);
408
409 static void
410 gst_base_sink_class_init (GstBaseSinkClass * klass)
411 {
412   GObjectClass *gobject_class;
413   GstElementClass *gstelement_class;
414
415   gobject_class = G_OBJECT_CLASS (klass);
416   gstelement_class = GST_ELEMENT_CLASS (klass);
417
418   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
419       "basesink element");
420
421   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
422
423   parent_class = g_type_class_peek_parent (klass);
424
425   gobject_class->finalize = gst_base_sink_finalize;
426   gobject_class->set_property = gst_base_sink_set_property;
427   gobject_class->get_property = gst_base_sink_get_property;
428
429   /* FIXME, this next value should be configured using an event from the
430    * upstream element, ie, the BUFFER_SIZE event. */
431   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
432       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
433           "Number of buffers to queue during preroll", 0, G_MAXUINT,
434           DEFAULT_PREROLL_QUEUE_LEN,
435           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
436
437   g_object_class_install_property (gobject_class, PROP_SYNC,
438       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
439           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
440
441   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
442       g_param_spec_int64 ("max-lateness", "Max Lateness",
443           "Maximum number of nanoseconds that a buffer can be late before it "
444           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
445           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446
447   g_object_class_install_property (gobject_class, PROP_QOS,
448       g_param_spec_boolean ("qos", "Qos",
449           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
450           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
451   /**
452    * GstBaseSink:async
453    *
454    * If set to #TRUE, the basesink will perform asynchronous state changes.
455    * When set to #FALSE, the sink will not signal the parent when it prerolls.
456    * Use this option when dealing with sparse streams or when synchronisation is
457    * not required.
458    *
459    * Since: 0.10.15
460    */
461   g_object_class_install_property (gobject_class, PROP_ASYNC,
462       g_param_spec_boolean ("async", "Async",
463           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
464           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
465   /**
466    * GstBaseSink:ts-offset
467    *
468    * Controls the final synchronisation, a negative value will render the buffer
469    * earlier while a positive value delays playback. This property can be
470    * used to fix synchronisation in bad files.
471    *
472    * Since: 0.10.15
473    */
474   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
475       g_param_spec_int64 ("ts-offset", "TS Offset",
476           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
477           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
478
479   /**
480    * GstBaseSink:enable-last-buffer
481    *
482    * Enable the last-buffer property. If FALSE, basesink doesn't keep a
483    * reference to the last buffer arrived and the last-buffer property is always
484    * set to NULL. This can be useful if you need buffers to be released as soon
485    * as possible, eg. if you're using a buffer pool.
486    *
487    * Since: 0.10.30
488    */
489   g_object_class_install_property (gobject_class, PROP_ENABLE_LAST_BUFFER,
490       g_param_spec_boolean ("enable-last-buffer", "Enable Last Buffer",
491           "Enable the last-buffer property", DEFAULT_ENABLE_LAST_BUFFER,
492           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
493
494   /**
495    * GstBaseSink:last-buffer
496    *
497    * The last buffer that arrived in the sink and was used for preroll or for
498    * rendering. This property can be used to generate thumbnails. This property
499    * can be NULL when the sink has not yet received a bufer.
500    *
501    * Since: 0.10.15
502    */
503   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
504       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
505           "The last buffer received in the sink", GST_TYPE_BUFFER,
506           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
507   /**
508    * GstBaseSink:blocksize
509    *
510    * The amount of bytes to pull when operating in pull mode.
511    *
512    * Since: 0.10.22
513    */
514   /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
515   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
516       g_param_spec_uint ("blocksize", "Block size",
517           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
518           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
519   /**
520    * GstBaseSink:render-delay
521    *
522    * The additional delay between synchronisation and actual rendering of the
523    * media. This property will add additional latency to the device in order to
524    * make other sinks compensate for the delay.
525    *
526    * Since: 0.10.22
527    */
528   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
529       g_param_spec_uint64 ("render-delay", "Render Delay",
530           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
531           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
532   /**
533    * GstBaseSink:throttle-time
534    *
535    * The time to insert between buffers. This property can be used to control
536    * the maximum amount of buffers per second to render. Setting this property
537    * to a value bigger than 0 will make the sink create THROTTLE QoS events.
538    *
539    * Since: 0.10.33
540    */
541   g_object_class_install_property (gobject_class, PROP_THROTTLE_TIME,
542       g_param_spec_uint64 ("throttle-time", "Throttle time",
543           "The time to keep between rendered buffers (unused)", 0, G_MAXUINT64,
544           DEFAULT_THROTTLE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
545
546   gstelement_class->change_state =
547       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
548   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
549   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
550   gstelement_class->get_query_types =
551       GST_DEBUG_FUNCPTR (gst_base_sink_get_query_types);
552
553   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
554   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
555   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
556   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
557   klass->activate_pull =
558       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
559
560   /* Registering debug symbols for function pointers */
561   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_getcaps);
562   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_setcaps);
563   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_fixate);
564   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_buffer_alloc);
565   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate);
566   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_push);
567   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_pull);
568   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_event);
569   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain);
570   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain_list);
571 }
572
573 static GstCaps *
574 gst_base_sink_pad_getcaps (GstPad * pad)
575 {
576   GstBaseSinkClass *bclass;
577   GstBaseSink *bsink;
578   GstCaps *caps = NULL;
579
580   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
581   bclass = GST_BASE_SINK_GET_CLASS (bsink);
582
583   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
584     /* if we are operating in pull mode we only accept the negotiated caps */
585     GST_OBJECT_LOCK (pad);
586     if ((caps = GST_PAD_CAPS (pad)))
587       gst_caps_ref (caps);
588     GST_OBJECT_UNLOCK (pad);
589   }
590   if (caps == NULL) {
591     if (bclass->get_caps)
592       caps = bclass->get_caps (bsink);
593
594     if (caps == NULL) {
595       GstPadTemplate *pad_template;
596
597       pad_template =
598           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
599           "sink");
600       if (pad_template != NULL) {
601         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
602       }
603     }
604   }
605   gst_object_unref (bsink);
606
607   return caps;
608 }
609
610 static gboolean
611 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
612 {
613   GstBaseSinkClass *bclass;
614   GstBaseSink *bsink;
615   gboolean res = TRUE;
616
617   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
618   bclass = GST_BASE_SINK_GET_CLASS (bsink);
619
620   if (res && bclass->set_caps)
621     res = bclass->set_caps (bsink, caps);
622
623   gst_object_unref (bsink);
624
625   return res;
626 }
627
628 static void
629 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
630 {
631   GstBaseSinkClass *bclass;
632   GstBaseSink *bsink;
633
634   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
635   bclass = GST_BASE_SINK_GET_CLASS (bsink);
636
637   if (bclass->fixate)
638     bclass->fixate (bsink, caps);
639
640   gst_object_unref (bsink);
641 }
642
643 static GstFlowReturn
644 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
645     GstCaps * caps, GstBuffer ** buf)
646 {
647   GstBaseSinkClass *bclass;
648   GstBaseSink *bsink;
649   GstFlowReturn result = GST_FLOW_OK;
650
651   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
652   bclass = GST_BASE_SINK_GET_CLASS (bsink);
653
654   if (bclass->buffer_alloc)
655     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
656   else
657     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
658
659   gst_object_unref (bsink);
660
661   return result;
662 }
663
664 static void
665 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
666 {
667   GstPadTemplate *pad_template;
668   GstBaseSinkPrivate *priv;
669
670   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
671
672   pad_template =
673       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
674   g_return_if_fail (pad_template != NULL);
675
676   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
677
678   gst_pad_set_getcaps_function (basesink->sinkpad, gst_base_sink_pad_getcaps);
679   gst_pad_set_setcaps_function (basesink->sinkpad, gst_base_sink_pad_setcaps);
680   gst_pad_set_fixatecaps_function (basesink->sinkpad, gst_base_sink_pad_fixate);
681   gst_pad_set_bufferalloc_function (basesink->sinkpad,
682       gst_base_sink_pad_buffer_alloc);
683   gst_pad_set_activate_function (basesink->sinkpad, gst_base_sink_pad_activate);
684   gst_pad_set_activatepush_function (basesink->sinkpad,
685       gst_base_sink_pad_activate_push);
686   gst_pad_set_activatepull_function (basesink->sinkpad,
687       gst_base_sink_pad_activate_pull);
688   gst_pad_set_event_function (basesink->sinkpad, gst_base_sink_event);
689   gst_pad_set_chain_function (basesink->sinkpad, gst_base_sink_chain);
690   gst_pad_set_chain_list_function (basesink->sinkpad, gst_base_sink_chain_list);
691   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
692
693   basesink->pad_mode = GST_ACTIVATE_NONE;
694   basesink->preroll_queue = g_queue_new ();
695   basesink->abidata.ABI.clip_segment = gst_segment_new ();
696   priv->have_latency = FALSE;
697
698   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
699   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
700
701   basesink->sync = DEFAULT_SYNC;
702   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
703   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
704   priv->async_enabled = DEFAULT_ASYNC;
705   priv->ts_offset = DEFAULT_TS_OFFSET;
706   priv->render_delay = DEFAULT_RENDER_DELAY;
707   priv->blocksize = DEFAULT_BLOCKSIZE;
708   priv->cached_clock_id = NULL;
709   g_atomic_int_set (&priv->enable_last_buffer, DEFAULT_ENABLE_LAST_BUFFER);
710   priv->throttle_time = DEFAULT_THROTTLE_TIME;
711
712   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
713 }
714
715 static void
716 gst_base_sink_finalize (GObject * object)
717 {
718   GstBaseSink *basesink;
719
720   basesink = GST_BASE_SINK (object);
721
722   g_queue_free (basesink->preroll_queue);
723   gst_segment_free (basesink->abidata.ABI.clip_segment);
724
725   G_OBJECT_CLASS (parent_class)->finalize (object);
726 }
727
728 /**
729  * gst_base_sink_set_sync:
730  * @sink: the sink
731  * @sync: the new sync value.
732  *
733  * Configures @sink to synchronize on the clock or not. When
734  * @sync is FALSE, incomming samples will be played as fast as
735  * possible. If @sync is TRUE, the timestamps of the incomming
736  * buffers will be used to schedule the exact render time of its
737  * contents.
738  *
739  * Since: 0.10.4
740  */
741 void
742 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
743 {
744   g_return_if_fail (GST_IS_BASE_SINK (sink));
745
746   GST_OBJECT_LOCK (sink);
747   sink->sync = sync;
748   GST_OBJECT_UNLOCK (sink);
749 }
750
751 /**
752  * gst_base_sink_get_sync:
753  * @sink: the sink
754  *
755  * Checks if @sink is currently configured to synchronize against the
756  * clock.
757  *
758  * Returns: TRUE if the sink is configured to synchronize against the clock.
759  *
760  * Since: 0.10.4
761  */
762 gboolean
763 gst_base_sink_get_sync (GstBaseSink * sink)
764 {
765   gboolean res;
766
767   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
768
769   GST_OBJECT_LOCK (sink);
770   res = sink->sync;
771   GST_OBJECT_UNLOCK (sink);
772
773   return res;
774 }
775
776 /**
777  * gst_base_sink_set_max_lateness:
778  * @sink: the sink
779  * @max_lateness: the new max lateness value.
780  *
781  * Sets the new max lateness value to @max_lateness. This value is
782  * used to decide if a buffer should be dropped or not based on the
783  * buffer timestamp and the current clock time. A value of -1 means
784  * an unlimited time.
785  *
786  * Since: 0.10.4
787  */
788 void
789 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
790 {
791   g_return_if_fail (GST_IS_BASE_SINK (sink));
792
793   GST_OBJECT_LOCK (sink);
794   sink->abidata.ABI.max_lateness = max_lateness;
795   GST_OBJECT_UNLOCK (sink);
796 }
797
798 /**
799  * gst_base_sink_get_max_lateness:
800  * @sink: the sink
801  *
802  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
803  * more details.
804  *
805  * Returns: The maximum time in nanoseconds that a buffer can be late
806  * before it is dropped and not rendered. A value of -1 means an
807  * unlimited time.
808  *
809  * Since: 0.10.4
810  */
811 gint64
812 gst_base_sink_get_max_lateness (GstBaseSink * sink)
813 {
814   gint64 res;
815
816   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
817
818   GST_OBJECT_LOCK (sink);
819   res = sink->abidata.ABI.max_lateness;
820   GST_OBJECT_UNLOCK (sink);
821
822   return res;
823 }
824
825 /**
826  * gst_base_sink_set_qos_enabled:
827  * @sink: the sink
828  * @enabled: the new qos value.
829  *
830  * Configures @sink to send Quality-of-Service events upstream.
831  *
832  * Since: 0.10.5
833  */
834 void
835 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
836 {
837   g_return_if_fail (GST_IS_BASE_SINK (sink));
838
839   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
840 }
841
842 /**
843  * gst_base_sink_is_qos_enabled:
844  * @sink: the sink
845  *
846  * Checks if @sink is currently configured to send Quality-of-Service events
847  * upstream.
848  *
849  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
850  *
851  * Since: 0.10.5
852  */
853 gboolean
854 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
855 {
856   gboolean res;
857
858   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
859
860   res = g_atomic_int_get (&sink->priv->qos_enabled);
861
862   return res;
863 }
864
865 /**
866  * gst_base_sink_set_async_enabled:
867  * @sink: the sink
868  * @enabled: the new async value.
869  *
870  * Configures @sink to perform all state changes asynchronusly. When async is
871  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
872  * preroll buffer. This feature is usefull if the sink does not synchronize
873  * against the clock or when it is dealing with sparse streams.
874  *
875  * Since: 0.10.15
876  */
877 void
878 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
879 {
880   g_return_if_fail (GST_IS_BASE_SINK (sink));
881
882   GST_PAD_PREROLL_LOCK (sink->sinkpad);
883   g_atomic_int_set (&sink->priv->async_enabled, enabled);
884   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
885   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
886 }
887
888 /**
889  * gst_base_sink_is_async_enabled:
890  * @sink: the sink
891  *
892  * Checks if @sink is currently configured to perform asynchronous state
893  * changes to PAUSED.
894  *
895  * Returns: TRUE if the sink is configured to perform asynchronous state
896  * changes.
897  *
898  * Since: 0.10.15
899  */
900 gboolean
901 gst_base_sink_is_async_enabled (GstBaseSink * sink)
902 {
903   gboolean res;
904
905   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
906
907   res = g_atomic_int_get (&sink->priv->async_enabled);
908
909   return res;
910 }
911
912 /**
913  * gst_base_sink_set_ts_offset:
914  * @sink: the sink
915  * @offset: the new offset
916  *
917  * Adjust the synchronisation of @sink with @offset. A negative value will
918  * render buffers earlier than their timestamp. A positive value will delay
919  * rendering. This function can be used to fix playback of badly timestamped
920  * buffers.
921  *
922  * Since: 0.10.15
923  */
924 void
925 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
926 {
927   g_return_if_fail (GST_IS_BASE_SINK (sink));
928
929   GST_OBJECT_LOCK (sink);
930   sink->priv->ts_offset = offset;
931   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
932   GST_OBJECT_UNLOCK (sink);
933 }
934
935 /**
936  * gst_base_sink_get_ts_offset:
937  * @sink: the sink
938  *
939  * Get the synchronisation offset of @sink.
940  *
941  * Returns: The synchronisation offset.
942  *
943  * Since: 0.10.15
944  */
945 GstClockTimeDiff
946 gst_base_sink_get_ts_offset (GstBaseSink * sink)
947 {
948   GstClockTimeDiff res;
949
950   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
951
952   GST_OBJECT_LOCK (sink);
953   res = sink->priv->ts_offset;
954   GST_OBJECT_UNLOCK (sink);
955
956   return res;
957 }
958
959 /**
960  * gst_base_sink_get_last_buffer:
961  * @sink: the sink
962  *
963  * Get the last buffer that arrived in the sink and was used for preroll or for
964  * rendering. This property can be used to generate thumbnails.
965  *
966  * The #GstCaps on the buffer can be used to determine the type of the buffer.
967  *
968  * Free-function: gst_buffer_unref
969  *
970  * Returns: (transfer full): a #GstBuffer. gst_buffer_unref() after usage.
971  *     This function returns NULL when no buffer has arrived in the sink yet
972  *     or when the sink is not in PAUSED or PLAYING.
973  *
974  * Since: 0.10.15
975  */
976 GstBuffer *
977 gst_base_sink_get_last_buffer (GstBaseSink * sink)
978 {
979   GstBuffer *res;
980
981   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
982
983   GST_OBJECT_LOCK (sink);
984   if ((res = sink->priv->last_buffer))
985     gst_buffer_ref (res);
986   GST_OBJECT_UNLOCK (sink);
987
988   return res;
989 }
990
991 /* with OBJECT_LOCK */
992 static void
993 gst_base_sink_set_last_buffer_unlocked (GstBaseSink * sink, GstBuffer * buffer)
994 {
995   GstBuffer *old;
996
997   old = sink->priv->last_buffer;
998   if (G_LIKELY (old != buffer)) {
999     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
1000     if (G_LIKELY (buffer))
1001       gst_buffer_ref (buffer);
1002     sink->priv->last_buffer = buffer;
1003   } else {
1004     old = NULL;
1005   }
1006   /* avoid unreffing with the lock because cleanup code might want to take the
1007    * lock too */
1008   if (G_LIKELY (old)) {
1009     GST_OBJECT_UNLOCK (sink);
1010     gst_buffer_unref (old);
1011     GST_OBJECT_LOCK (sink);
1012   }
1013 }
1014
1015 static void
1016 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
1017 {
1018   if (!g_atomic_int_get (&sink->priv->enable_last_buffer))
1019     return;
1020
1021   GST_OBJECT_LOCK (sink);
1022   gst_base_sink_set_last_buffer_unlocked (sink, buffer);
1023   GST_OBJECT_UNLOCK (sink);
1024 }
1025
1026 /**
1027  * gst_base_sink_set_last_buffer_enabled:
1028  * @sink: the sink
1029  * @enabled: the new enable-last-buffer value.
1030  *
1031  * Configures @sink to store the last received buffer in the last-buffer
1032  * property.
1033  *
1034  * Since: 0.10.30
1035  */
1036 void
1037 gst_base_sink_set_last_buffer_enabled (GstBaseSink * sink, gboolean enabled)
1038 {
1039   g_return_if_fail (GST_IS_BASE_SINK (sink));
1040
1041   /* Only take lock if we change the value */
1042   if (g_atomic_int_compare_and_exchange (&sink->priv->enable_last_buffer,
1043           !enabled, enabled) && !enabled) {
1044     GST_OBJECT_LOCK (sink);
1045     gst_base_sink_set_last_buffer_unlocked (sink, NULL);
1046     GST_OBJECT_UNLOCK (sink);
1047   }
1048 }
1049
1050 /**
1051  * gst_base_sink_is_last_buffer_enabled:
1052  * @sink: the sink
1053  *
1054  * Checks if @sink is currently configured to store the last received buffer in
1055  * the last-buffer property.
1056  *
1057  * Returns: TRUE if the sink is configured to store the last received buffer.
1058  *
1059  * Since: 0.10.30
1060  */
1061 gboolean
1062 gst_base_sink_is_last_buffer_enabled (GstBaseSink * sink)
1063 {
1064   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
1065
1066   return g_atomic_int_get (&sink->priv->enable_last_buffer);
1067 }
1068
1069 /**
1070  * gst_base_sink_get_latency:
1071  * @sink: the sink
1072  *
1073  * Get the currently configured latency.
1074  *
1075  * Returns: The configured latency.
1076  *
1077  * Since: 0.10.12
1078  */
1079 GstClockTime
1080 gst_base_sink_get_latency (GstBaseSink * sink)
1081 {
1082   GstClockTime res;
1083
1084   GST_OBJECT_LOCK (sink);
1085   res = sink->priv->latency;
1086   GST_OBJECT_UNLOCK (sink);
1087
1088   return res;
1089 }
1090
1091 /**
1092  * gst_base_sink_query_latency:
1093  * @sink: the sink
1094  * @live: (out) (allow-none): if the sink is live
1095  * @upstream_live: (out) (allow-none): if an upstream element is live
1096  * @min_latency: (out) (allow-none): the min latency of the upstream elements
1097  * @max_latency: (out) (allow-none): the max latency of the upstream elements
1098  *
1099  * Query the sink for the latency parameters. The latency will be queried from
1100  * the upstream elements. @live will be TRUE if @sink is configured to
1101  * synchronize against the clock. @upstream_live will be TRUE if an upstream
1102  * element is live.
1103  *
1104  * If both @live and @upstream_live are TRUE, the sink will want to compensate
1105  * for the latency introduced by the upstream elements by setting the
1106  * @min_latency to a strictly possitive value.
1107  *
1108  * This function is mostly used by subclasses.
1109  *
1110  * Returns: TRUE if the query succeeded.
1111  *
1112  * Since: 0.10.12
1113  */
1114 gboolean
1115 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
1116     gboolean * upstream_live, GstClockTime * min_latency,
1117     GstClockTime * max_latency)
1118 {
1119   gboolean l, us_live, res, have_latency;
1120   GstClockTime min, max, render_delay;
1121   GstQuery *query;
1122   GstClockTime us_min, us_max;
1123
1124   /* we are live when we sync to the clock */
1125   GST_OBJECT_LOCK (sink);
1126   l = sink->sync;
1127   have_latency = sink->priv->have_latency;
1128   render_delay = sink->priv->render_delay;
1129   GST_OBJECT_UNLOCK (sink);
1130
1131   /* assume no latency */
1132   min = 0;
1133   max = -1;
1134   us_live = FALSE;
1135
1136   if (have_latency) {
1137     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1138     /* we are ready for a latency query this is when we preroll or when we are
1139      * not async. */
1140     query = gst_query_new_latency ();
1141
1142     /* ask the peer for the latency */
1143     if ((res = gst_pad_peer_query (sink->sinkpad, query))) {
1144       /* get upstream min and max latency */
1145       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1146
1147       if (us_live) {
1148         /* upstream live, use its latency, subclasses should use these
1149          * values to create the complete latency. */
1150         min = us_min;
1151         max = us_max;
1152       }
1153       if (l) {
1154         /* we need to add the render delay if we are live */
1155         if (min != -1)
1156           min += render_delay;
1157         if (max != -1)
1158           max += render_delay;
1159       }
1160     }
1161     gst_query_unref (query);
1162   } else {
1163     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1164     res = FALSE;
1165   }
1166
1167   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1168   if (!res) {
1169     if (!l) {
1170       res = TRUE;
1171       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1172     } else {
1173       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1174     }
1175   }
1176
1177   if (res) {
1178     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1179         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1180         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1181
1182     if (live)
1183       *live = l;
1184     if (upstream_live)
1185       *upstream_live = us_live;
1186     if (min_latency)
1187       *min_latency = min;
1188     if (max_latency)
1189       *max_latency = max;
1190   }
1191   return res;
1192 }
1193
1194 /**
1195  * gst_base_sink_set_render_delay:
1196  * @sink: a #GstBaseSink
1197  * @delay: the new delay
1198  *
1199  * Set the render delay in @sink to @delay. The render delay is the time
1200  * between actual rendering of a buffer and its synchronisation time. Some
1201  * devices might delay media rendering which can be compensated for with this
1202  * function.
1203  *
1204  * After calling this function, this sink will report additional latency and
1205  * other sinks will adjust their latency to delay the rendering of their media.
1206  *
1207  * This function is usually called by subclasses.
1208  *
1209  * Since: 0.10.21
1210  */
1211 void
1212 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1213 {
1214   GstClockTime old_render_delay;
1215
1216   g_return_if_fail (GST_IS_BASE_SINK (sink));
1217
1218   GST_OBJECT_LOCK (sink);
1219   old_render_delay = sink->priv->render_delay;
1220   sink->priv->render_delay = delay;
1221   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1222       GST_TIME_ARGS (delay));
1223   GST_OBJECT_UNLOCK (sink);
1224
1225   if (delay != old_render_delay) {
1226     GST_DEBUG_OBJECT (sink, "posting latency changed");
1227     gst_element_post_message (GST_ELEMENT_CAST (sink),
1228         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1229   }
1230 }
1231
1232 /**
1233  * gst_base_sink_get_render_delay:
1234  * @sink: a #GstBaseSink
1235  *
1236  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1237  * information about the render delay.
1238  *
1239  * Returns: the render delay of @sink.
1240  *
1241  * Since: 0.10.21
1242  */
1243 GstClockTime
1244 gst_base_sink_get_render_delay (GstBaseSink * sink)
1245 {
1246   GstClockTimeDiff res;
1247
1248   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1249
1250   GST_OBJECT_LOCK (sink);
1251   res = sink->priv->render_delay;
1252   GST_OBJECT_UNLOCK (sink);
1253
1254   return res;
1255 }
1256
1257 /**
1258  * gst_base_sink_set_blocksize:
1259  * @sink: a #GstBaseSink
1260  * @blocksize: the blocksize in bytes
1261  *
1262  * Set the number of bytes that the sink will pull when it is operating in pull
1263  * mode.
1264  *
1265  * Since: 0.10.22
1266  */
1267 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1268 void
1269 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1270 {
1271   g_return_if_fail (GST_IS_BASE_SINK (sink));
1272
1273   GST_OBJECT_LOCK (sink);
1274   sink->priv->blocksize = blocksize;
1275   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1276   GST_OBJECT_UNLOCK (sink);
1277 }
1278
1279 /**
1280  * gst_base_sink_get_blocksize:
1281  * @sink: a #GstBaseSink
1282  *
1283  * Get the number of bytes that the sink will pull when it is operating in pull
1284  * mode.
1285  *
1286  * Returns: the number of bytes @sink will pull in pull mode.
1287  *
1288  * Since: 0.10.22
1289  */
1290 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1291 guint
1292 gst_base_sink_get_blocksize (GstBaseSink * sink)
1293 {
1294   guint res;
1295
1296   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1297
1298   GST_OBJECT_LOCK (sink);
1299   res = sink->priv->blocksize;
1300   GST_OBJECT_UNLOCK (sink);
1301
1302   return res;
1303 }
1304
1305 /**
1306  * gst_base_sink_set_throttle_time:
1307  * @sink: a #GstBaseSink
1308  * @throttle: the throttle time in nanoseconds
1309  *
1310  * Set the time that will be inserted between rendered buffers. This
1311  * can be used to control the maximum buffers per second that the sink
1312  * will render. 
1313  *
1314  * Since: 0.10.33
1315  */
1316 void
1317 gst_base_sink_set_throttle_time (GstBaseSink * sink, guint64 throttle)
1318 {
1319   g_return_if_fail (GST_IS_BASE_SINK (sink));
1320
1321   GST_OBJECT_LOCK (sink);
1322   sink->priv->throttle_time = throttle;
1323   GST_LOG_OBJECT (sink, "set throttle_time to %" G_GUINT64_FORMAT, throttle);
1324   GST_OBJECT_UNLOCK (sink);
1325 }
1326
1327 /**
1328  * gst_base_sink_get_throttle_time:
1329  * @sink: a #GstBaseSink
1330  *
1331  * Get the time that will be inserted between frames to control the 
1332  * maximum buffers per second.
1333  *
1334  * Returns: the number of nanoseconds @sink will put between frames.
1335  *
1336  * Since: 0.10.33
1337  */
1338 guint64
1339 gst_base_sink_get_throttle_time (GstBaseSink * sink)
1340 {
1341   guint64 res;
1342
1343   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1344
1345   GST_OBJECT_LOCK (sink);
1346   res = sink->priv->throttle_time;
1347   GST_OBJECT_UNLOCK (sink);
1348
1349   return res;
1350 }
1351
1352 static void
1353 gst_base_sink_set_property (GObject * object, guint prop_id,
1354     const GValue * value, GParamSpec * pspec)
1355 {
1356   GstBaseSink *sink = GST_BASE_SINK (object);
1357
1358   switch (prop_id) {
1359     case PROP_PREROLL_QUEUE_LEN:
1360       /* preroll lock necessary to serialize with finish_preroll */
1361       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1362       g_atomic_int_set (&sink->preroll_queue_max_len, g_value_get_uint (value));
1363       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1364       break;
1365     case PROP_SYNC:
1366       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1367       break;
1368     case PROP_MAX_LATENESS:
1369       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1370       break;
1371     case PROP_QOS:
1372       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1373       break;
1374     case PROP_ASYNC:
1375       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1376       break;
1377     case PROP_TS_OFFSET:
1378       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1379       break;
1380     case PROP_BLOCKSIZE:
1381       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1382       break;
1383     case PROP_RENDER_DELAY:
1384       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1385       break;
1386     case PROP_ENABLE_LAST_BUFFER:
1387       gst_base_sink_set_last_buffer_enabled (sink, g_value_get_boolean (value));
1388       break;
1389     case PROP_THROTTLE_TIME:
1390       gst_base_sink_set_throttle_time (sink, g_value_get_uint64 (value));
1391       break;
1392     default:
1393       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1394       break;
1395   }
1396 }
1397
1398 static void
1399 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1400     GParamSpec * pspec)
1401 {
1402   GstBaseSink *sink = GST_BASE_SINK (object);
1403
1404   switch (prop_id) {
1405     case PROP_PREROLL_QUEUE_LEN:
1406       g_value_set_uint (value, g_atomic_int_get (&sink->preroll_queue_max_len));
1407       break;
1408     case PROP_SYNC:
1409       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1410       break;
1411     case PROP_MAX_LATENESS:
1412       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1413       break;
1414     case PROP_QOS:
1415       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1416       break;
1417     case PROP_ASYNC:
1418       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1419       break;
1420     case PROP_TS_OFFSET:
1421       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1422       break;
1423     case PROP_LAST_BUFFER:
1424       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1425       break;
1426     case PROP_ENABLE_LAST_BUFFER:
1427       g_value_set_boolean (value, gst_base_sink_is_last_buffer_enabled (sink));
1428       break;
1429     case PROP_BLOCKSIZE:
1430       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1431       break;
1432     case PROP_RENDER_DELAY:
1433       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1434       break;
1435     case PROP_THROTTLE_TIME:
1436       g_value_set_uint64 (value, gst_base_sink_get_throttle_time (sink));
1437       break;
1438     default:
1439       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1440       break;
1441   }
1442 }
1443
1444
1445 static GstCaps *
1446 gst_base_sink_get_caps (GstBaseSink * sink)
1447 {
1448   return NULL;
1449 }
1450
1451 static gboolean
1452 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1453 {
1454   return TRUE;
1455 }
1456
1457 static GstFlowReturn
1458 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1459     GstCaps * caps, GstBuffer ** buf)
1460 {
1461   *buf = NULL;
1462   return GST_FLOW_OK;
1463 }
1464
1465 /* with PREROLL_LOCK, STREAM_LOCK */
1466 static void
1467 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1468 {
1469   GstMiniObject *obj;
1470
1471   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1472   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1473     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1474     gst_mini_object_unref (obj);
1475   }
1476   /* we can't have EOS anymore now */
1477   basesink->eos = FALSE;
1478   basesink->priv->received_eos = FALSE;
1479   basesink->have_preroll = FALSE;
1480   basesink->priv->step_unlock = FALSE;
1481   basesink->eos_queued = FALSE;
1482   basesink->preroll_queued = 0;
1483   basesink->buffers_queued = 0;
1484   basesink->events_queued = 0;
1485   /* can't report latency anymore until we preroll again */
1486   if (basesink->priv->async_enabled) {
1487     GST_OBJECT_LOCK (basesink);
1488     basesink->priv->have_latency = FALSE;
1489     GST_OBJECT_UNLOCK (basesink);
1490   }
1491   /* and signal any waiters now */
1492   GST_PAD_PREROLL_SIGNAL (pad);
1493 }
1494
1495 /* with STREAM_LOCK, configures given segment with the event information. */
1496 static void
1497 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1498     GstEvent * event, GstSegment * segment)
1499 {
1500   gboolean update;
1501   gdouble rate, arate;
1502   GstFormat format;
1503   gint64 start;
1504   gint64 stop;
1505   gint64 time;
1506
1507   /* the newsegment event is needed to bring the buffer timestamps to the
1508    * stream time and to drop samples outside of the playback segment. */
1509   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1510       &start, &stop, &time);
1511
1512   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1513    * We protect with the OBJECT_LOCK so that we can use the values to
1514    * safely answer a POSITION query. */
1515   GST_OBJECT_LOCK (basesink);
1516   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1517       stop, time);
1518
1519   if (format == GST_FORMAT_TIME) {
1520     GST_DEBUG_OBJECT (basesink,
1521         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1522         "format GST_FORMAT_TIME, "
1523         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1524         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1525         update, rate, arate, GST_TIME_ARGS (segment->start),
1526         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1527         GST_TIME_ARGS (segment->accum));
1528   } else {
1529     GST_DEBUG_OBJECT (basesink,
1530         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1531         "format %d, "
1532         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1533         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1534         segment->format, segment->start, segment->stop, segment->time,
1535         segment->accum);
1536   }
1537   GST_OBJECT_UNLOCK (basesink);
1538 }
1539
1540 /* with PREROLL_LOCK, STREAM_LOCK */
1541 static gboolean
1542 gst_base_sink_commit_state (GstBaseSink * basesink)
1543 {
1544   /* commit state and proceed to next pending state */
1545   GstState current, next, pending, post_pending;
1546   gboolean post_paused = FALSE;
1547   gboolean post_async_done = FALSE;
1548   gboolean post_playing = FALSE;
1549
1550   /* we are certainly not playing async anymore now */
1551   basesink->playing_async = FALSE;
1552
1553   GST_OBJECT_LOCK (basesink);
1554   current = GST_STATE (basesink);
1555   next = GST_STATE_NEXT (basesink);
1556   pending = GST_STATE_PENDING (basesink);
1557   post_pending = pending;
1558
1559   switch (pending) {
1560     case GST_STATE_PLAYING:
1561     {
1562       GstBaseSinkClass *bclass;
1563       GstStateChangeReturn ret;
1564
1565       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1566
1567       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1568
1569       basesink->need_preroll = FALSE;
1570       post_async_done = TRUE;
1571       basesink->priv->commited = TRUE;
1572       post_playing = TRUE;
1573       /* post PAUSED too when we were READY */
1574       if (current == GST_STATE_READY) {
1575         post_paused = TRUE;
1576       }
1577
1578       /* make sure we notify the subclass of async playing */
1579       if (bclass->async_play) {
1580         GST_WARNING_OBJECT (basesink, "deprecated async_play");
1581         ret = bclass->async_play (basesink);
1582         if (ret == GST_STATE_CHANGE_FAILURE)
1583           goto async_failed;
1584       }
1585       break;
1586     }
1587     case GST_STATE_PAUSED:
1588       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1589       post_paused = TRUE;
1590       post_async_done = TRUE;
1591       basesink->priv->commited = TRUE;
1592       post_pending = GST_STATE_VOID_PENDING;
1593       break;
1594     case GST_STATE_READY:
1595     case GST_STATE_NULL:
1596       goto stopping;
1597     case GST_STATE_VOID_PENDING:
1598       goto nothing_pending;
1599     default:
1600       break;
1601   }
1602
1603   /* we can report latency queries now */
1604   basesink->priv->have_latency = TRUE;
1605
1606   GST_STATE (basesink) = pending;
1607   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1608   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1609   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1610   GST_OBJECT_UNLOCK (basesink);
1611
1612   if (post_paused) {
1613     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1614     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1615         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1616             current, next, post_pending));
1617   }
1618   if (post_async_done) {
1619     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1620     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1621         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1622   }
1623   if (post_playing) {
1624     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1625     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1626         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1627             next, pending, GST_STATE_VOID_PENDING));
1628   }
1629
1630   GST_STATE_BROADCAST (basesink);
1631
1632   return TRUE;
1633
1634 nothing_pending:
1635   {
1636     /* Depending on the state, set our vars. We get in this situation when the
1637      * state change function got a change to update the state vars before the
1638      * streaming thread did. This is fine but we need to make sure that we
1639      * update the need_preroll var since it was TRUE when we got here and might
1640      * become FALSE if we got to PLAYING. */
1641     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1642         gst_element_state_get_name (current));
1643     switch (current) {
1644       case GST_STATE_PLAYING:
1645         basesink->need_preroll = FALSE;
1646         break;
1647       case GST_STATE_PAUSED:
1648         basesink->need_preroll = TRUE;
1649         break;
1650       default:
1651         basesink->need_preroll = FALSE;
1652         basesink->flushing = TRUE;
1653         break;
1654     }
1655     /* we can report latency queries now */
1656     basesink->priv->have_latency = TRUE;
1657     GST_OBJECT_UNLOCK (basesink);
1658     return TRUE;
1659   }
1660 stopping:
1661   {
1662     /* app is going to READY */
1663     GST_DEBUG_OBJECT (basesink, "stopping");
1664     basesink->need_preroll = FALSE;
1665     basesink->flushing = TRUE;
1666     GST_OBJECT_UNLOCK (basesink);
1667     return FALSE;
1668   }
1669 async_failed:
1670   {
1671     GST_DEBUG_OBJECT (basesink, "async commit failed");
1672     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1673     GST_OBJECT_UNLOCK (basesink);
1674     return FALSE;
1675   }
1676 }
1677
1678 static void
1679 start_stepping (GstBaseSink * sink, GstSegment * segment,
1680     GstStepInfo * pending, GstStepInfo * current)
1681 {
1682   gint64 end;
1683   GstMessage *message;
1684
1685   GST_DEBUG_OBJECT (sink, "update pending step");
1686
1687   GST_OBJECT_LOCK (sink);
1688   memcpy (current, pending, sizeof (GstStepInfo));
1689   pending->valid = FALSE;
1690   GST_OBJECT_UNLOCK (sink);
1691
1692   /* post message first */
1693   message =
1694       gst_message_new_step_start (GST_OBJECT (sink), TRUE, current->format,
1695       current->amount, current->rate, current->flush, current->intermediate);
1696   gst_message_set_seqnum (message, current->seqnum);
1697   gst_element_post_message (GST_ELEMENT (sink), message);
1698
1699   /* get the running time of where we paused and remember it */
1700   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1701   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1702
1703   /* set the new rate for the remainder of the segment */
1704   current->start_rate = segment->rate;
1705   segment->rate *= current->rate;
1706   segment->abs_rate = ABS (segment->rate);
1707
1708   /* save values */
1709   if (segment->rate > 0.0)
1710     current->start_stop = segment->stop;
1711   else
1712     current->start_start = segment->start;
1713
1714   if (current->format == GST_FORMAT_TIME) {
1715     end = current->start + current->amount;
1716     if (!current->flush) {
1717       /* update the segment clipping regions for non-flushing seeks */
1718       if (segment->rate > 0.0) {
1719         segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1720         segment->last_stop = segment->stop;
1721       } else {
1722         gint64 position;
1723
1724         position = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1725         segment->time = position;
1726         segment->start = position;
1727         segment->last_stop = position;
1728       }
1729     }
1730   }
1731
1732   GST_DEBUG_OBJECT (sink,
1733       "segment now rate %lf, applied rate %lf, "
1734       "format GST_FORMAT_TIME, "
1735       "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1736       ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1737       segment->rate, segment->applied_rate, GST_TIME_ARGS (segment->start),
1738       GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1739       GST_TIME_ARGS (segment->accum));
1740
1741   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1742       GST_TIME_ARGS (current->start));
1743
1744   if (current->amount == -1) {
1745     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1746     current->valid = FALSE;
1747   } else {
1748     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1749         "rate: %f", current->amount, gst_format_get_name (current->format),
1750         current->rate);
1751   }
1752 }
1753
1754 static void
1755 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1756     GstStepInfo * current, gint64 rstart, gint64 rstop, gboolean eos)
1757 {
1758   gint64 stop, position;
1759   GstMessage *message;
1760
1761   GST_DEBUG_OBJECT (sink, "step complete");
1762
1763   if (segment->rate > 0.0)
1764     stop = rstart;
1765   else
1766     stop = rstop;
1767
1768   GST_DEBUG_OBJECT (sink,
1769       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1770
1771   if (stop == -1)
1772     current->duration = current->position;
1773   else
1774     current->duration = stop - current->start;
1775
1776   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1777       GST_TIME_ARGS (current->duration));
1778
1779   position = current->start + current->duration;
1780
1781   /* now move the segment to the new running time */
1782   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1783
1784   if (current->flush) {
1785     /* and remove the accumulated time we flushed, start time did not change */
1786     segment->accum = current->start;
1787   } else {
1788     /* start time is now the stepped position */
1789     gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1790   }
1791
1792   /* restore the previous rate */
1793   segment->rate = current->start_rate;
1794   segment->abs_rate = ABS (segment->rate);
1795
1796   if (segment->rate > 0.0)
1797     segment->stop = current->start_stop;
1798   else
1799     segment->start = current->start_start;
1800
1801   /* the clip segment is used for position report in paused... */
1802   memcpy (sink->abidata.ABI.clip_segment, segment, sizeof (GstSegment));
1803
1804   /* post the step done when we know the stepped duration in TIME */
1805   message =
1806       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1807       current->amount, current->rate, current->flush, current->intermediate,
1808       current->duration, eos);
1809   gst_message_set_seqnum (message, current->seqnum);
1810   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1811
1812   if (!current->intermediate)
1813     sink->need_preroll = current->need_preroll;
1814
1815   /* and the current step info finished and becomes invalid */
1816   current->valid = FALSE;
1817 }
1818
1819 static gboolean
1820 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1821     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1822     gint64 * rstop)
1823 {
1824   gboolean step_end = FALSE;
1825
1826   /* see if we need to skip this buffer because of stepping */
1827   switch (current->format) {
1828     case GST_FORMAT_TIME:
1829     {
1830       guint64 end;
1831       gint64 first, last;
1832
1833       if (segment->rate > 0.0) {
1834         if (segment->stop == *cstop)
1835           *rstop = *rstart + current->amount;
1836
1837         first = *rstart;
1838         last = *rstop;
1839       } else {
1840         if (segment->start == *cstart)
1841           *rstart = *rstop + current->amount;
1842
1843         first = *rstop;
1844         last = *rstart;
1845       }
1846
1847       end = current->start + current->amount;
1848       current->position = first - current->start;
1849
1850       if (G_UNLIKELY (segment->abs_rate != 1.0))
1851         current->position /= segment->abs_rate;
1852
1853       GST_DEBUG_OBJECT (sink,
1854           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1855           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1856       GST_DEBUG_OBJECT (sink,
1857           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1858           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1859           GST_TIME_ARGS (last - current->start),
1860           GST_TIME_ARGS (current->amount));
1861
1862       if ((current->flush && current->position >= current->amount)
1863           || last >= end) {
1864         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1865         step_end = TRUE;
1866         if (segment->rate > 0.0) {
1867           *rstart = end;
1868           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1869         } else {
1870           *rstop = end;
1871           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1872         }
1873       }
1874       GST_DEBUG_OBJECT (sink,
1875           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1876           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1877       GST_DEBUG_OBJECT (sink,
1878           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1879           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1880       break;
1881     }
1882     case GST_FORMAT_BUFFERS:
1883       GST_DEBUG_OBJECT (sink,
1884           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1885           current->position, current->amount);
1886
1887       if (current->position < current->amount) {
1888         current->position++;
1889       } else {
1890         step_end = TRUE;
1891       }
1892       break;
1893     case GST_FORMAT_DEFAULT:
1894     default:
1895       GST_DEBUG_OBJECT (sink,
1896           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1897           current->position, current->amount);
1898       break;
1899   }
1900   return step_end;
1901 }
1902
1903 /* with STREAM_LOCK, PREROLL_LOCK
1904  *
1905  * Returns TRUE if the object needs synchronisation and takes therefore
1906  * part in prerolling.
1907  *
1908  * rsstart/rsstop contain the start/stop in stream time.
1909  * rrstart/rrstop contain the start/stop in running time.
1910  */
1911 static gboolean
1912 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1913     GstClockTime * rsstart, GstClockTime * rsstop,
1914     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1915     gboolean * stepped, GstSegment * segment, GstStepInfo * step,
1916     gboolean * step_end, guint8 obj_type)
1917 {
1918   GstBaseSinkClass *bclass;
1919   GstBuffer *buffer;
1920   GstClockTime start, stop;     /* raw start/stop timestamps */
1921   gint64 cstart, cstop;         /* clipped raw timestamps */
1922   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1923   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1924   GstFormat format;
1925   GstBaseSinkPrivate *priv;
1926   gboolean eos;
1927
1928   priv = basesink->priv;
1929
1930   /* start with nothing */
1931   start = stop = GST_CLOCK_TIME_NONE;
1932
1933   if (G_UNLIKELY (OBJ_IS_EVENT (obj_type))) {
1934     GstEvent *event = GST_EVENT_CAST (obj);
1935
1936     switch (GST_EVENT_TYPE (event)) {
1937         /* EOS event needs syncing */
1938       case GST_EVENT_EOS:
1939       {
1940         if (basesink->segment.rate >= 0.0) {
1941           sstart = sstop = priv->current_sstop;
1942           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1943             /* we have not seen a buffer yet, use the segment values */
1944             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1945                 basesink->segment.format, basesink->segment.stop);
1946           }
1947         } else {
1948           sstart = sstop = priv->current_sstart;
1949           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1950             /* we have not seen a buffer yet, use the segment values */
1951             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1952                 basesink->segment.format, basesink->segment.start);
1953           }
1954         }
1955
1956         rstart = rstop = priv->eos_rtime;
1957         *do_sync = rstart != -1;
1958         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1959             GST_TIME_ARGS (rstart));
1960         /* if we are stepping, we end now */
1961         *step_end = step->valid;
1962         eos = TRUE;
1963         goto eos_done;
1964       }
1965       default:
1966         /* other events do not need syncing */
1967         /* FIXME, maybe NEWSEGMENT might need synchronisation
1968          * since the POSITION query depends on accumulated times and
1969          * we cannot accumulate the current segment before the previous
1970          * one completed.
1971          */
1972         return FALSE;
1973     }
1974   }
1975
1976   eos = FALSE;
1977
1978 again:
1979   /* else do buffer sync code */
1980   buffer = GST_BUFFER_CAST (obj);
1981
1982   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1983
1984   /* just get the times to see if we need syncing, if the start returns -1 we
1985    * don't sync. */
1986   if (bclass->get_times)
1987     bclass->get_times (basesink, buffer, &start, &stop);
1988
1989   if (!GST_CLOCK_TIME_IS_VALID (start)) {
1990     /* we don't need to sync but we still want to get the timestamps for
1991      * tracking the position */
1992     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1993     *do_sync = FALSE;
1994   } else {
1995     *do_sync = TRUE;
1996   }
1997
1998   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1999       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
2000       GST_TIME_ARGS (stop), *do_sync);
2001
2002   /* collect segment and format for code clarity */
2003   format = segment->format;
2004
2005   /* no timestamp clipping if we did not get a TIME segment format */
2006   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
2007     cstart = start;
2008     cstop = stop;
2009     /* do running and stream time in TIME format */
2010     format = GST_FORMAT_TIME;
2011     GST_LOG_OBJECT (basesink, "not time format, don't clip");
2012     goto do_times;
2013   }
2014
2015   /* clip, only when we know about time */
2016   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
2017               (gint64) start, (gint64) stop, &cstart, &cstop))) {
2018     if (step->valid) {
2019       GST_DEBUG_OBJECT (basesink, "step out of segment");
2020       /* when we are stepping, pretend we're at the end of the segment */
2021       if (segment->rate > 0.0) {
2022         cstart = segment->stop;
2023         cstop = segment->stop;
2024       } else {
2025         cstart = segment->start;
2026         cstop = segment->start;
2027       }
2028       goto do_times;
2029     }
2030     goto out_of_segment;
2031   }
2032
2033   if (G_UNLIKELY (start != cstart || stop != cstop)) {
2034     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
2035         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
2036         GST_TIME_ARGS (cstop));
2037   }
2038
2039   /* set last stop position */
2040   if (G_LIKELY (stop != GST_CLOCK_TIME_NONE && cstop != GST_CLOCK_TIME_NONE))
2041     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
2042   else
2043     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
2044
2045 do_times:
2046   rstart = gst_segment_to_running_time (segment, format, cstart);
2047   rstop = gst_segment_to_running_time (segment, format, cstop);
2048
2049   if (G_UNLIKELY (step->valid)) {
2050     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
2051                 &rstart, &rstop))) {
2052       /* step is still busy, we discard data when we are flushing */
2053       *stepped = step->flush;
2054       GST_DEBUG_OBJECT (basesink, "stepping busy");
2055     }
2056   }
2057   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
2058    * upstream is behaving very badly */
2059   sstart = gst_segment_to_stream_time (segment, format, cstart);
2060   sstop = gst_segment_to_stream_time (segment, format, cstop);
2061
2062 eos_done:
2063   /* eos_done label only called when doing EOS, we also stop stepping then */
2064   if (*step_end && step->flush) {
2065     GST_DEBUG_OBJECT (basesink, "flushing step ended");
2066     stop_stepping (basesink, segment, step, rstart, rstop, eos);
2067     *step_end = FALSE;
2068     /* re-determine running start times for adjusted segment
2069      * (which has a flushed amount of running/accumulated time removed) */
2070     if (!GST_IS_EVENT (obj)) {
2071       GST_DEBUG_OBJECT (basesink, "refresh sync times");
2072       goto again;
2073     }
2074   }
2075
2076   /* save times */
2077   *rsstart = sstart;
2078   *rsstop = sstop;
2079   *rrstart = rstart;
2080   *rrstop = rstop;
2081
2082   /* buffers and EOS always need syncing and preroll */
2083   return TRUE;
2084
2085   /* special cases */
2086 out_of_segment:
2087   {
2088     /* we usually clip in the chain function already but stepping could cause
2089      * the segment to be updated later. we return FALSE so that we don't try
2090      * to sync on it. */
2091     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
2092     return FALSE;
2093   }
2094 }
2095
2096 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
2097  * adjust a timestamp with the latency and timestamp offset. This function does
2098  * not adjust for the render delay. */
2099 static GstClockTime
2100 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
2101 {
2102   GstClockTimeDiff ts_offset;
2103
2104   /* don't do anything funny with invalid timestamps */
2105   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2106     return time;
2107
2108   time += basesink->priv->latency;
2109
2110   /* apply offset, be carefull for underflows */
2111   ts_offset = basesink->priv->ts_offset;
2112   if (ts_offset < 0) {
2113     ts_offset = -ts_offset;
2114     if (ts_offset < time)
2115       time -= ts_offset;
2116     else
2117       time = 0;
2118   } else
2119     time += ts_offset;
2120
2121   /* subtract the render delay again, which was included in the latency */
2122   if (time > basesink->priv->render_delay)
2123     time -= basesink->priv->render_delay;
2124   else
2125     time = 0;
2126
2127   return time;
2128 }
2129
2130 /**
2131  * gst_base_sink_wait_clock:
2132  * @sink: the sink
2133  * @time: the running_time to be reached
2134  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2135  *
2136  * This function will block until @time is reached. It is usually called by
2137  * subclasses that use their own internal synchronisation.
2138  *
2139  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
2140  * returned. Likewise, if synchronisation is disabled in the element or there
2141  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
2142  *
2143  * This function should only be called with the PREROLL_LOCK held, like when
2144  * receiving an EOS event in the #GstBaseSinkClass.event() vmethod or when
2145  * receiving a buffer in
2146  * the #GstBaseSinkClass.render() vmethod.
2147  *
2148  * The @time argument should be the running_time of when this method should
2149  * return and is not adjusted with any latency or offset configured in the
2150  * sink.
2151  *
2152  * Since: 0.10.20
2153  *
2154  * Returns: #GstClockReturn
2155  */
2156 GstClockReturn
2157 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
2158     GstClockTimeDiff * jitter)
2159 {
2160   GstClockReturn ret;
2161   GstClock *clock;
2162   GstClockTime base_time;
2163
2164   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2165     goto invalid_time;
2166
2167   GST_OBJECT_LOCK (sink);
2168   if (G_UNLIKELY (!sink->sync))
2169     goto no_sync;
2170
2171   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
2172     goto no_clock;
2173
2174   base_time = GST_ELEMENT_CAST (sink)->base_time;
2175   GST_LOG_OBJECT (sink,
2176       "time %" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT,
2177       GST_TIME_ARGS (time), GST_TIME_ARGS (base_time));
2178
2179   /* add base_time to running_time to get the time against the clock */
2180   time += base_time;
2181
2182   /* Re-use existing clockid if available */
2183   if (G_LIKELY (sink->priv->cached_clock_id != NULL)) {
2184     if (!gst_clock_single_shot_id_reinit (clock, sink->priv->cached_clock_id,
2185             time)) {
2186       gst_clock_id_unref (sink->priv->cached_clock_id);
2187       sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2188     }
2189   } else
2190     sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2191   GST_OBJECT_UNLOCK (sink);
2192
2193   /* A blocking wait is performed on the clock. We save the ClockID
2194    * so we can unlock the entry at any time. While we are blocking, we
2195    * release the PREROLL_LOCK so that other threads can interrupt the
2196    * entry. */
2197   sink->clock_id = sink->priv->cached_clock_id;
2198   /* release the preroll lock while waiting */
2199   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
2200
2201   ret = gst_clock_id_wait (sink->priv->cached_clock_id, jitter);
2202
2203   GST_PAD_PREROLL_LOCK (sink->sinkpad);
2204   sink->clock_id = NULL;
2205
2206   return ret;
2207
2208   /* no syncing needed */
2209 invalid_time:
2210   {
2211     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
2212     return GST_CLOCK_BADTIME;
2213   }
2214 no_sync:
2215   {
2216     GST_DEBUG_OBJECT (sink, "sync disabled");
2217     GST_OBJECT_UNLOCK (sink);
2218     return GST_CLOCK_BADTIME;
2219   }
2220 no_clock:
2221   {
2222     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
2223     GST_OBJECT_UNLOCK (sink);
2224     return GST_CLOCK_BADTIME;
2225   }
2226 }
2227
2228 /**
2229  * gst_base_sink_wait_preroll:
2230  * @sink: the sink
2231  *
2232  * If the #GstBaseSinkClass.render() method performs its own synchronisation
2233  * against the clock it must unblock when going from PLAYING to the PAUSED state
2234  * and call this method before continuing to render the remaining data.
2235  *
2236  * This function will block until a state change to PLAYING happens (in which
2237  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
2238  * to a state change to READY or a FLUSH event (in which case this function
2239  * returns #GST_FLOW_WRONG_STATE).
2240  *
2241  * This function should only be called with the PREROLL_LOCK held, like in the
2242  * render function.
2243  *
2244  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2245  * continue. Any other return value should be returned from the render vmethod.
2246  *
2247  * Since: 0.10.11
2248  */
2249 GstFlowReturn
2250 gst_base_sink_wait_preroll (GstBaseSink * sink)
2251 {
2252   sink->have_preroll = TRUE;
2253   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
2254   /* block until the state changes, or we get a flush, or something */
2255   GST_PAD_PREROLL_WAIT (sink->sinkpad);
2256   sink->have_preroll = FALSE;
2257   if (G_UNLIKELY (sink->flushing))
2258     goto stopping;
2259   if (G_UNLIKELY (sink->priv->step_unlock))
2260     goto step_unlocked;
2261   GST_DEBUG_OBJECT (sink, "continue after preroll");
2262
2263   return GST_FLOW_OK;
2264
2265   /* ERRORS */
2266 stopping:
2267   {
2268     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
2269     return GST_FLOW_WRONG_STATE;
2270   }
2271 step_unlocked:
2272   {
2273     sink->priv->step_unlock = FALSE;
2274     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
2275     return GST_FLOW_STEP;
2276   }
2277 }
2278
2279 static inline guint8
2280 get_object_type (GstMiniObject * obj)
2281 {
2282   guint8 obj_type;
2283
2284   if (G_LIKELY (GST_IS_BUFFER (obj)))
2285     obj_type = _PR_IS_BUFFER;
2286   else if (GST_IS_EVENT (obj))
2287     obj_type = _PR_IS_EVENT;
2288   else if (GST_IS_BUFFER_LIST (obj))
2289     obj_type = _PR_IS_BUFFERLIST;
2290   else
2291     obj_type = _PR_IS_NOTHING;
2292
2293   return obj_type;
2294 }
2295
2296 /**
2297  * gst_base_sink_do_preroll:
2298  * @sink: the sink
2299  * @obj: (transfer none): the mini object that caused the preroll
2300  *
2301  * If the @sink spawns its own thread for pulling buffers from upstream it
2302  * should call this method after it has pulled a buffer. If the element needed
2303  * to preroll, this function will perform the preroll and will then block
2304  * until the element state is changed.
2305  *
2306  * This function should be called with the PREROLL_LOCK held.
2307  *
2308  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2309  * continue. Any other return value should be returned from the render vmethod.
2310  *
2311  * Since: 0.10.22
2312  */
2313 GstFlowReturn
2314 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2315 {
2316   GstFlowReturn ret;
2317
2318   while (G_UNLIKELY (sink->need_preroll)) {
2319     guint8 obj_type;
2320     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2321
2322     obj_type = get_object_type (obj);
2323
2324     ret = gst_base_sink_preroll_object (sink, obj_type, obj);
2325     if (ret != GST_FLOW_OK)
2326       goto preroll_failed;
2327
2328     /* need to recheck here because the commit state could have
2329      * made us not need the preroll anymore */
2330     if (G_LIKELY (sink->need_preroll)) {
2331       /* block until the state changes, or we get a flush, or something */
2332       ret = gst_base_sink_wait_preroll (sink);
2333       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2334         goto preroll_failed;
2335     }
2336   }
2337   return GST_FLOW_OK;
2338
2339   /* ERRORS */
2340 preroll_failed:
2341   {
2342     GST_DEBUG_OBJECT (sink, "preroll failed %d", ret);
2343     return ret;
2344   }
2345 }
2346
2347 /**
2348  * gst_base_sink_wait_eos:
2349  * @sink: the sink
2350  * @time: the running_time to be reached
2351  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2352  *
2353  * This function will block until @time is reached. It is usually called by
2354  * subclasses that use their own internal synchronisation but want to let the
2355  * EOS be handled by the base class.
2356  *
2357  * This function should only be called with the PREROLL_LOCK held, like when
2358  * receiving an EOS event in the ::event vmethod.
2359  *
2360  * The @time argument should be the running_time of when the EOS should happen
2361  * and will be adjusted with any latency and offset configured in the sink.
2362  *
2363  * Returns: #GstFlowReturn
2364  *
2365  * Since: 0.10.15
2366  */
2367 GstFlowReturn
2368 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2369     GstClockTimeDiff * jitter)
2370 {
2371   GstClockReturn status;
2372   GstFlowReturn ret;
2373
2374   do {
2375     GstClockTime stime;
2376
2377     GST_DEBUG_OBJECT (sink, "checking preroll");
2378
2379     /* first wait for the playing state before we can continue */
2380     while (G_UNLIKELY (sink->need_preroll)) {
2381       ret = gst_base_sink_wait_preroll (sink);
2382       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2383         goto flushing;
2384     }
2385
2386     /* preroll done, we can sync since we are in PLAYING now. */
2387     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2388         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2389
2390     /* compensate for latency and ts_offset. We don't adjust for render delay
2391      * because we don't interact with the device on EOS normally. */
2392     stime = gst_base_sink_adjust_time (sink, time);
2393
2394     /* wait for the clock, this can be interrupted because we got shut down or
2395      * we PAUSED. */
2396     status = gst_base_sink_wait_clock (sink, stime, jitter);
2397
2398     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2399
2400     /* invalid time, no clock or sync disabled, just continue then */
2401     if (status == GST_CLOCK_BADTIME)
2402       break;
2403
2404     /* waiting could have been interrupted and we can be flushing now */
2405     if (G_UNLIKELY (sink->flushing))
2406       goto flushing;
2407
2408     /* retry if we got unscheduled, which means we did not reach the timeout
2409      * yet. if some other error occures, we continue. */
2410   } while (status == GST_CLOCK_UNSCHEDULED);
2411
2412   GST_DEBUG_OBJECT (sink, "end of stream");
2413
2414   return GST_FLOW_OK;
2415
2416   /* ERRORS */
2417 flushing:
2418   {
2419     GST_DEBUG_OBJECT (sink, "we are flushing");
2420     return GST_FLOW_WRONG_STATE;
2421   }
2422 }
2423
2424 /* with STREAM_LOCK, PREROLL_LOCK
2425  *
2426  * Make sure we are in PLAYING and synchronize an object to the clock.
2427  *
2428  * If we need preroll, we are not in PLAYING. We try to commit the state
2429  * if needed and then block if we still are not PLAYING.
2430  *
2431  * We start waiting on the clock in PLAYING. If we got interrupted, we
2432  * immediatly try to re-preroll.
2433  *
2434  * Some objects do not need synchronisation (most events) and so this function
2435  * immediatly returns GST_FLOW_OK.
2436  *
2437  * for objects that arrive later than max-lateness to be synchronized to the
2438  * clock have the @late boolean set to TRUE.
2439  *
2440  * This function keeps a running average of the jitter (the diff between the
2441  * clock time and the requested sync time). The jitter is negative for
2442  * objects that arrive in time and positive for late buffers.
2443  *
2444  * does not take ownership of obj.
2445  */
2446 static GstFlowReturn
2447 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2448     GstMiniObject * obj, gboolean * late, gboolean * step_end, guint8 obj_type)
2449 {
2450   GstClockTimeDiff jitter = 0;
2451   gboolean syncable;
2452   GstClockReturn status = GST_CLOCK_OK;
2453   GstClockTime rstart, rstop, sstart, sstop, stime;
2454   gboolean do_sync;
2455   GstBaseSinkPrivate *priv;
2456   GstFlowReturn ret;
2457   GstStepInfo *current, *pending;
2458   gboolean stepped;
2459
2460   priv = basesink->priv;
2461
2462 do_step:
2463   sstart = sstop = rstart = rstop = GST_CLOCK_TIME_NONE;
2464   do_sync = TRUE;
2465   stepped = FALSE;
2466
2467   priv->current_rstart = GST_CLOCK_TIME_NONE;
2468
2469   /* get stepping info */
2470   current = &priv->current_step;
2471   pending = &priv->pending_step;
2472
2473   /* get timing information for this object against the render segment */
2474   syncable = gst_base_sink_get_sync_times (basesink, obj,
2475       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2476       current, step_end, obj_type);
2477
2478   if (G_UNLIKELY (stepped))
2479     goto step_skipped;
2480
2481   /* a syncable object needs to participate in preroll and
2482    * clocking. All buffers and EOS are syncable. */
2483   if (G_UNLIKELY (!syncable))
2484     goto not_syncable;
2485
2486   /* store timing info for current object */
2487   priv->current_rstart = rstart;
2488   priv->current_rstop = (GST_CLOCK_TIME_IS_VALID (rstop) ? rstop : rstart);
2489
2490   /* save sync time for eos when the previous object needed sync */
2491   priv->eos_rtime = (do_sync ? priv->current_rstop : GST_CLOCK_TIME_NONE);
2492
2493 again:
2494   /* first do preroll, this makes sure we commit our state
2495    * to PAUSED and can continue to PLAYING. We cannot perform
2496    * any clock sync in PAUSED because there is no clock. */
2497   ret = gst_base_sink_do_preroll (basesink, obj);
2498   if (G_UNLIKELY (ret != GST_FLOW_OK))
2499     goto preroll_failed;
2500
2501   /* update the segment with a pending step if the current one is invalid and we
2502    * have a new pending one. We only accept new step updates after a preroll */
2503   if (G_UNLIKELY (pending->valid && !current->valid)) {
2504     start_stepping (basesink, &basesink->segment, pending, current);
2505     goto do_step;
2506   }
2507
2508   /* After rendering we store the position of the last buffer so that we can use
2509    * it to report the position. We need to take the lock here. */
2510   GST_OBJECT_LOCK (basesink);
2511   priv->current_sstart = sstart;
2512   priv->current_sstop = (GST_CLOCK_TIME_IS_VALID (sstop) ? sstop : sstart);
2513   GST_OBJECT_UNLOCK (basesink);
2514
2515   if (!do_sync)
2516     goto done;
2517
2518   /* adjust for latency */
2519   stime = gst_base_sink_adjust_time (basesink, rstart);
2520
2521   /* adjust for render-delay, avoid underflows */
2522   if (GST_CLOCK_TIME_IS_VALID (stime)) {
2523     if (stime > priv->render_delay)
2524       stime -= priv->render_delay;
2525     else
2526       stime = 0;
2527   }
2528
2529   /* preroll done, we can sync since we are in PLAYING now. */
2530   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2531       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2532       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2533
2534   /* This function will return immediatly if start == -1, no clock
2535    * or sync is disabled with GST_CLOCK_BADTIME. */
2536   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2537
2538   GST_DEBUG_OBJECT (basesink, "clock returned %d, jitter %c%" GST_TIME_FORMAT,
2539       status, (jitter < 0 ? '-' : ' '), GST_TIME_ARGS (ABS (jitter)));
2540
2541   /* invalid time, no clock or sync disabled, just render */
2542   if (status == GST_CLOCK_BADTIME)
2543     goto done;
2544
2545   /* waiting could have been interrupted and we can be flushing now */
2546   if (G_UNLIKELY (basesink->flushing))
2547     goto flushing;
2548
2549   /* check for unlocked by a state change, we are not flushing so
2550    * we can try to preroll on the current buffer. */
2551   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2552     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2553     priv->call_preroll = TRUE;
2554     goto again;
2555   }
2556
2557   /* successful syncing done, record observation */
2558   priv->current_jitter = jitter;
2559
2560   /* check if the object should be dropped */
2561   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2562       status, jitter);
2563
2564 done:
2565   return GST_FLOW_OK;
2566
2567   /* ERRORS */
2568 step_skipped:
2569   {
2570     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2571     *late = TRUE;
2572     return GST_FLOW_OK;
2573   }
2574 not_syncable:
2575   {
2576     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2577     return GST_FLOW_OK;
2578   }
2579 flushing:
2580   {
2581     GST_DEBUG_OBJECT (basesink, "we are flushing");
2582     return GST_FLOW_WRONG_STATE;
2583   }
2584 preroll_failed:
2585   {
2586     GST_DEBUG_OBJECT (basesink, "preroll failed");
2587     *step_end = FALSE;
2588     return ret;
2589   }
2590 }
2591
2592 static gboolean
2593 gst_base_sink_send_qos (GstBaseSink * basesink, GstQOSType type,
2594     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2595 {
2596   GstEvent *event;
2597   gboolean res;
2598
2599   /* generate Quality-of-Service event */
2600   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2601       "qos: type %d, proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2602       GST_TIME_FORMAT, type, proportion, diff, GST_TIME_ARGS (time));
2603
2604   event = gst_event_new_qos_full (type, proportion, diff, time);
2605
2606   /* send upstream */
2607   res = gst_pad_push_event (basesink->sinkpad, event);
2608
2609   return res;
2610 }
2611
2612 static void
2613 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2614 {
2615   GstBaseSinkPrivate *priv;
2616   GstClockTime start, stop;
2617   GstClockTimeDiff jitter;
2618   GstClockTime pt, entered, left;
2619   GstClockTime duration;
2620   gdouble rate;
2621
2622   priv = sink->priv;
2623
2624   start = priv->current_rstart;
2625
2626   if (priv->current_step.valid)
2627     return;
2628
2629   /* if Quality-of-Service disabled, do nothing */
2630   if (!g_atomic_int_get (&priv->qos_enabled) ||
2631       !GST_CLOCK_TIME_IS_VALID (start))
2632     return;
2633
2634   stop = priv->current_rstop;
2635   jitter = priv->current_jitter;
2636
2637   if (jitter < 0) {
2638     /* this is the time the buffer entered the sink */
2639     if (start < -jitter)
2640       entered = 0;
2641     else
2642       entered = start + jitter;
2643     left = start;
2644   } else {
2645     /* this is the time the buffer entered the sink */
2646     entered = start + jitter;
2647     /* this is the time the buffer left the sink */
2648     left = start + jitter;
2649   }
2650
2651   /* calculate duration of the buffer */
2652   if (GST_CLOCK_TIME_IS_VALID (stop))
2653     duration = stop - start;
2654   else
2655     duration = GST_CLOCK_TIME_NONE;
2656
2657   /* if we have the time when the last buffer left us, calculate
2658    * processing time */
2659   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2660     if (entered > priv->last_left) {
2661       pt = entered - priv->last_left;
2662     } else {
2663       pt = 0;
2664     }
2665   } else {
2666     pt = priv->avg_pt;
2667   }
2668
2669   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2670       ", entered %" GST_TIME_FORMAT ", left %" GST_TIME_FORMAT ", pt: %"
2671       GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT ",jitter %"
2672       G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (entered),
2673       GST_TIME_ARGS (left), GST_TIME_ARGS (pt), GST_TIME_ARGS (duration),
2674       jitter);
2675
2676   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2677       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2678       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2679       priv->avg_rate);
2680
2681   /* collect running averages. for first observations, we copy the
2682    * values */
2683   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_duration))
2684     priv->avg_duration = duration;
2685   else
2686     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2687
2688   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_pt))
2689     priv->avg_pt = pt;
2690   else
2691     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2692
2693   if (priv->avg_duration != 0)
2694     rate =
2695         gst_guint64_to_gdouble (priv->avg_pt) /
2696         gst_guint64_to_gdouble (priv->avg_duration);
2697   else
2698     rate = 0.0;
2699
2700   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2701     if (dropped || priv->avg_rate < 0.0) {
2702       priv->avg_rate = rate;
2703     } else {
2704       if (rate > 1.0)
2705         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2706       else
2707         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2708     }
2709   }
2710
2711   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2712       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2713       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2714       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2715
2716
2717   if (priv->avg_rate >= 0.0) {
2718     GstQOSType type;
2719     GstClockTimeDiff diff;
2720
2721     /* if we have a valid rate, start sending QoS messages */
2722     if (priv->current_jitter < 0) {
2723       /* make sure we never go below 0 when adding the jitter to the
2724        * timestamp. */
2725       if (priv->current_rstart < -priv->current_jitter)
2726         priv->current_jitter = -priv->current_rstart;
2727     }
2728
2729     if (priv->throttle_time > 0) {
2730       diff = priv->throttle_time;
2731       type = GST_QOS_TYPE_THROTTLE;
2732     } else {
2733       diff = priv->current_jitter;
2734       if (diff <= 0)
2735         type = GST_QOS_TYPE_OVERFLOW;
2736       else
2737         type = GST_QOS_TYPE_UNDERFLOW;
2738     }
2739
2740     gst_base_sink_send_qos (sink, type, priv->avg_rate, priv->current_rstart,
2741         diff);
2742   }
2743
2744   /* record when this buffer will leave us */
2745   priv->last_left = left;
2746 }
2747
2748 /* reset all qos measuring */
2749 static void
2750 gst_base_sink_reset_qos (GstBaseSink * sink)
2751 {
2752   GstBaseSinkPrivate *priv;
2753
2754   priv = sink->priv;
2755
2756   priv->last_in_time = GST_CLOCK_TIME_NONE;
2757   priv->last_left = GST_CLOCK_TIME_NONE;
2758   priv->avg_duration = GST_CLOCK_TIME_NONE;
2759   priv->avg_pt = GST_CLOCK_TIME_NONE;
2760   priv->avg_rate = -1.0;
2761   priv->avg_render = GST_CLOCK_TIME_NONE;
2762   priv->rendered = 0;
2763   priv->dropped = 0;
2764
2765 }
2766
2767 /* Checks if the object was scheduled too late.
2768  *
2769  * start/stop contain the raw timestamp start and stop values
2770  * of the object.
2771  *
2772  * status and jitter contain the return values from the clock wait.
2773  *
2774  * returns TRUE if the buffer was too late.
2775  */
2776 static gboolean
2777 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2778     GstClockTime start, GstClockTime stop,
2779     GstClockReturn status, GstClockTimeDiff jitter)
2780 {
2781   gboolean late;
2782   gint64 max_lateness;
2783   GstBaseSinkPrivate *priv;
2784
2785   priv = basesink->priv;
2786
2787   late = FALSE;
2788
2789   /* only for objects that were too late */
2790   if (G_LIKELY (status != GST_CLOCK_EARLY))
2791     goto in_time;
2792
2793   max_lateness = basesink->abidata.ABI.max_lateness;
2794
2795   /* check if frame dropping is enabled */
2796   if (max_lateness == -1)
2797     goto no_drop;
2798
2799   /* only check for buffers */
2800   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2801     goto not_buffer;
2802
2803   /* can't do check if we don't have a timestamp */
2804   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (start)))
2805     goto no_timestamp;
2806
2807   /* we can add a valid stop time */
2808   if (GST_CLOCK_TIME_IS_VALID (stop))
2809     max_lateness += stop;
2810   else
2811     max_lateness += start;
2812
2813   /* if the jitter bigger than duration and lateness we are too late */
2814   if ((late = start + jitter > max_lateness)) {
2815     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2816         "buffer is too late %" GST_TIME_FORMAT
2817         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (start + jitter),
2818         GST_TIME_ARGS (max_lateness));
2819     /* !!emergency!!, if we did not receive anything valid for more than a
2820      * second, render it anyway so the user sees something */
2821     if (GST_CLOCK_TIME_IS_VALID (priv->last_in_time) &&
2822         start - priv->last_in_time > GST_SECOND) {
2823       late = FALSE;
2824       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2825           (_("A lot of buffers are being dropped.")),
2826           ("There may be a timestamping problem, or this computer is too slow."));
2827       GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2828           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2829           GST_TIME_ARGS (priv->last_in_time));
2830     }
2831   }
2832
2833 done:
2834   if (!late || !GST_CLOCK_TIME_IS_VALID (priv->last_in_time)) {
2835     priv->last_in_time = start;
2836   }
2837   return late;
2838
2839   /* all is fine */
2840 in_time:
2841   {
2842     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2843     goto done;
2844   }
2845 no_drop:
2846   {
2847     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2848     goto done;
2849   }
2850 not_buffer:
2851   {
2852     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2853     return FALSE;
2854   }
2855 no_timestamp:
2856   {
2857     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2858     return FALSE;
2859   }
2860 }
2861
2862 /* called before and after calling the render vmethod. It keeps track of how
2863  * much time was spent in the render method and is used to check if we are
2864  * flooded */
2865 static void
2866 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2867 {
2868   GstBaseSinkPrivate *priv;
2869
2870   priv = basesink->priv;
2871
2872   if (start) {
2873     priv->start = gst_util_get_timestamp ();
2874   } else {
2875     GstClockTime elapsed;
2876
2877     priv->stop = gst_util_get_timestamp ();
2878
2879     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2880
2881     if (!GST_CLOCK_TIME_IS_VALID (priv->avg_render))
2882       priv->avg_render = elapsed;
2883     else
2884       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2885
2886     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2887         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2888   }
2889 }
2890
2891 /* with STREAM_LOCK, PREROLL_LOCK,
2892  *
2893  * Synchronize the object on the clock and then render it.
2894  *
2895  * takes ownership of obj.
2896  */
2897 static GstFlowReturn
2898 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2899     guint8 obj_type, gpointer obj)
2900 {
2901   GstFlowReturn ret;
2902   GstBaseSinkClass *bclass;
2903   gboolean late, step_end;
2904   gpointer sync_obj;
2905   GstBaseSinkPrivate *priv;
2906
2907   priv = basesink->priv;
2908
2909   if (OBJ_IS_BUFFERLIST (obj_type)) {
2910     /*
2911      * If buffer list, use the first group buffer within the list
2912      * for syncing
2913      */
2914     sync_obj = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
2915     g_assert (NULL != sync_obj);
2916   } else {
2917     sync_obj = obj;
2918   }
2919
2920 again:
2921   late = FALSE;
2922   step_end = FALSE;
2923
2924   /* synchronize this object, non syncable objects return OK
2925    * immediatly. */
2926   ret =
2927       gst_base_sink_do_sync (basesink, pad, sync_obj, &late, &step_end,
2928       obj_type);
2929   if (G_UNLIKELY (ret != GST_FLOW_OK))
2930     goto sync_failed;
2931
2932   /* and now render, event or buffer/buffer list. */
2933   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type))) {
2934     /* drop late buffers unconditionally, let's hope it's unlikely */
2935     if (G_UNLIKELY (late))
2936       goto dropped;
2937
2938     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2939
2940     if (G_LIKELY ((OBJ_IS_BUFFERLIST (obj_type) && bclass->render_list) ||
2941             (!OBJ_IS_BUFFERLIST (obj_type) && bclass->render))) {
2942       gint do_qos;
2943
2944       /* read once, to get same value before and after */
2945       do_qos = g_atomic_int_get (&priv->qos_enabled);
2946
2947       GST_DEBUG_OBJECT (basesink, "rendering object %p", obj);
2948
2949       /* record rendering time for QoS and stats */
2950       if (do_qos)
2951         gst_base_sink_do_render_stats (basesink, TRUE);
2952
2953       if (!OBJ_IS_BUFFERLIST (obj_type)) {
2954         GstBuffer *buf;
2955
2956         /* For buffer lists do not set last buffer. Creating buffer
2957          * with meaningful data can be done only with memcpy which will
2958          * significantly affect performance */
2959         buf = GST_BUFFER_CAST (obj);
2960         gst_base_sink_set_last_buffer (basesink, buf);
2961
2962         ret = bclass->render (basesink, buf);
2963       } else {
2964         GstBufferList *buflist;
2965
2966         buflist = GST_BUFFER_LIST_CAST (obj);
2967
2968         ret = bclass->render_list (basesink, buflist);
2969       }
2970
2971       if (do_qos)
2972         gst_base_sink_do_render_stats (basesink, FALSE);
2973
2974       if (ret == GST_FLOW_STEP)
2975         goto again;
2976
2977       if (G_UNLIKELY (basesink->flushing))
2978         goto flushing;
2979
2980       priv->rendered++;
2981     }
2982   } else if (G_LIKELY (OBJ_IS_EVENT (obj_type))) {
2983     GstEvent *event = GST_EVENT_CAST (obj);
2984     gboolean event_res = TRUE;
2985     GstEventType type;
2986
2987     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2988
2989     type = GST_EVENT_TYPE (event);
2990
2991     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
2992         gst_event_type_get_name (type));
2993
2994     if (bclass->event)
2995       event_res = bclass->event (basesink, event);
2996
2997     /* when we get here we could be flushing again when the event handler calls
2998      * _wait_eos(). We have to ignore this object in that case. */
2999     if (G_UNLIKELY (basesink->flushing))
3000       goto flushing;
3001
3002     if (G_LIKELY (event_res)) {
3003       guint32 seqnum;
3004
3005       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
3006       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
3007
3008       switch (type) {
3009         case GST_EVENT_EOS:
3010         {
3011           GstMessage *message;
3012
3013           /* the EOS event is completely handled so we mark
3014            * ourselves as being in the EOS state. eos is also
3015            * protected by the object lock so we can read it when
3016            * answering the POSITION query. */
3017           GST_OBJECT_LOCK (basesink);
3018           basesink->eos = TRUE;
3019           GST_OBJECT_UNLOCK (basesink);
3020
3021           /* ok, now we can post the message */
3022           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
3023
3024           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
3025           gst_message_set_seqnum (message, seqnum);
3026           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
3027           break;
3028         }
3029         case GST_EVENT_NEWSEGMENT:
3030           /* configure the segment */
3031           gst_base_sink_configure_segment (basesink, pad, event,
3032               &basesink->segment);
3033           break;
3034         case GST_EVENT_SINK_MESSAGE:{
3035           GstMessage *msg = NULL;
3036
3037           gst_event_parse_sink_message (event, &msg);
3038
3039           if (msg)
3040             gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
3041         }
3042         default:
3043           break;
3044       }
3045     }
3046   } else {
3047     g_return_val_if_reached (GST_FLOW_ERROR);
3048   }
3049
3050 done:
3051   if (step_end) {
3052     /* the step ended, check if we need to activate a new step */
3053     GST_DEBUG_OBJECT (basesink, "step ended");
3054     stop_stepping (basesink, &basesink->segment, &priv->current_step,
3055         priv->current_rstart, priv->current_rstop, basesink->eos);
3056     goto again;
3057   }
3058
3059   gst_base_sink_perform_qos (basesink, late);
3060
3061   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
3062   gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3063   return ret;
3064
3065   /* ERRORS */
3066 sync_failed:
3067   {
3068     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
3069     goto done;
3070   }
3071 dropped:
3072   {
3073     priv->dropped++;
3074     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
3075
3076     if (g_atomic_int_get (&priv->qos_enabled)) {
3077       GstMessage *qos_msg;
3078       GstClockTime timestamp, duration;
3079
3080       timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (sync_obj));
3081       duration = GST_BUFFER_DURATION (GST_BUFFER_CAST (sync_obj));
3082
3083       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3084           "qos: dropped buffer rt %" GST_TIME_FORMAT ", st %" GST_TIME_FORMAT
3085           ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT,
3086           GST_TIME_ARGS (priv->current_rstart),
3087           GST_TIME_ARGS (priv->current_sstart), GST_TIME_ARGS (timestamp),
3088           GST_TIME_ARGS (duration));
3089       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3090           "qos: rendered %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
3091           priv->rendered, priv->dropped);
3092
3093       qos_msg =
3094           gst_message_new_qos (GST_OBJECT_CAST (basesink), basesink->sync,
3095           priv->current_rstart, priv->current_sstart, timestamp, duration);
3096       gst_message_set_qos_values (qos_msg, priv->current_jitter, priv->avg_rate,
3097           1000000);
3098       gst_message_set_qos_stats (qos_msg, GST_FORMAT_BUFFERS, priv->rendered,
3099           priv->dropped);
3100       gst_element_post_message (GST_ELEMENT_CAST (basesink), qos_msg);
3101     }
3102     goto done;
3103   }
3104 flushing:
3105   {
3106     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
3107     gst_mini_object_unref (obj);
3108     return GST_FLOW_WRONG_STATE;
3109   }
3110 }
3111
3112 /* with STREAM_LOCK, PREROLL_LOCK
3113  *
3114  * Perform preroll on the given object. For buffers this means
3115  * calling the preroll subclass method.
3116  * If that succeeds, the state will be commited.
3117  *
3118  * function does not take ownership of obj.
3119  */
3120 static GstFlowReturn
3121 gst_base_sink_preroll_object (GstBaseSink * basesink, guint8 obj_type,
3122     GstMiniObject * obj)
3123 {
3124   GstFlowReturn ret;
3125
3126   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
3127
3128   /* if it's a buffer, we need to call the preroll method */
3129   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type) && basesink->priv->call_preroll)) {
3130     GstBaseSinkClass *bclass;
3131     GstBuffer *buf;
3132     GstClockTime timestamp;
3133
3134     if (OBJ_IS_BUFFERLIST (obj_type)) {
3135       buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3136       g_assert (NULL != buf);
3137     } else {
3138       buf = GST_BUFFER_CAST (obj);
3139     }
3140
3141     timestamp = GST_BUFFER_TIMESTAMP (buf);
3142
3143     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
3144         GST_TIME_ARGS (timestamp));
3145
3146     /*
3147      * For buffer lists do not set last buffer. Creating buffer
3148      * with meaningful data can be done only with memcpy which will
3149      * significantly affect performance
3150      */
3151     if (!OBJ_IS_BUFFERLIST (obj_type)) {
3152       gst_base_sink_set_last_buffer (basesink, buf);
3153     }
3154
3155     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3156     if (bclass->preroll)
3157       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
3158         goto preroll_failed;
3159
3160     basesink->priv->call_preroll = FALSE;
3161   }
3162
3163   /* commit state */
3164   if (G_LIKELY (basesink->playing_async)) {
3165     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
3166       goto stopping;
3167   }
3168
3169   return GST_FLOW_OK;
3170
3171   /* ERRORS */
3172 preroll_failed:
3173   {
3174     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
3175     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
3176     return ret;
3177   }
3178 stopping:
3179   {
3180     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
3181     return GST_FLOW_WRONG_STATE;
3182   }
3183 }
3184
3185 /* with STREAM_LOCK, PREROLL_LOCK
3186  *
3187  * Queue an object for rendering.
3188  * The first prerollable object queued will complete the preroll. If the
3189  * preroll queue if filled, we render all the objects in the queue.
3190  *
3191  * This function takes ownership of the object.
3192  */
3193 static GstFlowReturn
3194 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
3195     guint8 obj_type, gpointer obj, gboolean prerollable)
3196 {
3197   GstFlowReturn ret = GST_FLOW_OK;
3198   gint length;
3199   GQueue *q;
3200
3201   if (G_UNLIKELY (basesink->need_preroll)) {
3202     if (G_LIKELY (prerollable))
3203       basesink->preroll_queued++;
3204
3205     length = basesink->preroll_queued;
3206
3207     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
3208
3209     /* first prerollable item needs to finish the preroll */
3210     if (length == 1) {
3211       ret = gst_base_sink_preroll_object (basesink, obj_type, obj);
3212       if (G_UNLIKELY (ret != GST_FLOW_OK))
3213         goto preroll_failed;
3214     }
3215     /* need to recheck if we need preroll, commmit state during preroll
3216      * could have made us not need more preroll. */
3217     if (G_UNLIKELY (basesink->need_preroll)) {
3218       /* see if we can render now, if we can't add the object to the preroll
3219        * queue. */
3220       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
3221         goto more_preroll;
3222     }
3223   }
3224   /* we can start rendering (or blocking) the queued object
3225    * if any. */
3226   q = basesink->preroll_queue;
3227   while (G_UNLIKELY (!g_queue_is_empty (q))) {
3228     GstMiniObject *o;
3229     guint8 ot;
3230
3231     o = g_queue_pop_head (q);
3232     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
3233
3234     ot = get_object_type (o);
3235
3236     /* do something with the return value */
3237     ret = gst_base_sink_render_object (basesink, pad, ot, o);
3238     if (ret != GST_FLOW_OK)
3239       goto dequeue_failed;
3240   }
3241
3242   /* now render the object */
3243   ret = gst_base_sink_render_object (basesink, pad, obj_type, obj);
3244   basesink->preroll_queued = 0;
3245
3246   return ret;
3247
3248   /* special cases */
3249 preroll_failed:
3250   {
3251     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
3252         gst_flow_get_name (ret));
3253     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3254     return ret;
3255   }
3256 more_preroll:
3257   {
3258     /* add object to the queue and return */
3259     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
3260         length, basesink->preroll_queue_max_len);
3261     g_queue_push_tail (basesink->preroll_queue, obj);
3262     return GST_FLOW_OK;
3263   }
3264 dequeue_failed:
3265   {
3266     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
3267         gst_flow_get_name (ret));
3268     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3269     return ret;
3270   }
3271 }
3272
3273 /* with STREAM_LOCK
3274  *
3275  * This function grabs the PREROLL_LOCK and adds the object to
3276  * the queue.
3277  *
3278  * This function takes ownership of obj.
3279  *
3280  * Note: Only GstEvent seem to be passed to this private method
3281  */
3282 static GstFlowReturn
3283 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
3284     GstMiniObject * obj, gboolean prerollable)
3285 {
3286   GstFlowReturn ret;
3287
3288   GST_PAD_PREROLL_LOCK (pad);
3289   if (G_UNLIKELY (basesink->flushing))
3290     goto flushing;
3291
3292   if (G_UNLIKELY (basesink->priv->received_eos))
3293     goto was_eos;
3294
3295   ret =
3296       gst_base_sink_queue_object_unlocked (basesink, pad, _PR_IS_EVENT, obj,
3297       prerollable);
3298   GST_PAD_PREROLL_UNLOCK (pad);
3299
3300   return ret;
3301
3302   /* ERRORS */
3303 flushing:
3304   {
3305     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3306     GST_PAD_PREROLL_UNLOCK (pad);
3307     gst_mini_object_unref (obj);
3308     return GST_FLOW_WRONG_STATE;
3309   }
3310 was_eos:
3311   {
3312     GST_DEBUG_OBJECT (basesink,
3313         "we are EOS, dropping object, return UNEXPECTED");
3314     GST_PAD_PREROLL_UNLOCK (pad);
3315     gst_mini_object_unref (obj);
3316     return GST_FLOW_UNEXPECTED;
3317   }
3318 }
3319
3320 static void
3321 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
3322 {
3323   /* make sure we are not blocked on the clock also clear any pending
3324    * eos state. */
3325   gst_base_sink_set_flushing (basesink, pad, TRUE);
3326
3327   /* we grab the stream lock but that is not needed since setting the
3328    * sink to flushing would make sure no state commit is being done
3329    * anymore */
3330   GST_PAD_STREAM_LOCK (pad);
3331   gst_base_sink_reset_qos (basesink);
3332   /* and we need to commit our state again on the next
3333    * prerolled buffer */
3334   basesink->playing_async = TRUE;
3335   if (basesink->priv->async_enabled) {
3336     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
3337   } else {
3338     basesink->priv->have_latency = TRUE;
3339   }
3340   gst_base_sink_set_last_buffer (basesink, NULL);
3341   GST_PAD_STREAM_UNLOCK (pad);
3342 }
3343
3344 static void
3345 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
3346 {
3347   /* unset flushing so we can accept new data, this also flushes out any EOS
3348    * event. */
3349   gst_base_sink_set_flushing (basesink, pad, FALSE);
3350
3351   /* for position reporting */
3352   GST_OBJECT_LOCK (basesink);
3353   basesink->priv->current_sstart = GST_CLOCK_TIME_NONE;
3354   basesink->priv->current_sstop = GST_CLOCK_TIME_NONE;
3355   basesink->priv->eos_rtime = GST_CLOCK_TIME_NONE;
3356   basesink->priv->call_preroll = TRUE;
3357   basesink->priv->current_step.valid = FALSE;
3358   basesink->priv->pending_step.valid = FALSE;
3359   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
3360     /* we need new segment info after the flush. */
3361     basesink->have_newsegment = FALSE;
3362     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3363     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
3364   }
3365   GST_OBJECT_UNLOCK (basesink);
3366 }
3367
3368 static gboolean
3369 gst_base_sink_event (GstPad * pad, GstEvent * event)
3370 {
3371   GstBaseSink *basesink;
3372   gboolean result = TRUE;
3373   GstBaseSinkClass *bclass;
3374
3375   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3376
3377   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3378
3379   GST_DEBUG_OBJECT (basesink, "received event %p %" GST_PTR_FORMAT, event,
3380       event);
3381
3382   switch (GST_EVENT_TYPE (event)) {
3383     case GST_EVENT_EOS:
3384     {
3385       GstFlowReturn ret;
3386
3387       GST_PAD_PREROLL_LOCK (pad);
3388       if (G_UNLIKELY (basesink->flushing))
3389         goto flushing;
3390
3391       if (G_UNLIKELY (basesink->priv->received_eos)) {
3392         /* we can't accept anything when we are EOS */
3393         result = FALSE;
3394         gst_event_unref (event);
3395       } else {
3396         /* we set the received EOS flag here so that we can use it when testing if
3397          * we are prerolled and to refuse more buffers. */
3398         basesink->priv->received_eos = TRUE;
3399
3400         /* EOS is a prerollable object, we call the unlocked version because it
3401          * does not check the received_eos flag. */
3402         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
3403             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), TRUE);
3404         if (G_UNLIKELY (ret != GST_FLOW_OK))
3405           result = FALSE;
3406       }
3407       GST_PAD_PREROLL_UNLOCK (pad);
3408       break;
3409     }
3410     case GST_EVENT_NEWSEGMENT:
3411     {
3412       GstFlowReturn ret;
3413       gboolean update;
3414
3415       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
3416
3417       GST_PAD_PREROLL_LOCK (pad);
3418       if (G_UNLIKELY (basesink->flushing))
3419         goto flushing;
3420
3421       gst_event_parse_new_segment_full (event, &update, NULL, NULL, NULL, NULL,
3422           NULL, NULL);
3423
3424       if (G_UNLIKELY (basesink->priv->received_eos && !update)) {
3425         /* we can't accept anything when we are EOS */
3426         result = FALSE;
3427         gst_event_unref (event);
3428       } else {
3429         /* the new segment is a non prerollable item and does not block anything,
3430          * we need to configure the current clipping segment and insert the event
3431          * in the queue to serialize it with the buffers for rendering. */
3432         gst_base_sink_configure_segment (basesink, pad, event,
3433             basesink->abidata.ABI.clip_segment);
3434
3435         ret =
3436             gst_base_sink_queue_object_unlocked (basesink, pad,
3437             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), FALSE);
3438         if (G_UNLIKELY (ret != GST_FLOW_OK))
3439           result = FALSE;
3440         else {
3441           GST_OBJECT_LOCK (basesink);
3442           basesink->have_newsegment = TRUE;
3443           GST_OBJECT_UNLOCK (basesink);
3444         }
3445       }
3446       GST_PAD_PREROLL_UNLOCK (pad);
3447       break;
3448     }
3449     case GST_EVENT_FLUSH_START:
3450       if (bclass->event)
3451         bclass->event (basesink, event);
3452
3453       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3454
3455       gst_base_sink_flush_start (basesink, pad);
3456
3457       gst_event_unref (event);
3458       break;
3459     case GST_EVENT_FLUSH_STOP:
3460       if (bclass->event)
3461         bclass->event (basesink, event);
3462
3463       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3464
3465       gst_base_sink_flush_stop (basesink, pad);
3466
3467       gst_event_unref (event);
3468       break;
3469     default:
3470       /* other events are sent to queue or subclass depending on if they
3471        * are serialized. */
3472       if (GST_EVENT_IS_SERIALIZED (event)) {
3473         gst_base_sink_queue_object (basesink, pad,
3474             GST_MINI_OBJECT_CAST (event), FALSE);
3475       } else {
3476         if (bclass->event)
3477           bclass->event (basesink, event);
3478         gst_event_unref (event);
3479       }
3480       break;
3481   }
3482 done:
3483   gst_object_unref (basesink);
3484
3485   return result;
3486
3487   /* ERRORS */
3488 flushing:
3489   {
3490     GST_DEBUG_OBJECT (basesink, "we are flushing");
3491     GST_PAD_PREROLL_UNLOCK (pad);
3492     result = FALSE;
3493     gst_event_unref (event);
3494     goto done;
3495   }
3496 }
3497
3498 /* default implementation to calculate the start and end
3499  * timestamps on a buffer, subclasses can override
3500  */
3501 static void
3502 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3503     GstClockTime * start, GstClockTime * end)
3504 {
3505   GstClockTime timestamp, duration;
3506
3507   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3508   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3509
3510     /* get duration to calculate end time */
3511     duration = GST_BUFFER_DURATION (buffer);
3512     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3513       *end = timestamp + duration;
3514     }
3515     *start = timestamp;
3516   }
3517 }
3518
3519 /* must be called with PREROLL_LOCK */
3520 static gboolean
3521 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3522 {
3523   gboolean is_prerolled, res;
3524
3525   /* we have 2 cases where the PREROLL_LOCK is released:
3526    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3527    *  2) we are syncing on the clock
3528    */
3529   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3530   res = !is_prerolled;
3531
3532   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3533       basesink->have_preroll, basesink->priv->received_eos, res);
3534
3535   return res;
3536 }
3537
3538 /* with STREAM_LOCK, PREROLL_LOCK
3539  *
3540  * Takes a buffer and compare the timestamps with the last segment.
3541  * If the buffer falls outside of the segment boundaries, drop it.
3542  * Else queue the buffer for preroll and rendering.
3543  *
3544  * This function takes ownership of the buffer.
3545  */
3546 static GstFlowReturn
3547 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3548     guint8 obj_type, gpointer obj)
3549 {
3550   GstBaseSinkClass *bclass;
3551   GstFlowReturn result;
3552   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3553   GstSegment *clip_segment;
3554   GstBuffer *time_buf;
3555
3556   if (G_UNLIKELY (basesink->flushing))
3557     goto flushing;
3558
3559   if (G_UNLIKELY (basesink->priv->received_eos))
3560     goto was_eos;
3561
3562   if (OBJ_IS_BUFFERLIST (obj_type)) {
3563     time_buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3564     g_assert (NULL != time_buf);
3565   } else {
3566     time_buf = GST_BUFFER_CAST (obj);
3567   }
3568
3569   /* for code clarity */
3570   clip_segment = basesink->abidata.ABI.clip_segment;
3571
3572   if (G_UNLIKELY (!basesink->have_newsegment)) {
3573     gboolean sync;
3574
3575     sync = gst_base_sink_get_sync (basesink);
3576     if (sync) {
3577       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3578           (_("Internal data flow problem.")),
3579           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3580     }
3581
3582     /* this means this sink will assume timestamps start from 0 */
3583     GST_OBJECT_LOCK (basesink);
3584     clip_segment->start = 0;
3585     clip_segment->stop = -1;
3586     basesink->segment.start = 0;
3587     basesink->segment.stop = -1;
3588     basesink->have_newsegment = TRUE;
3589     GST_OBJECT_UNLOCK (basesink);
3590   }
3591
3592   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3593
3594   /* check if the buffer needs to be dropped, we first ask the subclass for the
3595    * start and end */
3596   if (bclass->get_times)
3597     bclass->get_times (basesink, time_buf, &start, &end);
3598
3599   if (!GST_CLOCK_TIME_IS_VALID (start)) {
3600     /* if the subclass does not want sync, we use our own values so that we at
3601      * least clip the buffer to the segment */
3602     gst_base_sink_get_times (basesink, time_buf, &start, &end);
3603   }
3604
3605   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3606       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3607
3608   /* a dropped buffer does not participate in anything */
3609   if (GST_CLOCK_TIME_IS_VALID (start) &&
3610       (clip_segment->format == GST_FORMAT_TIME)) {
3611     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3612                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3613       goto out_of_segment;
3614   }
3615
3616   /* now we can process the buffer in the queue, this function takes ownership
3617    * of the buffer */
3618   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3619       obj_type, obj, TRUE);
3620   return result;
3621
3622   /* ERRORS */
3623 flushing:
3624   {
3625     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3626     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3627     return GST_FLOW_WRONG_STATE;
3628   }
3629 was_eos:
3630   {
3631     GST_DEBUG_OBJECT (basesink,
3632         "we are EOS, dropping object, return UNEXPECTED");
3633     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3634     return GST_FLOW_UNEXPECTED;
3635   }
3636 out_of_segment:
3637   {
3638     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3639     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3640     return GST_FLOW_OK;
3641   }
3642 }
3643
3644 /* with STREAM_LOCK
3645  */
3646 static GstFlowReturn
3647 gst_base_sink_chain_main (GstBaseSink * basesink, GstPad * pad,
3648     guint8 obj_type, gpointer obj)
3649 {
3650   GstFlowReturn result;
3651
3652   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3653     goto wrong_mode;
3654
3655   GST_PAD_PREROLL_LOCK (pad);
3656   result = gst_base_sink_chain_unlocked (basesink, pad, obj_type, obj);
3657   GST_PAD_PREROLL_UNLOCK (pad);
3658
3659 done:
3660   return result;
3661
3662   /* ERRORS */
3663 wrong_mode:
3664   {
3665     GST_OBJECT_LOCK (pad);
3666     GST_WARNING_OBJECT (basesink,
3667         "Push on pad %s:%s, but it was not activated in push mode",
3668         GST_DEBUG_PAD_NAME (pad));
3669     GST_OBJECT_UNLOCK (pad);
3670     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3671     /* we don't post an error message this will signal to the peer
3672      * pushing that EOS is reached. */
3673     result = GST_FLOW_UNEXPECTED;
3674     goto done;
3675   }
3676 }
3677
3678 static GstFlowReturn
3679 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3680 {
3681   GstBaseSink *basesink;
3682
3683   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3684
3685   return gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, buf);
3686 }
3687
3688 static GstFlowReturn
3689 gst_base_sink_chain_list (GstPad * pad, GstBufferList * list)
3690 {
3691   GstBaseSink *basesink;
3692   GstBaseSinkClass *bclass;
3693   GstFlowReturn result;
3694
3695   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3696   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3697
3698   if (G_LIKELY (bclass->render_list)) {
3699     result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFERLIST, list);
3700   } else {
3701     GstBufferListIterator *it;
3702     GstBuffer *group;
3703
3704     GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
3705
3706     it = gst_buffer_list_iterate (list);
3707
3708     if (gst_buffer_list_iterator_next_group (it)) {
3709       do {
3710         group = gst_buffer_list_iterator_merge_group (it);
3711         if (group == NULL) {
3712           group = gst_buffer_new ();
3713           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3714         } else {
3715           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining group");
3716         }
3717         result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, group);
3718       } while (result == GST_FLOW_OK
3719           && gst_buffer_list_iterator_next_group (it));
3720     } else {
3721       GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3722       result =
3723           gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER,
3724           gst_buffer_new ());
3725     }
3726     gst_buffer_list_iterator_free (it);
3727     gst_buffer_list_unref (list);
3728   }
3729   return result;
3730 }
3731
3732
3733 static gboolean
3734 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3735 {
3736   gboolean res = TRUE;
3737
3738   /* update our offset if the start/stop position was updated */
3739   if (segment->format == GST_FORMAT_BYTES) {
3740     segment->time = segment->start;
3741   } else if (segment->start == 0) {
3742     /* seek to start, we can implement a default for this. */
3743     segment->time = 0;
3744   } else {
3745     res = FALSE;
3746     GST_INFO_OBJECT (sink, "Can't do a default seek");
3747   }
3748
3749   return res;
3750 }
3751
3752 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3753
3754 static gboolean
3755 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3756     GstEvent * event, GstSegment * segment)
3757 {
3758   /* By default, we try one of 2 things:
3759    *   - For absolute seek positions, convert the requested position to our
3760    *     configured processing format and place it in the output segment \
3761    *   - For relative seek positions, convert our current (input) values to the
3762    *     seek format, adjust by the relative seek offset and then convert back to
3763    *     the processing format
3764    */
3765   GstSeekType cur_type, stop_type;
3766   gint64 cur, stop;
3767   GstSeekFlags flags;
3768   GstFormat seek_format, dest_format;
3769   gdouble rate;
3770   gboolean update;
3771   gboolean res = TRUE;
3772
3773   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3774       &cur_type, &cur, &stop_type, &stop);
3775   dest_format = segment->format;
3776
3777   if (seek_format == dest_format) {
3778     gst_segment_set_seek (segment, rate, seek_format, flags,
3779         cur_type, cur, stop_type, stop, &update);
3780     return TRUE;
3781   }
3782
3783   if (cur_type != GST_SEEK_TYPE_NONE) {
3784     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3785     res =
3786         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3787         &cur);
3788     cur_type = GST_SEEK_TYPE_SET;
3789   }
3790
3791   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3792     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3793     res =
3794         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3795         &stop);
3796     stop_type = GST_SEEK_TYPE_SET;
3797   }
3798
3799   /* And finally, configure our output segment in the desired format */
3800   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3801       stop_type, stop, &update);
3802
3803   if (!res)
3804     goto no_format;
3805
3806   return res;
3807
3808 no_format:
3809   {
3810     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3811     return FALSE;
3812   }
3813 }
3814
3815 /* perform a seek, only executed in pull mode */
3816 static gboolean
3817 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3818 {
3819   gboolean flush;
3820   gdouble rate;
3821   GstFormat seek_format, dest_format;
3822   GstSeekFlags flags;
3823   GstSeekType cur_type, stop_type;
3824   gboolean seekseg_configured = FALSE;
3825   gint64 cur, stop;
3826   gboolean update, res = TRUE;
3827   GstSegment seeksegment;
3828
3829   dest_format = sink->segment.format;
3830
3831   if (event) {
3832     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3833     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3834         &cur_type, &cur, &stop_type, &stop);
3835
3836     flush = flags & GST_SEEK_FLAG_FLUSH;
3837   } else {
3838     GST_DEBUG_OBJECT (sink, "performing seek without event");
3839     flush = FALSE;
3840   }
3841
3842   if (flush) {
3843     GST_DEBUG_OBJECT (sink, "flushing upstream");
3844     gst_pad_push_event (pad, gst_event_new_flush_start ());
3845     gst_base_sink_flush_start (sink, pad);
3846   } else {
3847     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3848   }
3849
3850   GST_PAD_STREAM_LOCK (pad);
3851
3852   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3853    * copy the current segment info into the temp segment that we can actually
3854    * attempt the seek with. We only update the real segment if the seek suceeds. */
3855   if (!seekseg_configured) {
3856     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3857
3858     /* now configure the final seek segment */
3859     if (event) {
3860       if (sink->segment.format != seek_format) {
3861         /* OK, here's where we give the subclass a chance to convert the relative
3862          * seek into an absolute one in the processing format. We set up any
3863          * absolute seek above, before taking the stream lock. */
3864         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3865                 &seeksegment)) {
3866           GST_DEBUG_OBJECT (sink,
3867               "Preparing the seek failed after flushing. " "Aborting seek");
3868           res = FALSE;
3869         }
3870       } else {
3871         /* The seek format matches our processing format, no need to ask the
3872          * the subclass to configure the segment. */
3873         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3874             cur_type, cur, stop_type, stop, &update);
3875       }
3876     }
3877     /* Else, no seek event passed, so we're just (re)starting the
3878        current segment. */
3879   }
3880
3881   if (res) {
3882     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3883         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3884         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3885
3886     /* do the seek, segment.last_stop contains the new position. */
3887     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3888   }
3889
3890
3891   if (flush) {
3892     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3893     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3894     gst_base_sink_flush_stop (sink, pad);
3895   } else if (res && sink->abidata.ABI.running) {
3896     /* we are running the current segment and doing a non-flushing seek,
3897      * close the segment first based on the last_stop. */
3898     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3899         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3900   }
3901
3902   /* The subclass must have converted the segment to the processing format
3903    * by now */
3904   if (res && seeksegment.format != dest_format) {
3905     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3906         "in the correct format. Aborting seek.");
3907     res = FALSE;
3908   }
3909
3910   /* if successfull seek, we update our real segment and push
3911    * out the new segment. */
3912   if (res) {
3913     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3914
3915     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3916       gst_element_post_message (GST_ELEMENT (sink),
3917           gst_message_new_segment_start (GST_OBJECT (sink),
3918               sink->segment.format, sink->segment.last_stop));
3919     }
3920   }
3921
3922   sink->priv->discont = TRUE;
3923   sink->abidata.ABI.running = TRUE;
3924
3925   GST_PAD_STREAM_UNLOCK (pad);
3926
3927   return res;
3928 }
3929
3930 static void
3931 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
3932     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
3933     gboolean flush, gboolean intermediate)
3934 {
3935   GST_OBJECT_LOCK (sink);
3936   pending->seqnum = seqnum;
3937   pending->format = format;
3938   pending->amount = amount;
3939   pending->position = 0;
3940   pending->rate = rate;
3941   pending->flush = flush;
3942   pending->intermediate = intermediate;
3943   pending->valid = TRUE;
3944   /* flush invalidates the current stepping segment */
3945   if (flush)
3946     current->valid = FALSE;
3947   GST_OBJECT_UNLOCK (sink);
3948 }
3949
3950 static gboolean
3951 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3952 {
3953   GstBaseSinkPrivate *priv;
3954   GstBaseSinkClass *bclass;
3955   gboolean flush, intermediate;
3956   gdouble rate;
3957   GstFormat format;
3958   guint64 amount;
3959   guint seqnum;
3960   GstStepInfo *pending, *current;
3961   GstMessage *message;
3962
3963   bclass = GST_BASE_SINK_GET_CLASS (sink);
3964   priv = sink->priv;
3965
3966   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
3967
3968   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
3969   seqnum = gst_event_get_seqnum (event);
3970
3971   pending = &priv->pending_step;
3972   current = &priv->current_step;
3973
3974   /* post message first */
3975   message = gst_message_new_step_start (GST_OBJECT (sink), FALSE, format,
3976       amount, rate, flush, intermediate);
3977   gst_message_set_seqnum (message, seqnum);
3978   gst_element_post_message (GST_ELEMENT (sink), message);
3979
3980   if (flush) {
3981     /* we need to call ::unlock before locking PREROLL_LOCK
3982      * since we lock it before going into ::render */
3983     if (bclass->unlock)
3984       bclass->unlock (sink);
3985
3986     GST_PAD_PREROLL_LOCK (sink->sinkpad);
3987     /* now that we have the PREROLL lock, clear our unlock request */
3988     if (bclass->unlock_stop)
3989       bclass->unlock_stop (sink);
3990
3991     /* update the stepinfo and make it valid */
3992     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
3993         intermediate);
3994
3995     if (sink->priv->async_enabled) {
3996       /* and we need to commit our state again on the next
3997        * prerolled buffer */
3998       sink->playing_async = TRUE;
3999       priv->pending_step.need_preroll = TRUE;
4000       sink->need_preroll = FALSE;
4001       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
4002     } else {
4003       sink->priv->have_latency = TRUE;
4004       sink->need_preroll = FALSE;
4005     }
4006     priv->current_sstart = GST_CLOCK_TIME_NONE;
4007     priv->current_sstop = GST_CLOCK_TIME_NONE;
4008     priv->eos_rtime = GST_CLOCK_TIME_NONE;
4009     priv->call_preroll = TRUE;
4010     gst_base_sink_set_last_buffer (sink, NULL);
4011     gst_base_sink_reset_qos (sink);
4012
4013     if (sink->clock_id) {
4014       gst_clock_id_unschedule (sink->clock_id);
4015     }
4016
4017     if (sink->have_preroll) {
4018       GST_DEBUG_OBJECT (sink, "signal waiter");
4019       priv->step_unlock = TRUE;
4020       GST_PAD_PREROLL_SIGNAL (sink->sinkpad);
4021     }
4022     GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
4023   } else {
4024     /* update the stepinfo and make it valid */
4025     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4026         intermediate);
4027   }
4028
4029   return TRUE;
4030 }
4031
4032 /* with STREAM_LOCK
4033  */
4034 static void
4035 gst_base_sink_loop (GstPad * pad)
4036 {
4037   GstBaseSink *basesink;
4038   GstBuffer *buf = NULL;
4039   GstFlowReturn result;
4040   guint blocksize;
4041   guint64 offset;
4042
4043   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
4044
4045   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
4046
4047   if ((blocksize = basesink->priv->blocksize) == 0)
4048     blocksize = -1;
4049
4050   offset = basesink->segment.last_stop;
4051
4052   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
4053       offset, blocksize);
4054
4055   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
4056   if (G_UNLIKELY (result != GST_FLOW_OK))
4057     goto paused;
4058
4059   if (G_UNLIKELY (buf == NULL))
4060     goto no_buffer;
4061
4062   offset += GST_BUFFER_SIZE (buf);
4063
4064   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
4065
4066   GST_PAD_PREROLL_LOCK (pad);
4067   result = gst_base_sink_chain_unlocked (basesink, pad, _PR_IS_BUFFER, buf);
4068   GST_PAD_PREROLL_UNLOCK (pad);
4069   if (G_UNLIKELY (result != GST_FLOW_OK))
4070     goto paused;
4071
4072   return;
4073
4074   /* ERRORS */
4075 paused:
4076   {
4077     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
4078         gst_flow_get_name (result));
4079     gst_pad_pause_task (pad);
4080     if (result == GST_FLOW_UNEXPECTED) {
4081       /* perform EOS logic */
4082       if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
4083         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4084             gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
4085                 basesink->segment.format, basesink->segment.last_stop));
4086       } else {
4087         gst_base_sink_event (pad, gst_event_new_eos ());
4088       }
4089     } else if (result == GST_FLOW_NOT_LINKED || result <= GST_FLOW_UNEXPECTED) {
4090       /* for fatal errors we post an error message, post the error
4091        * first so the app knows about the error first. 
4092        * wrong-state is not a fatal error because it happens due to
4093        * flushing and posting an error message in that case is the
4094        * wrong thing to do, e.g. when basesrc is doing a flushing
4095        * seek. */
4096       GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4097           (_("Internal data stream error.")),
4098           ("stream stopped, reason %s", gst_flow_get_name (result)));
4099       gst_base_sink_event (pad, gst_event_new_eos ());
4100     }
4101     return;
4102   }
4103 no_buffer:
4104   {
4105     GST_LOG_OBJECT (basesink, "no buffer, pausing");
4106     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4107         (_("Internal data flow error.")), ("element returned NULL buffer"));
4108     result = GST_FLOW_ERROR;
4109     goto paused;
4110   }
4111 }
4112
4113 static gboolean
4114 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
4115     gboolean flushing)
4116 {
4117   GstBaseSinkClass *bclass;
4118
4119   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4120
4121   if (flushing) {
4122     /* unlock any subclasses, we need to do this before grabbing the
4123      * PREROLL_LOCK since we hold this lock before going into ::render. */
4124     if (bclass->unlock)
4125       bclass->unlock (basesink);
4126   }
4127
4128   GST_PAD_PREROLL_LOCK (pad);
4129   basesink->flushing = flushing;
4130   if (flushing) {
4131     /* step 1, now that we have the PREROLL lock, clear our unlock request */
4132     if (bclass->unlock_stop)
4133       bclass->unlock_stop (basesink);
4134
4135     /* set need_preroll before we unblock the clock. If the clock is unblocked
4136      * before timing out, we can reuse the buffer for preroll. */
4137     basesink->need_preroll = TRUE;
4138
4139     /* step 2, unblock clock sync (if any) or any other blocking thing */
4140     if (basesink->clock_id) {
4141       gst_clock_id_unschedule (basesink->clock_id);
4142     }
4143
4144     /* flush out the data thread if it's locked in finish_preroll, this will
4145      * also flush out the EOS state */
4146     GST_DEBUG_OBJECT (basesink,
4147         "flushing out data thread, need preroll to TRUE");
4148     gst_base_sink_preroll_queue_flush (basesink, pad);
4149   }
4150   GST_PAD_PREROLL_UNLOCK (pad);
4151
4152   return TRUE;
4153 }
4154
4155 static gboolean
4156 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
4157 {
4158   gboolean result;
4159
4160   if (active) {
4161     /* start task */
4162     result = gst_pad_start_task (basesink->sinkpad,
4163         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
4164   } else {
4165     /* step 2, make sure streaming finishes */
4166     result = gst_pad_stop_task (basesink->sinkpad);
4167   }
4168
4169   return result;
4170 }
4171
4172 static gboolean
4173 gst_base_sink_pad_activate (GstPad * pad)
4174 {
4175   gboolean result = FALSE;
4176   GstBaseSink *basesink;
4177
4178   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4179
4180   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
4181
4182   gst_base_sink_set_flushing (basesink, pad, FALSE);
4183
4184   /* we need to have the pull mode enabled */
4185   if (!basesink->can_activate_pull) {
4186     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
4187     goto fallback;
4188   }
4189
4190   /* check if downstreams supports pull mode at all */
4191   if (!gst_pad_check_pull_range (pad)) {
4192     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
4193     goto fallback;
4194   }
4195
4196   /* set the pad mode before starting the task so that it's in the
4197    * correct state for the new thread. also the sink set_caps and get_caps
4198    * function checks this */
4199   basesink->pad_mode = GST_ACTIVATE_PULL;
4200
4201   /* we first try to negotiate a format so that when we try to activate
4202    * downstream, it knows about our format */
4203   if (!gst_base_sink_negotiate_pull (basesink)) {
4204     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
4205     goto fallback;
4206   }
4207
4208   /* ok activate now */
4209   if (!gst_pad_activate_pull (pad, TRUE)) {
4210     /* clear any pending caps */
4211     GST_OBJECT_LOCK (basesink);
4212     gst_caps_replace (&basesink->priv->pull_caps, NULL);
4213     GST_OBJECT_UNLOCK (basesink);
4214     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
4215     goto fallback;
4216   }
4217
4218   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
4219   result = TRUE;
4220   goto done;
4221
4222   /* push mode fallback */
4223 fallback:
4224   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
4225   if ((result = gst_pad_activate_push (pad, TRUE))) {
4226     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
4227   }
4228
4229 done:
4230   if (!result) {
4231     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
4232     gst_base_sink_set_flushing (basesink, pad, TRUE);
4233   }
4234
4235   gst_object_unref (basesink);
4236
4237   return result;
4238 }
4239
4240 static gboolean
4241 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
4242 {
4243   gboolean result;
4244   GstBaseSink *basesink;
4245
4246   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4247
4248   if (active) {
4249     if (!basesink->can_activate_push) {
4250       result = FALSE;
4251       basesink->pad_mode = GST_ACTIVATE_NONE;
4252     } else {
4253       result = TRUE;
4254       basesink->pad_mode = GST_ACTIVATE_PUSH;
4255     }
4256   } else {
4257     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
4258       g_warning ("Internal GStreamer activation error!!!");
4259       result = FALSE;
4260     } else {
4261       gst_base_sink_set_flushing (basesink, pad, TRUE);
4262       result = TRUE;
4263       basesink->pad_mode = GST_ACTIVATE_NONE;
4264     }
4265   }
4266
4267   gst_object_unref (basesink);
4268
4269   return result;
4270 }
4271
4272 static gboolean
4273 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
4274 {
4275   GstCaps *caps;
4276   gboolean result;
4277
4278   result = FALSE;
4279
4280   /* this returns the intersection between our caps and the peer caps. If there
4281    * is no peer, it returns NULL and we can't operate in pull mode so we can
4282    * fail the negotiation. */
4283   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
4284   if (caps == NULL || gst_caps_is_empty (caps))
4285     goto no_caps_possible;
4286
4287   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
4288
4289   caps = gst_caps_make_writable (caps);
4290   /* get the first (prefered) format */
4291   gst_caps_truncate (caps);
4292   /* try to fixate */
4293   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
4294
4295   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
4296
4297   if (gst_caps_is_any (caps)) {
4298     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
4299         "allowing pull()");
4300     /* neither side has template caps in this case, so they are prepared for
4301        pull() without setcaps() */
4302     result = TRUE;
4303   } else if (gst_caps_is_fixed (caps)) {
4304     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
4305       goto could_not_set_caps;
4306
4307     GST_OBJECT_LOCK (basesink);
4308     gst_caps_replace (&basesink->priv->pull_caps, caps);
4309     GST_OBJECT_UNLOCK (basesink);
4310
4311     result = TRUE;
4312   }
4313
4314   gst_caps_unref (caps);
4315
4316   return result;
4317
4318 no_caps_possible:
4319   {
4320     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
4321     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
4322     if (caps)
4323       gst_caps_unref (caps);
4324     return FALSE;
4325   }
4326 could_not_set_caps:
4327   {
4328     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
4329     gst_caps_unref (caps);
4330     return FALSE;
4331   }
4332 }
4333
4334 /* this won't get called until we implement an activate function */
4335 static gboolean
4336 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
4337 {
4338   gboolean result = FALSE;
4339   GstBaseSink *basesink;
4340   GstBaseSinkClass *bclass;
4341
4342   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4343   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4344
4345   if (active) {
4346     GstFormat format;
4347     gint64 duration;
4348
4349     /* we mark we have a newsegment here because pull based
4350      * mode works just fine without having a newsegment before the
4351      * first buffer */
4352     format = GST_FORMAT_BYTES;
4353
4354     gst_segment_init (&basesink->segment, format);
4355     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
4356     GST_OBJECT_LOCK (basesink);
4357     basesink->have_newsegment = TRUE;
4358     GST_OBJECT_UNLOCK (basesink);
4359
4360     /* get the peer duration in bytes */
4361     result = gst_pad_query_peer_duration (pad, &format, &duration);
4362     if (result) {
4363       GST_DEBUG_OBJECT (basesink,
4364           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
4365       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
4366           duration);
4367       gst_segment_set_duration (&basesink->segment, format, duration);
4368     } else {
4369       GST_DEBUG_OBJECT (basesink, "unknown duration");
4370     }
4371
4372     if (bclass->activate_pull)
4373       result = bclass->activate_pull (basesink, TRUE);
4374     else
4375       result = FALSE;
4376
4377     if (!result)
4378       goto activate_failed;
4379
4380   } else {
4381     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
4382       g_warning ("Internal GStreamer activation error!!!");
4383       result = FALSE;
4384     } else {
4385       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
4386       if (bclass->activate_pull)
4387         result &= bclass->activate_pull (basesink, FALSE);
4388       basesink->pad_mode = GST_ACTIVATE_NONE;
4389       /* clear any pending caps */
4390       GST_OBJECT_LOCK (basesink);
4391       gst_caps_replace (&basesink->priv->pull_caps, NULL);
4392       GST_OBJECT_UNLOCK (basesink);
4393     }
4394   }
4395   gst_object_unref (basesink);
4396
4397   return result;
4398
4399   /* ERRORS */
4400 activate_failed:
4401   {
4402     /* reset, as starting the thread failed */
4403     basesink->pad_mode = GST_ACTIVATE_NONE;
4404
4405     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
4406     return FALSE;
4407   }
4408 }
4409
4410 /* send an event to our sinkpad peer. */
4411 static gboolean
4412 gst_base_sink_send_event (GstElement * element, GstEvent * event)
4413 {
4414   GstPad *pad;
4415   GstBaseSink *basesink = GST_BASE_SINK (element);
4416   gboolean forward, result = TRUE;
4417   GstActivateMode mode;
4418
4419   GST_OBJECT_LOCK (element);
4420   /* get the pad and the scheduling mode */
4421   pad = gst_object_ref (basesink->sinkpad);
4422   mode = basesink->pad_mode;
4423   GST_OBJECT_UNLOCK (element);
4424
4425   /* only push UPSTREAM events upstream */
4426   forward = GST_EVENT_IS_UPSTREAM (event);
4427
4428   GST_DEBUG_OBJECT (basesink, "handling event %p %" GST_PTR_FORMAT, event,
4429       event);
4430
4431   switch (GST_EVENT_TYPE (event)) {
4432     case GST_EVENT_LATENCY:
4433     {
4434       GstClockTime latency;
4435
4436       gst_event_parse_latency (event, &latency);
4437
4438       /* store the latency. We use this to adjust the running_time before syncing
4439        * it to the clock. */
4440       GST_OBJECT_LOCK (element);
4441       basesink->priv->latency = latency;
4442       if (!basesink->priv->have_latency)
4443         forward = FALSE;
4444       GST_OBJECT_UNLOCK (element);
4445       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
4446           GST_TIME_ARGS (latency));
4447
4448       /* We forward this event so that all elements know about the global pipeline
4449        * latency. This is interesting for an element when it wants to figure out
4450        * when a particular piece of data will be rendered. */
4451       break;
4452     }
4453     case GST_EVENT_SEEK:
4454       /* in pull mode we will execute the seek */
4455       if (mode == GST_ACTIVATE_PULL)
4456         result = gst_base_sink_perform_seek (basesink, pad, event);
4457       break;
4458     case GST_EVENT_STEP:
4459       result = gst_base_sink_perform_step (basesink, pad, event);
4460       forward = FALSE;
4461       break;
4462     default:
4463       break;
4464   }
4465
4466   if (forward) {
4467     result = gst_pad_push_event (pad, event);
4468   } else {
4469     /* not forwarded, unref the event */
4470     gst_event_unref (event);
4471   }
4472
4473   gst_object_unref (pad);
4474   return result;
4475 }
4476
4477 static gboolean
4478 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4479     gint64 * cur, gboolean * upstream)
4480 {
4481   GstClock *clock = NULL;
4482   gboolean res = FALSE;
4483   GstFormat oformat, tformat;
4484   GstSegment *segment;
4485   GstClockTime now, latency;
4486   GstClockTimeDiff base;
4487   gint64 time, accum, duration;
4488   gdouble rate;
4489   gint64 last;
4490   gboolean last_seen, with_clock, in_paused;
4491
4492   GST_OBJECT_LOCK (basesink);
4493   /* we can only get the segment when we are not NULL or READY */
4494   if (!basesink->have_newsegment)
4495     goto wrong_state;
4496
4497   in_paused = FALSE;
4498   /* when not in PLAYING or when we're busy with a state change, we
4499    * cannot read from the clock so we report time based on the
4500    * last seen timestamp. */
4501   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4502       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING) {
4503     in_paused = TRUE;
4504   }
4505
4506   /* we don't use the clip segment in pull mode, when seeking we update the
4507    * main segment directly with the new segment values without it having to be
4508    * activated by the rendering after preroll */
4509   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4510     segment = basesink->abidata.ABI.clip_segment;
4511   else
4512     segment = &basesink->segment;
4513
4514   /* our intermediate time format */
4515   tformat = GST_FORMAT_TIME;
4516   /* get the format in the segment */
4517   oformat = segment->format;
4518
4519   /* report with last seen position when EOS */
4520   last_seen = basesink->eos;
4521
4522   /* assume we will use the clock for getting the current position */
4523   with_clock = TRUE;
4524   if (basesink->sync == FALSE)
4525     with_clock = FALSE;
4526
4527   /* and we need a clock */
4528   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4529     with_clock = FALSE;
4530   else
4531     gst_object_ref (clock);
4532
4533   /* collect all data we need holding the lock */
4534   if (GST_CLOCK_TIME_IS_VALID (segment->time))
4535     time = segment->time;
4536   else
4537     time = 0;
4538
4539   if (GST_CLOCK_TIME_IS_VALID (segment->stop))
4540     duration = segment->stop - segment->start;
4541   else
4542     duration = 0;
4543
4544   accum = segment->accum;
4545   rate = segment->rate * segment->applied_rate;
4546   latency = basesink->priv->latency;
4547
4548   if (oformat == GST_FORMAT_TIME) {
4549     gint64 start, stop;
4550
4551     start = basesink->priv->current_sstart;
4552     stop = basesink->priv->current_sstop;
4553
4554     if (in_paused) {
4555       /* in paused we use the last position as a lower bound */
4556       if (stop == -1 || segment->rate > 0.0)
4557         last = start;
4558       else
4559         last = stop;
4560     } else {
4561       /* in playing, use last stop time as upper bound */
4562       if (start == -1 || segment->rate > 0.0)
4563         last = stop;
4564       else
4565         last = start;
4566     }
4567   } else {
4568     /* convert last stop to stream time */
4569     last = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4570   }
4571
4572   if (in_paused) {
4573     /* in paused, use start_time */
4574     base = GST_ELEMENT_START_TIME (basesink);
4575     GST_DEBUG_OBJECT (basesink, "in paused, using start time %" GST_TIME_FORMAT,
4576         GST_TIME_ARGS (base));
4577   } else if (with_clock) {
4578     /* else use clock when needed */
4579     base = GST_ELEMENT_CAST (basesink)->base_time;
4580     GST_DEBUG_OBJECT (basesink, "using clock and base time %" GST_TIME_FORMAT,
4581         GST_TIME_ARGS (base));
4582   } else {
4583     /* else, no sync or clock -> no base time */
4584     GST_DEBUG_OBJECT (basesink, "no sync or no clock");
4585     base = -1;
4586   }
4587
4588   /* no base, we can't calculate running_time, use last seem timestamp to report
4589    * time */
4590   if (base == -1)
4591     last_seen = TRUE;
4592
4593   /* need to release the object lock before we can get the time,
4594    * a clock might take the LOCK of the provider, which could be
4595    * a basesink subclass. */
4596   GST_OBJECT_UNLOCK (basesink);
4597
4598   if (last_seen) {
4599     /* in EOS or when no valid stream_time, report the value of last seen
4600      * timestamp */
4601     if (last == -1) {
4602       /* no timestamp, we need to ask upstream */
4603       GST_DEBUG_OBJECT (basesink, "no last seen timestamp, asking upstream");
4604       res = FALSE;
4605       *upstream = TRUE;
4606       goto done;
4607     }
4608     GST_DEBUG_OBJECT (basesink, "using last seen timestamp %" GST_TIME_FORMAT,
4609         GST_TIME_ARGS (last));
4610     *cur = last;
4611   } else {
4612     if (oformat != tformat) {
4613       /* convert accum, time and duration to time */
4614       if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4615               &accum))
4616         goto convert_failed;
4617       if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration,
4618               &tformat, &duration))
4619         goto convert_failed;
4620       if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4621               &time))
4622         goto convert_failed;
4623       if (!gst_pad_query_convert (basesink->sinkpad, oformat, last, &tformat,
4624               &last))
4625         goto convert_failed;
4626
4627       /* assume time format from now on */
4628       oformat = tformat;
4629     }
4630
4631     if (!in_paused && with_clock) {
4632       now = gst_clock_get_time (clock);
4633     } else {
4634       now = base;
4635       base = 0;
4636     }
4637
4638     /* subtract base time and accumulated time from the clock time.
4639      * Make sure we don't go negative. This is the current time in
4640      * the segment which we need to scale with the combined
4641      * rate and applied rate. */
4642     base += accum;
4643     base += latency;
4644     if (GST_CLOCK_DIFF (base, now) < 0)
4645       base = now;
4646
4647     /* for negative rates we need to count back from the segment
4648      * duration. */
4649     if (rate < 0.0)
4650       time += duration;
4651
4652     *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4653
4654     if (in_paused) {
4655       /* never report less than segment values in paused */
4656       if (last != -1)
4657         *cur = MAX (last, *cur);
4658     } else {
4659       /* never report more than last seen position in playing */
4660       if (last != -1)
4661         *cur = MIN (last, *cur);
4662     }
4663
4664     GST_DEBUG_OBJECT (basesink,
4665         "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4666         GST_TIME_FORMAT " + time %" GST_TIME_FORMAT "  last %" GST_TIME_FORMAT,
4667         GST_TIME_ARGS (now), GST_TIME_ARGS (base), GST_TIME_ARGS (accum),
4668         GST_TIME_ARGS (time), GST_TIME_ARGS (last));
4669   }
4670
4671   if (oformat != format) {
4672     /* convert to final format */
4673     if (!gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur))
4674       goto convert_failed;
4675   }
4676
4677   res = TRUE;
4678
4679 done:
4680   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4681       res, GST_TIME_ARGS (*cur));
4682
4683   if (clock)
4684     gst_object_unref (clock);
4685
4686   return res;
4687
4688   /* special cases */
4689 wrong_state:
4690   {
4691     /* in NULL or READY we always return FALSE and -1 */
4692     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4693     res = FALSE;
4694     *cur = -1;
4695     GST_OBJECT_UNLOCK (basesink);
4696     goto done;
4697   }
4698 convert_failed:
4699   {
4700     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4701     *upstream = TRUE;
4702     res = FALSE;
4703     goto done;
4704   }
4705 }
4706
4707 static gboolean
4708 gst_base_sink_get_duration (GstBaseSink * basesink, GstFormat format,
4709     gint64 * dur, gboolean * upstream)
4710 {
4711   gboolean res = FALSE;
4712
4713   if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4714     GstFormat uformat = GST_FORMAT_BYTES;
4715     gint64 uduration;
4716
4717     /* get the duration in bytes, in pull mode that's all we are sure to
4718      * know. We have to explicitly get this value from upstream instead of
4719      * using our cached value because it might change. Duration caching
4720      * should be done at a higher level. */
4721     res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat, &uduration);
4722     if (res) {
4723       gst_segment_set_duration (&basesink->segment, uformat, uduration);
4724       if (format != uformat) {
4725         /* convert to the requested format */
4726         res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4727             &format, dur);
4728       } else {
4729         *dur = uduration;
4730       }
4731     }
4732     *upstream = FALSE;
4733   } else {
4734     *upstream = TRUE;
4735   }
4736
4737   return res;
4738 }
4739
4740 static const GstQueryType *
4741 gst_base_sink_get_query_types (GstElement * element)
4742 {
4743   static const GstQueryType query_types[] = {
4744     GST_QUERY_DURATION,
4745     GST_QUERY_POSITION,
4746     GST_QUERY_SEGMENT,
4747     GST_QUERY_LATENCY,
4748     0
4749   };
4750
4751   return query_types;
4752 }
4753
4754 static gboolean
4755 gst_base_sink_query (GstElement * element, GstQuery * query)
4756 {
4757   gboolean res = FALSE;
4758
4759   GstBaseSink *basesink = GST_BASE_SINK (element);
4760
4761   switch (GST_QUERY_TYPE (query)) {
4762     case GST_QUERY_POSITION:
4763     {
4764       gint64 cur = 0;
4765       GstFormat format;
4766       gboolean upstream = FALSE;
4767
4768       gst_query_parse_position (query, &format, NULL);
4769
4770       GST_DEBUG_OBJECT (basesink, "position query in format %s",
4771           gst_format_get_name (format));
4772
4773       /* first try to get the position based on the clock */
4774       if ((res =
4775               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4776         gst_query_set_position (query, format, cur);
4777       } else if (upstream) {
4778         /* fallback to peer query */
4779         res = gst_pad_peer_query (basesink->sinkpad, query);
4780       }
4781       if (!res) {
4782         /* we can handle a few things if upstream failed */
4783         if (format == GST_FORMAT_PERCENT) {
4784           gint64 dur = 0;
4785           GstFormat uformat = GST_FORMAT_TIME;
4786
4787           res = gst_base_sink_get_position (basesink, GST_FORMAT_TIME, &cur,
4788               &upstream);
4789           if (!res && upstream) {
4790             res = gst_pad_query_peer_position (basesink->sinkpad, &uformat,
4791                 &cur);
4792           }
4793           if (res) {
4794             res = gst_base_sink_get_duration (basesink, GST_FORMAT_TIME, &dur,
4795                 &upstream);
4796             if (!res && upstream) {
4797               res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4798                   &dur);
4799             }
4800           }
4801           if (res) {
4802             gint64 pos;
4803
4804             pos = gst_util_uint64_scale (100 * GST_FORMAT_PERCENT_SCALE, cur,
4805                 dur);
4806             gst_query_set_position (query, GST_FORMAT_PERCENT, pos);
4807           }
4808         }
4809       }
4810       break;
4811     }
4812     case GST_QUERY_DURATION:
4813     {
4814       gint64 dur = 0;
4815       GstFormat format;
4816       gboolean upstream = FALSE;
4817
4818       gst_query_parse_duration (query, &format, NULL);
4819
4820       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4821           gst_format_get_name (format));
4822
4823       if ((res =
4824               gst_base_sink_get_duration (basesink, format, &dur, &upstream))) {
4825         gst_query_set_duration (query, format, dur);
4826       } else if (upstream) {
4827         /* fallback to peer query */
4828         res = gst_pad_peer_query (basesink->sinkpad, query);
4829       }
4830       if (!res) {
4831         /* we can handle a few things if upstream failed */
4832         if (format == GST_FORMAT_PERCENT) {
4833           gst_query_set_duration (query, GST_FORMAT_PERCENT,
4834               GST_FORMAT_PERCENT_MAX);
4835           res = TRUE;
4836         }
4837       }
4838       break;
4839     }
4840     case GST_QUERY_LATENCY:
4841     {
4842       gboolean live, us_live;
4843       GstClockTime min, max;
4844
4845       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4846                   &max))) {
4847         gst_query_set_latency (query, live, min, max);
4848       }
4849       break;
4850     }
4851     case GST_QUERY_JITTER:
4852       break;
4853     case GST_QUERY_RATE:
4854       /* gst_query_set_rate (query, basesink->segment_rate); */
4855       res = TRUE;
4856       break;
4857     case GST_QUERY_SEGMENT:
4858     {
4859       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4860         gst_query_set_segment (query, basesink->segment.rate,
4861             GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4862         res = TRUE;
4863       } else {
4864         res = gst_pad_peer_query (basesink->sinkpad, query);
4865       }
4866       break;
4867     }
4868     case GST_QUERY_SEEKING:
4869     case GST_QUERY_CONVERT:
4870     case GST_QUERY_FORMATS:
4871     default:
4872       res = gst_pad_peer_query (basesink->sinkpad, query);
4873       break;
4874   }
4875   GST_DEBUG_OBJECT (basesink, "query %s returns %d",
4876       GST_QUERY_TYPE_NAME (query), res);
4877   return res;
4878 }
4879
4880 static GstStateChangeReturn
4881 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4882 {
4883   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4884   GstBaseSink *basesink = GST_BASE_SINK (element);
4885   GstBaseSinkClass *bclass;
4886   GstBaseSinkPrivate *priv;
4887
4888   priv = basesink->priv;
4889
4890   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4891
4892   switch (transition) {
4893     case GST_STATE_CHANGE_NULL_TO_READY:
4894       if (bclass->start)
4895         if (!bclass->start (basesink))
4896           goto start_failed;
4897       break;
4898     case GST_STATE_CHANGE_READY_TO_PAUSED:
4899       /* need to complete preroll before this state change completes, there
4900        * is no data flow in READY so we can safely assume we need to preroll. */
4901       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4902       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4903       basesink->have_newsegment = FALSE;
4904       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4905       gst_segment_init (basesink->abidata.ABI.clip_segment,
4906           GST_FORMAT_UNDEFINED);
4907       basesink->offset = 0;
4908       basesink->have_preroll = FALSE;
4909       priv->step_unlock = FALSE;
4910       basesink->need_preroll = TRUE;
4911       basesink->playing_async = TRUE;
4912       priv->current_sstart = GST_CLOCK_TIME_NONE;
4913       priv->current_sstop = GST_CLOCK_TIME_NONE;
4914       priv->eos_rtime = GST_CLOCK_TIME_NONE;
4915       priv->latency = 0;
4916       basesink->eos = FALSE;
4917       priv->received_eos = FALSE;
4918       gst_base_sink_reset_qos (basesink);
4919       priv->commited = FALSE;
4920       priv->call_preroll = TRUE;
4921       priv->current_step.valid = FALSE;
4922       priv->pending_step.valid = FALSE;
4923       if (priv->async_enabled) {
4924         GST_DEBUG_OBJECT (basesink, "doing async state change");
4925         /* when async enabled, post async-start message and return ASYNC from
4926          * the state change function */
4927         ret = GST_STATE_CHANGE_ASYNC;
4928         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4929             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4930       } else {
4931         priv->have_latency = TRUE;
4932       }
4933       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4934       break;
4935     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4936       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4937       if (!gst_base_sink_needs_preroll (basesink)) {
4938         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4939         /* no preroll needed anymore now. */
4940         basesink->playing_async = FALSE;
4941         basesink->need_preroll = FALSE;
4942         if (basesink->eos) {
4943           GstMessage *message;
4944
4945           /* need to post EOS message here */
4946           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4947           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
4948           gst_message_set_seqnum (message, basesink->priv->seqnum);
4949           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
4950         } else {
4951           GST_DEBUG_OBJECT (basesink, "signal preroll");
4952           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
4953         }
4954       } else {
4955         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
4956         basesink->need_preroll = TRUE;
4957         basesink->playing_async = TRUE;
4958         priv->call_preroll = TRUE;
4959         priv->commited = FALSE;
4960         if (priv->async_enabled) {
4961           GST_DEBUG_OBJECT (basesink, "doing async state change");
4962           ret = GST_STATE_CHANGE_ASYNC;
4963           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4964               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4965         }
4966       }
4967       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4968       break;
4969     default:
4970       break;
4971   }
4972
4973   {
4974     GstStateChangeReturn bret;
4975
4976     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4977     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
4978       goto activate_failed;
4979   }
4980
4981   switch (transition) {
4982     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4983       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
4984       /* FIXME, make sure we cannot enter _render first */
4985
4986       /* we need to call ::unlock before locking PREROLL_LOCK
4987        * since we lock it before going into ::render */
4988       if (bclass->unlock)
4989         bclass->unlock (basesink);
4990
4991       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4992       GST_DEBUG_OBJECT (basesink, "got preroll lock");
4993       /* now that we have the PREROLL lock, clear our unlock request */
4994       if (bclass->unlock_stop)
4995         bclass->unlock_stop (basesink);
4996
4997       /* we need preroll again and we set the flag before unlocking the clockid
4998        * because if the clockid is unlocked before a current buffer expired, we
4999        * can use that buffer to preroll with */
5000       basesink->need_preroll = TRUE;
5001
5002       if (basesink->clock_id) {
5003         GST_DEBUG_OBJECT (basesink, "unschedule clock");
5004         gst_clock_id_unschedule (basesink->clock_id);
5005       }
5006
5007       /* if we don't have a preroll buffer we need to wait for a preroll and
5008        * return ASYNC. */
5009       if (!gst_base_sink_needs_preroll (basesink)) {
5010         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
5011         basesink->playing_async = FALSE;
5012       } else {
5013         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
5014           GST_DEBUG_OBJECT (basesink, "element is <= READY");
5015           ret = GST_STATE_CHANGE_SUCCESS;
5016         } else {
5017           GST_DEBUG_OBJECT (basesink,
5018               "PLAYING to PAUSED, we are not prerolled");
5019           basesink->playing_async = TRUE;
5020           priv->commited = FALSE;
5021           priv->call_preroll = TRUE;
5022           if (priv->async_enabled) {
5023             GST_DEBUG_OBJECT (basesink, "doing async state change");
5024             ret = GST_STATE_CHANGE_ASYNC;
5025             gst_element_post_message (GST_ELEMENT_CAST (basesink),
5026                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
5027                     FALSE));
5028           }
5029         }
5030       }
5031       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
5032           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
5033
5034       gst_base_sink_reset_qos (basesink);
5035       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5036       break;
5037     case GST_STATE_CHANGE_PAUSED_TO_READY:
5038       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5039       /* start by reseting our position state with the object lock so that the
5040        * position query gets the right idea. We do this before we post the
5041        * messages so that the message handlers pick this up. */
5042       GST_OBJECT_LOCK (basesink);
5043       basesink->have_newsegment = FALSE;
5044       priv->current_sstart = GST_CLOCK_TIME_NONE;
5045       priv->current_sstop = GST_CLOCK_TIME_NONE;
5046       priv->have_latency = FALSE;
5047       if (priv->cached_clock_id) {
5048         gst_clock_id_unref (priv->cached_clock_id);
5049         priv->cached_clock_id = NULL;
5050       }
5051       GST_OBJECT_UNLOCK (basesink);
5052
5053       gst_base_sink_set_last_buffer (basesink, NULL);
5054       priv->call_preroll = FALSE;
5055
5056       if (!priv->commited) {
5057         if (priv->async_enabled) {
5058           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
5059
5060           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5061               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
5062                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
5063
5064           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5065               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
5066         }
5067         priv->commited = TRUE;
5068       } else {
5069         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
5070       }
5071       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5072       break;
5073     case GST_STATE_CHANGE_READY_TO_NULL:
5074       if (bclass->stop) {
5075         if (!bclass->stop (basesink)) {
5076           GST_WARNING_OBJECT (basesink, "failed to stop");
5077         }
5078       }
5079       gst_base_sink_set_last_buffer (basesink, NULL);
5080       priv->call_preroll = FALSE;
5081       break;
5082     default:
5083       break;
5084   }
5085
5086   return ret;
5087
5088   /* ERRORS */
5089 start_failed:
5090   {
5091     GST_DEBUG_OBJECT (basesink, "failed to start");
5092     return GST_STATE_CHANGE_FAILURE;
5093   }
5094 activate_failed:
5095   {
5096     GST_DEBUG_OBJECT (basesink,
5097         "element failed to change states -- activation problem?");
5098     return GST_STATE_CHANGE_FAILURE;
5099   }
5100 }