basesink: try harder to arrange increasing position reporting
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSrc
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * |[
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * ]|
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSinkClass.preroll() vmethod with this preroll buffer and will then
59  * commit the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from #GstBaseSinkClass.get_times(). If this
63  * function returns #GST_CLOCK_TIME_NONE for the start time, no synchronisation
64  * will be done. Synchronisation can be disabled entirely by setting the object
65  * #GstBaseSink:sync property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSinkClass.render() will be
68  * called. Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the
71  * #GstBaseSinkClass.render() method are supported as well. These classes
72  * typically receive a buffer in the render method and can then potentially
73  * block on the clock while rendering. A typical example is an audiosink.
74  * Since 0.10.11 these subclasses can use gst_base_sink_wait_preroll() to
75  * perform the blocking wait.
76  *
77  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
78  * for the clock to reach the time indicated by the stop time of the last
79  * #GstBaseSinkClass.get_times() call before posting an EOS message. When the
80  * element receives EOS in PAUSED, preroll completes, the event is queued and an
81  * EOS message is posted when going to PLAYING.
82  *
83  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
84  * synchronisation and clipping of buffers. Buffers that fall completely outside
85  * of the current segment are dropped. Buffers that fall partially in the
86  * segment are rendered (and prerolled). Subclasses should do any subbuffer
87  * clipping themselves when needed.
88  *
89  * #GstBaseSink will by default report the current playback position in
90  * #GST_FORMAT_TIME based on the current clock time and segment information.
91  * If no clock has been set on the element, the query will be forwarded
92  * upstream.
93  *
94  * The #GstBaseSinkClass.set_caps() function will be called when the subclass
95  * should configure itself to process a specific media type.
96  *
97  * The #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() virtual methods
98  * will be called when resources should be allocated. Any 
99  * #GstBaseSinkClass.preroll(), #GstBaseSinkClass.render() and
100  * #GstBaseSinkClass.set_caps() function will be called between the
101  * #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() calls.
102  *
103  * The #GstBaseSinkClass.event() virtual method will be called when an event is
104  * received by #GstBaseSink. Normally this method should only be overriden by
105  * very specific elements (such as file sinks) which need to handle the
106  * newsegment event specially.
107  *
108  * #GstBaseSink provides an overridable #GstBaseSinkClass.buffer_alloc()
109  * function that can be used by sinks that want to do reverse negotiation or to
110  * provide custom buffers (hardware buffers for example) to upstream elements.
111  *
112  * The #GstBaseSinkClass.unlock() method is called when the elements should
113  * unblock any blocking operations they perform in the
114  * #GstBaseSinkClass.render() method. This is mostly useful when the
115  * #GstBaseSinkClass.render() method performs a blocking write on a file
116  * descriptor, for example.
117  *
118  * The #GstBaseSink:max-lateness property affects how the sink deals with
119  * buffers that arrive too late in the sink. A buffer arrives too late in the
120  * sink when the presentation time (as a combination of the last segment, buffer
121  * timestamp and element base_time) plus the duration is before the current
122  * time of the clock.
123  * If the frame is later than max-lateness, the sink will drop the buffer
124  * without calling the render method.
125  * This feature is disabled if sync is disabled, the
126  * #GstBaseSinkClass.get_times() method does not return a valid start time or
127  * max-lateness is set to -1 (the default).
128  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
129  * max-lateness value.
130  *
131  * The #GstBaseSink:qos property will enable the quality-of-service features of
132  * the basesink which gather statistics about the real-time performance of the
133  * clock synchronisation. For each buffer received in the sink, statistics are
134  * gathered and a QOS event is sent upstream with these numbers. This
135  * information can then be used by upstream elements to reduce their processing
136  * rate, for example.
137  *
138  * Since 0.10.15 the #GstBaseSink:async property can be used to instruct the
139  * sink to never perform an ASYNC state change. This feature is mostly usable
140  * when dealing with non-synchronized streams or sparse streams.
141  *
142  * Last reviewed on 2007-08-29 (0.10.15)
143  */
144
145 #ifdef HAVE_CONFIG_H
146 #  include "config.h"
147 #endif
148
149 #include <gst/gst_private.h>
150
151 #include "gstbasesink.h"
152 #include <gst/gstmarshal.h>
153 #include <gst/gst-i18n-lib.h>
154
155 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
156 #define GST_CAT_DEFAULT gst_base_sink_debug
157
158 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
159    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
160
161 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
162
163 typedef struct
164 {
165   gboolean valid;               /* if this info is valid */
166   guint32 seqnum;               /* the seqnum of the STEP event */
167   GstFormat format;             /* the format of the amount */
168   guint64 amount;               /* the total amount of data to skip */
169   guint64 position;             /* the position in the stepped data */
170   guint64 duration;             /* the duration in time of the skipped data */
171   guint64 start;                /* running_time of the start */
172   gdouble rate;                 /* rate of skipping */
173   gdouble start_rate;           /* rate before skipping */
174   guint64 start_start;          /* start position skipping */
175   guint64 start_stop;           /* stop position skipping */
176   gboolean flush;               /* if this was a flushing step */
177   gboolean intermediate;        /* if this is an intermediate step */
178   gboolean need_preroll;        /* if we need preroll after this step */
179 } GstStepInfo;
180
181 /* FIXME, some stuff in ABI.data and other in Private...
182  * Make up your mind please.
183  */
184 struct _GstBaseSinkPrivate
185 {
186   gint qos_enabled;             /* ATOMIC */
187   gboolean async_enabled;
188   GstClockTimeDiff ts_offset;
189   GstClockTime render_delay;
190
191   /* start, stop of current buffer, stream time, used to report position */
192   GstClockTime current_sstart;
193   GstClockTime current_sstop;
194
195   /* start, stop and jitter of current buffer, running time */
196   GstClockTime current_rstart;
197   GstClockTime current_rstop;
198   GstClockTimeDiff current_jitter;
199   /* the running time of the previous buffer */
200   GstClockTime prev_rstart;
201
202   /* EOS sync time in running time */
203   GstClockTime eos_rtime;
204
205   /* last buffer that arrived in time, running time */
206   GstClockTime last_render_time;
207   /* when the last buffer left the sink, running time */
208   GstClockTime last_left;
209
210   /* running averages go here these are done on running time */
211   GstClockTime avg_pt;
212   GstClockTime avg_duration;
213   gdouble avg_rate;
214   GstClockTime avg_in_diff;
215
216   /* these are done on system time. avg_jitter and avg_render are
217    * compared to eachother to see if the rendering time takes a
218    * huge amount of the processing, If so we are flooded with
219    * buffers. */
220   GstClockTime last_left_systime;
221   GstClockTime avg_jitter;
222   GstClockTime start, stop;
223   GstClockTime avg_render;
224
225   /* number of rendered and dropped frames */
226   guint64 rendered;
227   guint64 dropped;
228
229   /* latency stuff */
230   GstClockTime latency;
231
232   /* if we already commited the state */
233   gboolean commited;
234   /* state change to playing ongoing */
235   gboolean to_playing;
236
237   /* when we received EOS */
238   gboolean received_eos;
239
240   /* when we are prerolled and able to report latency */
241   gboolean have_latency;
242
243   /* the last buffer we prerolled or rendered. Useful for making snapshots */
244   gint enable_last_buffer;      /* atomic */
245   GstBuffer *last_buffer;
246
247   /* caps for pull based scheduling */
248   GstCaps *pull_caps;
249
250   /* blocksize for pulling */
251   guint blocksize;
252
253   gboolean discont;
254
255   /* seqnum of the stream */
256   guint32 seqnum;
257
258   gboolean call_preroll;
259   gboolean step_unlock;
260
261   /* we have a pending and a current step operation */
262   GstStepInfo current_step;
263   GstStepInfo pending_step;
264
265   /* Cached GstClockID */
266   GstClockID cached_clock_id;
267
268   /* for throttling and QoS */
269   GstClockTime earliest_in_time;
270   GstClockTime throttle_time;
271 };
272
273 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
274
275 /* generic running average, this has a neutral window size */
276 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
277
278 /* the windows for these running averages are experimentally obtained.
279  * possitive values get averaged more while negative values use a small
280  * window so we can react faster to badness. */
281 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
282 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
283
284 enum
285 {
286   _PR_IS_NOTHING = 1 << 0,
287   _PR_IS_BUFFER = 1 << 1,
288   _PR_IS_BUFFERLIST = 1 << 2,
289   _PR_IS_EVENT = 1 << 3
290 } PrivateObjectType;
291
292 #define OBJ_IS_BUFFER(a) ((a) & _PR_IS_BUFFER)
293 #define OBJ_IS_BUFFERLIST(a) ((a) & _PR_IS_BUFFERLIST)
294 #define OBJ_IS_EVENT(a) ((a) & _PR_IS_EVENT)
295 #define OBJ_IS_BUFFERFULL(a) ((a) & (_PR_IS_BUFFER | _PR_IS_BUFFERLIST))
296
297 /* BaseSink properties */
298
299 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
300 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
301
302 #define DEFAULT_PREROLL_QUEUE_LEN   0
303 #define DEFAULT_SYNC                TRUE
304 #define DEFAULT_MAX_LATENESS        -1
305 #define DEFAULT_QOS                 FALSE
306 #define DEFAULT_ASYNC               TRUE
307 #define DEFAULT_TS_OFFSET           0
308 #define DEFAULT_BLOCKSIZE           4096
309 #define DEFAULT_RENDER_DELAY        0
310 #define DEFAULT_ENABLE_LAST_BUFFER  TRUE
311 #define DEFAULT_THROTTLE_TIME       0
312
313 enum
314 {
315   PROP_0,
316   PROP_PREROLL_QUEUE_LEN,
317   PROP_SYNC,
318   PROP_MAX_LATENESS,
319   PROP_QOS,
320   PROP_ASYNC,
321   PROP_TS_OFFSET,
322   PROP_ENABLE_LAST_BUFFER,
323   PROP_LAST_BUFFER,
324   PROP_BLOCKSIZE,
325   PROP_RENDER_DELAY,
326   PROP_THROTTLE_TIME,
327   PROP_LAST
328 };
329
330 static GstElementClass *parent_class = NULL;
331
332 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
333 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
334 static void gst_base_sink_finalize (GObject * object);
335
336 GType
337 gst_base_sink_get_type (void)
338 {
339   static volatile gsize base_sink_type = 0;
340
341   if (g_once_init_enter (&base_sink_type)) {
342     GType _type;
343     static const GTypeInfo base_sink_info = {
344       sizeof (GstBaseSinkClass),
345       NULL,
346       NULL,
347       (GClassInitFunc) gst_base_sink_class_init,
348       NULL,
349       NULL,
350       sizeof (GstBaseSink),
351       0,
352       (GInstanceInitFunc) gst_base_sink_init,
353     };
354
355     _type = g_type_register_static (GST_TYPE_ELEMENT,
356         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
357     g_once_init_leave (&base_sink_type, _type);
358   }
359   return base_sink_type;
360 }
361
362 static void gst_base_sink_set_property (GObject * object, guint prop_id,
363     const GValue * value, GParamSpec * pspec);
364 static void gst_base_sink_get_property (GObject * object, guint prop_id,
365     GValue * value, GParamSpec * pspec);
366
367 static gboolean gst_base_sink_send_event (GstElement * element,
368     GstEvent * event);
369 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
370 static const GstQueryType *gst_base_sink_get_query_types (GstElement * element);
371
372 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
373 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
374 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
375     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
376 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
377     GstClockTime * start, GstClockTime * end);
378 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
379     GstPad * pad, gboolean flushing);
380 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
381     gboolean active);
382 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
383     GstSegment * segment);
384 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
385     GstEvent * event, GstSegment * segment);
386
387 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
388     GstStateChange transition);
389
390 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
391 static GstFlowReturn gst_base_sink_chain_list (GstPad * pad,
392     GstBufferList * list);
393
394 static void gst_base_sink_loop (GstPad * pad);
395 static gboolean gst_base_sink_pad_activate (GstPad * pad);
396 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
397 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
398 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
399
400 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
401 static GstCaps *gst_base_sink_pad_getcaps (GstPad * pad);
402 static gboolean gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps);
403 static void gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps);
404 static GstFlowReturn gst_base_sink_pad_buffer_alloc (GstPad * pad,
405     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
406
407
408 /* check if an object was too late */
409 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
410     GstMiniObject * obj, GstClockTime rstart, GstClockTime rstop,
411     GstClockReturn status, GstClockTimeDiff jitter);
412 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
413     guint8 obj_type, GstMiniObject * obj);
414
415 static void
416 gst_base_sink_class_init (GstBaseSinkClass * klass)
417 {
418   GObjectClass *gobject_class;
419   GstElementClass *gstelement_class;
420
421   gobject_class = G_OBJECT_CLASS (klass);
422   gstelement_class = GST_ELEMENT_CLASS (klass);
423
424   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
425       "basesink element");
426
427   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
428
429   parent_class = g_type_class_peek_parent (klass);
430
431   gobject_class->finalize = gst_base_sink_finalize;
432   gobject_class->set_property = gst_base_sink_set_property;
433   gobject_class->get_property = gst_base_sink_get_property;
434
435   /* FIXME, this next value should be configured using an event from the
436    * upstream element, ie, the BUFFER_SIZE event. */
437   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
438       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
439           "Number of buffers to queue during preroll", 0, G_MAXUINT,
440           DEFAULT_PREROLL_QUEUE_LEN,
441           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
442
443   g_object_class_install_property (gobject_class, PROP_SYNC,
444       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
445           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446
447   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
448       g_param_spec_int64 ("max-lateness", "Max Lateness",
449           "Maximum number of nanoseconds that a buffer can be late before it "
450           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
451           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452
453   g_object_class_install_property (gobject_class, PROP_QOS,
454       g_param_spec_boolean ("qos", "Qos",
455           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
456           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
457   /**
458    * GstBaseSink:async
459    *
460    * If set to #TRUE, the basesink will perform asynchronous state changes.
461    * When set to #FALSE, the sink will not signal the parent when it prerolls.
462    * Use this option when dealing with sparse streams or when synchronisation is
463    * not required.
464    *
465    * Since: 0.10.15
466    */
467   g_object_class_install_property (gobject_class, PROP_ASYNC,
468       g_param_spec_boolean ("async", "Async",
469           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
470           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
471   /**
472    * GstBaseSink:ts-offset
473    *
474    * Controls the final synchronisation, a negative value will render the buffer
475    * earlier while a positive value delays playback. This property can be
476    * used to fix synchronisation in bad files.
477    *
478    * Since: 0.10.15
479    */
480   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
481       g_param_spec_int64 ("ts-offset", "TS Offset",
482           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
483           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
484
485   /**
486    * GstBaseSink:enable-last-buffer
487    *
488    * Enable the last-buffer property. If FALSE, basesink doesn't keep a
489    * reference to the last buffer arrived and the last-buffer property is always
490    * set to NULL. This can be useful if you need buffers to be released as soon
491    * as possible, eg. if you're using a buffer pool.
492    *
493    * Since: 0.10.30
494    */
495   g_object_class_install_property (gobject_class, PROP_ENABLE_LAST_BUFFER,
496       g_param_spec_boolean ("enable-last-buffer", "Enable Last Buffer",
497           "Enable the last-buffer property", DEFAULT_ENABLE_LAST_BUFFER,
498           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
499
500   /**
501    * GstBaseSink:last-buffer
502    *
503    * The last buffer that arrived in the sink and was used for preroll or for
504    * rendering. This property can be used to generate thumbnails. This property
505    * can be NULL when the sink has not yet received a bufer.
506    *
507    * Since: 0.10.15
508    */
509   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
510       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
511           "The last buffer received in the sink", GST_TYPE_BUFFER,
512           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
513   /**
514    * GstBaseSink:blocksize
515    *
516    * The amount of bytes to pull when operating in pull mode.
517    *
518    * Since: 0.10.22
519    */
520   /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
521   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
522       g_param_spec_uint ("blocksize", "Block size",
523           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
524           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
525   /**
526    * GstBaseSink:render-delay
527    *
528    * The additional delay between synchronisation and actual rendering of the
529    * media. This property will add additional latency to the device in order to
530    * make other sinks compensate for the delay.
531    *
532    * Since: 0.10.22
533    */
534   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
535       g_param_spec_uint64 ("render-delay", "Render Delay",
536           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
537           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
538   /**
539    * GstBaseSink:throttle-time
540    *
541    * The time to insert between buffers. This property can be used to control
542    * the maximum amount of buffers per second to render. Setting this property
543    * to a value bigger than 0 will make the sink create THROTTLE QoS events.
544    *
545    * Since: 0.10.33
546    */
547   g_object_class_install_property (gobject_class, PROP_THROTTLE_TIME,
548       g_param_spec_uint64 ("throttle-time", "Throttle time",
549           "The time to keep between rendered buffers (unused)", 0, G_MAXUINT64,
550           DEFAULT_THROTTLE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
551
552   gstelement_class->change_state =
553       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
554   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
555   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
556   gstelement_class->get_query_types =
557       GST_DEBUG_FUNCPTR (gst_base_sink_get_query_types);
558
559   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
560   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
561   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
562   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
563   klass->activate_pull =
564       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
565
566   /* Registering debug symbols for function pointers */
567   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_getcaps);
568   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_setcaps);
569   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_fixate);
570   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_buffer_alloc);
571   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate);
572   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_push);
573   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_pull);
574   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_event);
575   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain);
576   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain_list);
577 }
578
579 static GstCaps *
580 gst_base_sink_pad_getcaps (GstPad * pad)
581 {
582   GstBaseSinkClass *bclass;
583   GstBaseSink *bsink;
584   GstCaps *caps = NULL;
585
586   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
587   bclass = GST_BASE_SINK_GET_CLASS (bsink);
588
589   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
590     /* if we are operating in pull mode we only accept the negotiated caps */
591     GST_OBJECT_LOCK (pad);
592     if ((caps = GST_PAD_CAPS (pad)))
593       gst_caps_ref (caps);
594     GST_OBJECT_UNLOCK (pad);
595   }
596   if (caps == NULL) {
597     if (bclass->get_caps)
598       caps = bclass->get_caps (bsink);
599
600     if (caps == NULL) {
601       GstPadTemplate *pad_template;
602
603       pad_template =
604           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
605           "sink");
606       if (pad_template != NULL) {
607         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
608       }
609     }
610   }
611   gst_object_unref (bsink);
612
613   return caps;
614 }
615
616 static gboolean
617 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
618 {
619   GstBaseSinkClass *bclass;
620   GstBaseSink *bsink;
621   gboolean res = TRUE;
622
623   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
624   bclass = GST_BASE_SINK_GET_CLASS (bsink);
625
626   if (res && bclass->set_caps)
627     res = bclass->set_caps (bsink, caps);
628
629   gst_object_unref (bsink);
630
631   return res;
632 }
633
634 static void
635 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
636 {
637   GstBaseSinkClass *bclass;
638   GstBaseSink *bsink;
639
640   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
641   bclass = GST_BASE_SINK_GET_CLASS (bsink);
642
643   if (bclass->fixate)
644     bclass->fixate (bsink, caps);
645
646   gst_object_unref (bsink);
647 }
648
649 static GstFlowReturn
650 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
651     GstCaps * caps, GstBuffer ** buf)
652 {
653   GstBaseSinkClass *bclass;
654   GstBaseSink *bsink;
655   GstFlowReturn result = GST_FLOW_OK;
656
657   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
658   if (G_UNLIKELY (bsink == NULL))
659     return GST_FLOW_WRONG_STATE;
660   bclass = GST_BASE_SINK_GET_CLASS (bsink);
661
662   if (bclass->buffer_alloc)
663     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
664   else
665     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
666
667   gst_object_unref (bsink);
668
669   return result;
670 }
671
672 static void
673 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
674 {
675   GstPadTemplate *pad_template;
676   GstBaseSinkPrivate *priv;
677
678   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
679
680   pad_template =
681       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
682   g_return_if_fail (pad_template != NULL);
683
684   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
685
686   gst_pad_set_getcaps_function (basesink->sinkpad, gst_base_sink_pad_getcaps);
687   gst_pad_set_setcaps_function (basesink->sinkpad, gst_base_sink_pad_setcaps);
688   gst_pad_set_fixatecaps_function (basesink->sinkpad, gst_base_sink_pad_fixate);
689   gst_pad_set_bufferalloc_function (basesink->sinkpad,
690       gst_base_sink_pad_buffer_alloc);
691   gst_pad_set_activate_function (basesink->sinkpad, gst_base_sink_pad_activate);
692   gst_pad_set_activatepush_function (basesink->sinkpad,
693       gst_base_sink_pad_activate_push);
694   gst_pad_set_activatepull_function (basesink->sinkpad,
695       gst_base_sink_pad_activate_pull);
696   gst_pad_set_event_function (basesink->sinkpad, gst_base_sink_event);
697   gst_pad_set_chain_function (basesink->sinkpad, gst_base_sink_chain);
698   gst_pad_set_chain_list_function (basesink->sinkpad, gst_base_sink_chain_list);
699   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
700
701   basesink->pad_mode = GST_ACTIVATE_NONE;
702   basesink->preroll_queue = g_queue_new ();
703   basesink->abidata.ABI.clip_segment = gst_segment_new ();
704   priv->have_latency = FALSE;
705
706   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
707   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
708
709   basesink->sync = DEFAULT_SYNC;
710   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
711   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
712   priv->async_enabled = DEFAULT_ASYNC;
713   priv->ts_offset = DEFAULT_TS_OFFSET;
714   priv->render_delay = DEFAULT_RENDER_DELAY;
715   priv->blocksize = DEFAULT_BLOCKSIZE;
716   priv->cached_clock_id = NULL;
717   g_atomic_int_set (&priv->enable_last_buffer, DEFAULT_ENABLE_LAST_BUFFER);
718   priv->throttle_time = DEFAULT_THROTTLE_TIME;
719
720   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
721 }
722
723 static void
724 gst_base_sink_finalize (GObject * object)
725 {
726   GstBaseSink *basesink;
727
728   basesink = GST_BASE_SINK (object);
729
730   g_queue_free (basesink->preroll_queue);
731   gst_segment_free (basesink->abidata.ABI.clip_segment);
732
733   G_OBJECT_CLASS (parent_class)->finalize (object);
734 }
735
736 /**
737  * gst_base_sink_set_sync:
738  * @sink: the sink
739  * @sync: the new sync value.
740  *
741  * Configures @sink to synchronize on the clock or not. When
742  * @sync is FALSE, incomming samples will be played as fast as
743  * possible. If @sync is TRUE, the timestamps of the incomming
744  * buffers will be used to schedule the exact render time of its
745  * contents.
746  *
747  * Since: 0.10.4
748  */
749 void
750 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
751 {
752   g_return_if_fail (GST_IS_BASE_SINK (sink));
753
754   GST_OBJECT_LOCK (sink);
755   sink->sync = sync;
756   GST_OBJECT_UNLOCK (sink);
757 }
758
759 /**
760  * gst_base_sink_get_sync:
761  * @sink: the sink
762  *
763  * Checks if @sink is currently configured to synchronize against the
764  * clock.
765  *
766  * Returns: TRUE if the sink is configured to synchronize against the clock.
767  *
768  * Since: 0.10.4
769  */
770 gboolean
771 gst_base_sink_get_sync (GstBaseSink * sink)
772 {
773   gboolean res;
774
775   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
776
777   GST_OBJECT_LOCK (sink);
778   res = sink->sync;
779   GST_OBJECT_UNLOCK (sink);
780
781   return res;
782 }
783
784 /**
785  * gst_base_sink_set_max_lateness:
786  * @sink: the sink
787  * @max_lateness: the new max lateness value.
788  *
789  * Sets the new max lateness value to @max_lateness. This value is
790  * used to decide if a buffer should be dropped or not based on the
791  * buffer timestamp and the current clock time. A value of -1 means
792  * an unlimited time.
793  *
794  * Since: 0.10.4
795  */
796 void
797 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
798 {
799   g_return_if_fail (GST_IS_BASE_SINK (sink));
800
801   GST_OBJECT_LOCK (sink);
802   sink->abidata.ABI.max_lateness = max_lateness;
803   GST_OBJECT_UNLOCK (sink);
804 }
805
806 /**
807  * gst_base_sink_get_max_lateness:
808  * @sink: the sink
809  *
810  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
811  * more details.
812  *
813  * Returns: The maximum time in nanoseconds that a buffer can be late
814  * before it is dropped and not rendered. A value of -1 means an
815  * unlimited time.
816  *
817  * Since: 0.10.4
818  */
819 gint64
820 gst_base_sink_get_max_lateness (GstBaseSink * sink)
821 {
822   gint64 res;
823
824   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
825
826   GST_OBJECT_LOCK (sink);
827   res = sink->abidata.ABI.max_lateness;
828   GST_OBJECT_UNLOCK (sink);
829
830   return res;
831 }
832
833 /**
834  * gst_base_sink_set_qos_enabled:
835  * @sink: the sink
836  * @enabled: the new qos value.
837  *
838  * Configures @sink to send Quality-of-Service events upstream.
839  *
840  * Since: 0.10.5
841  */
842 void
843 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
844 {
845   g_return_if_fail (GST_IS_BASE_SINK (sink));
846
847   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
848 }
849
850 /**
851  * gst_base_sink_is_qos_enabled:
852  * @sink: the sink
853  *
854  * Checks if @sink is currently configured to send Quality-of-Service events
855  * upstream.
856  *
857  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
858  *
859  * Since: 0.10.5
860  */
861 gboolean
862 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
863 {
864   gboolean res;
865
866   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
867
868   res = g_atomic_int_get (&sink->priv->qos_enabled);
869
870   return res;
871 }
872
873 /**
874  * gst_base_sink_set_async_enabled:
875  * @sink: the sink
876  * @enabled: the new async value.
877  *
878  * Configures @sink to perform all state changes asynchronusly. When async is
879  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
880  * preroll buffer. This feature is usefull if the sink does not synchronize
881  * against the clock or when it is dealing with sparse streams.
882  *
883  * Since: 0.10.15
884  */
885 void
886 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
887 {
888   g_return_if_fail (GST_IS_BASE_SINK (sink));
889
890   GST_PAD_PREROLL_LOCK (sink->sinkpad);
891   g_atomic_int_set (&sink->priv->async_enabled, enabled);
892   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
893   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
894 }
895
896 /**
897  * gst_base_sink_is_async_enabled:
898  * @sink: the sink
899  *
900  * Checks if @sink is currently configured to perform asynchronous state
901  * changes to PAUSED.
902  *
903  * Returns: TRUE if the sink is configured to perform asynchronous state
904  * changes.
905  *
906  * Since: 0.10.15
907  */
908 gboolean
909 gst_base_sink_is_async_enabled (GstBaseSink * sink)
910 {
911   gboolean res;
912
913   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
914
915   res = g_atomic_int_get (&sink->priv->async_enabled);
916
917   return res;
918 }
919
920 /**
921  * gst_base_sink_set_ts_offset:
922  * @sink: the sink
923  * @offset: the new offset
924  *
925  * Adjust the synchronisation of @sink with @offset. A negative value will
926  * render buffers earlier than their timestamp. A positive value will delay
927  * rendering. This function can be used to fix playback of badly timestamped
928  * buffers.
929  *
930  * Since: 0.10.15
931  */
932 void
933 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
934 {
935   g_return_if_fail (GST_IS_BASE_SINK (sink));
936
937   GST_OBJECT_LOCK (sink);
938   sink->priv->ts_offset = offset;
939   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
940   GST_OBJECT_UNLOCK (sink);
941 }
942
943 /**
944  * gst_base_sink_get_ts_offset:
945  * @sink: the sink
946  *
947  * Get the synchronisation offset of @sink.
948  *
949  * Returns: The synchronisation offset.
950  *
951  * Since: 0.10.15
952  */
953 GstClockTimeDiff
954 gst_base_sink_get_ts_offset (GstBaseSink * sink)
955 {
956   GstClockTimeDiff res;
957
958   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
959
960   GST_OBJECT_LOCK (sink);
961   res = sink->priv->ts_offset;
962   GST_OBJECT_UNLOCK (sink);
963
964   return res;
965 }
966
967 /**
968  * gst_base_sink_get_last_buffer:
969  * @sink: the sink
970  *
971  * Get the last buffer that arrived in the sink and was used for preroll or for
972  * rendering. This property can be used to generate thumbnails.
973  *
974  * The #GstCaps on the buffer can be used to determine the type of the buffer.
975  *
976  * Free-function: gst_buffer_unref
977  *
978  * Returns: (transfer full): a #GstBuffer. gst_buffer_unref() after usage.
979  *     This function returns NULL when no buffer has arrived in the sink yet
980  *     or when the sink is not in PAUSED or PLAYING.
981  *
982  * Since: 0.10.15
983  */
984 GstBuffer *
985 gst_base_sink_get_last_buffer (GstBaseSink * sink)
986 {
987   GstBuffer *res;
988
989   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
990
991   GST_OBJECT_LOCK (sink);
992   if ((res = sink->priv->last_buffer))
993     gst_buffer_ref (res);
994   GST_OBJECT_UNLOCK (sink);
995
996   return res;
997 }
998
999 /* with OBJECT_LOCK */
1000 static void
1001 gst_base_sink_set_last_buffer_unlocked (GstBaseSink * sink, GstBuffer * buffer)
1002 {
1003   GstBuffer *old;
1004
1005   old = sink->priv->last_buffer;
1006   if (G_LIKELY (old != buffer)) {
1007     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
1008     if (G_LIKELY (buffer))
1009       gst_buffer_ref (buffer);
1010     sink->priv->last_buffer = buffer;
1011   } else {
1012     old = NULL;
1013   }
1014   /* avoid unreffing with the lock because cleanup code might want to take the
1015    * lock too */
1016   if (G_LIKELY (old)) {
1017     GST_OBJECT_UNLOCK (sink);
1018     gst_buffer_unref (old);
1019     GST_OBJECT_LOCK (sink);
1020   }
1021 }
1022
1023 static void
1024 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
1025 {
1026   if (!g_atomic_int_get (&sink->priv->enable_last_buffer))
1027     return;
1028
1029   GST_OBJECT_LOCK (sink);
1030   gst_base_sink_set_last_buffer_unlocked (sink, buffer);
1031   GST_OBJECT_UNLOCK (sink);
1032 }
1033
1034 /**
1035  * gst_base_sink_set_last_buffer_enabled:
1036  * @sink: the sink
1037  * @enabled: the new enable-last-buffer value.
1038  *
1039  * Configures @sink to store the last received buffer in the last-buffer
1040  * property.
1041  *
1042  * Since: 0.10.30
1043  */
1044 void
1045 gst_base_sink_set_last_buffer_enabled (GstBaseSink * sink, gboolean enabled)
1046 {
1047   g_return_if_fail (GST_IS_BASE_SINK (sink));
1048
1049   /* Only take lock if we change the value */
1050   if (g_atomic_int_compare_and_exchange (&sink->priv->enable_last_buffer,
1051           !enabled, enabled) && !enabled) {
1052     GST_OBJECT_LOCK (sink);
1053     gst_base_sink_set_last_buffer_unlocked (sink, NULL);
1054     GST_OBJECT_UNLOCK (sink);
1055   }
1056 }
1057
1058 /**
1059  * gst_base_sink_is_last_buffer_enabled:
1060  * @sink: the sink
1061  *
1062  * Checks if @sink is currently configured to store the last received buffer in
1063  * the last-buffer property.
1064  *
1065  * Returns: TRUE if the sink is configured to store the last received buffer.
1066  *
1067  * Since: 0.10.30
1068  */
1069 gboolean
1070 gst_base_sink_is_last_buffer_enabled (GstBaseSink * sink)
1071 {
1072   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
1073
1074   return g_atomic_int_get (&sink->priv->enable_last_buffer);
1075 }
1076
1077 /**
1078  * gst_base_sink_get_latency:
1079  * @sink: the sink
1080  *
1081  * Get the currently configured latency.
1082  *
1083  * Returns: The configured latency.
1084  *
1085  * Since: 0.10.12
1086  */
1087 GstClockTime
1088 gst_base_sink_get_latency (GstBaseSink * sink)
1089 {
1090   GstClockTime res;
1091
1092   GST_OBJECT_LOCK (sink);
1093   res = sink->priv->latency;
1094   GST_OBJECT_UNLOCK (sink);
1095
1096   return res;
1097 }
1098
1099 /**
1100  * gst_base_sink_query_latency:
1101  * @sink: the sink
1102  * @live: (out) (allow-none): if the sink is live
1103  * @upstream_live: (out) (allow-none): if an upstream element is live
1104  * @min_latency: (out) (allow-none): the min latency of the upstream elements
1105  * @max_latency: (out) (allow-none): the max latency of the upstream elements
1106  *
1107  * Query the sink for the latency parameters. The latency will be queried from
1108  * the upstream elements. @live will be TRUE if @sink is configured to
1109  * synchronize against the clock. @upstream_live will be TRUE if an upstream
1110  * element is live.
1111  *
1112  * If both @live and @upstream_live are TRUE, the sink will want to compensate
1113  * for the latency introduced by the upstream elements by setting the
1114  * @min_latency to a strictly possitive value.
1115  *
1116  * This function is mostly used by subclasses.
1117  *
1118  * Returns: TRUE if the query succeeded.
1119  *
1120  * Since: 0.10.12
1121  */
1122 gboolean
1123 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
1124     gboolean * upstream_live, GstClockTime * min_latency,
1125     GstClockTime * max_latency)
1126 {
1127   gboolean l, us_live, res, have_latency;
1128   GstClockTime min, max, render_delay;
1129   GstQuery *query;
1130   GstClockTime us_min, us_max;
1131
1132   /* we are live when we sync to the clock */
1133   GST_OBJECT_LOCK (sink);
1134   l = sink->sync;
1135   have_latency = sink->priv->have_latency;
1136   render_delay = sink->priv->render_delay;
1137   GST_OBJECT_UNLOCK (sink);
1138
1139   /* assume no latency */
1140   min = 0;
1141   max = -1;
1142   us_live = FALSE;
1143
1144   if (have_latency) {
1145     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1146     /* we are ready for a latency query this is when we preroll or when we are
1147      * not async. */
1148     query = gst_query_new_latency ();
1149
1150     /* ask the peer for the latency */
1151     if ((res = gst_pad_peer_query (sink->sinkpad, query))) {
1152       /* get upstream min and max latency */
1153       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1154
1155       if (us_live) {
1156         /* upstream live, use its latency, subclasses should use these
1157          * values to create the complete latency. */
1158         min = us_min;
1159         max = us_max;
1160       }
1161       if (l) {
1162         /* we need to add the render delay if we are live */
1163         if (min != -1)
1164           min += render_delay;
1165         if (max != -1)
1166           max += render_delay;
1167       }
1168     }
1169     gst_query_unref (query);
1170   } else {
1171     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1172     res = FALSE;
1173   }
1174
1175   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1176   if (!res) {
1177     if (!l) {
1178       res = TRUE;
1179       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1180     } else {
1181       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1182     }
1183   }
1184
1185   if (res) {
1186     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1187         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1188         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1189
1190     if (live)
1191       *live = l;
1192     if (upstream_live)
1193       *upstream_live = us_live;
1194     if (min_latency)
1195       *min_latency = min;
1196     if (max_latency)
1197       *max_latency = max;
1198   }
1199   return res;
1200 }
1201
1202 /**
1203  * gst_base_sink_set_render_delay:
1204  * @sink: a #GstBaseSink
1205  * @delay: the new delay
1206  *
1207  * Set the render delay in @sink to @delay. The render delay is the time
1208  * between actual rendering of a buffer and its synchronisation time. Some
1209  * devices might delay media rendering which can be compensated for with this
1210  * function.
1211  *
1212  * After calling this function, this sink will report additional latency and
1213  * other sinks will adjust their latency to delay the rendering of their media.
1214  *
1215  * This function is usually called by subclasses.
1216  *
1217  * Since: 0.10.21
1218  */
1219 void
1220 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1221 {
1222   GstClockTime old_render_delay;
1223
1224   g_return_if_fail (GST_IS_BASE_SINK (sink));
1225
1226   GST_OBJECT_LOCK (sink);
1227   old_render_delay = sink->priv->render_delay;
1228   sink->priv->render_delay = delay;
1229   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1230       GST_TIME_ARGS (delay));
1231   GST_OBJECT_UNLOCK (sink);
1232
1233   if (delay != old_render_delay) {
1234     GST_DEBUG_OBJECT (sink, "posting latency changed");
1235     gst_element_post_message (GST_ELEMENT_CAST (sink),
1236         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1237   }
1238 }
1239
1240 /**
1241  * gst_base_sink_get_render_delay:
1242  * @sink: a #GstBaseSink
1243  *
1244  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1245  * information about the render delay.
1246  *
1247  * Returns: the render delay of @sink.
1248  *
1249  * Since: 0.10.21
1250  */
1251 GstClockTime
1252 gst_base_sink_get_render_delay (GstBaseSink * sink)
1253 {
1254   GstClockTimeDiff res;
1255
1256   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1257
1258   GST_OBJECT_LOCK (sink);
1259   res = sink->priv->render_delay;
1260   GST_OBJECT_UNLOCK (sink);
1261
1262   return res;
1263 }
1264
1265 /**
1266  * gst_base_sink_set_blocksize:
1267  * @sink: a #GstBaseSink
1268  * @blocksize: the blocksize in bytes
1269  *
1270  * Set the number of bytes that the sink will pull when it is operating in pull
1271  * mode.
1272  *
1273  * Since: 0.10.22
1274  */
1275 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1276 void
1277 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1278 {
1279   g_return_if_fail (GST_IS_BASE_SINK (sink));
1280
1281   GST_OBJECT_LOCK (sink);
1282   sink->priv->blocksize = blocksize;
1283   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1284   GST_OBJECT_UNLOCK (sink);
1285 }
1286
1287 /**
1288  * gst_base_sink_get_blocksize:
1289  * @sink: a #GstBaseSink
1290  *
1291  * Get the number of bytes that the sink will pull when it is operating in pull
1292  * mode.
1293  *
1294  * Returns: the number of bytes @sink will pull in pull mode.
1295  *
1296  * Since: 0.10.22
1297  */
1298 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1299 guint
1300 gst_base_sink_get_blocksize (GstBaseSink * sink)
1301 {
1302   guint res;
1303
1304   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1305
1306   GST_OBJECT_LOCK (sink);
1307   res = sink->priv->blocksize;
1308   GST_OBJECT_UNLOCK (sink);
1309
1310   return res;
1311 }
1312
1313 /**
1314  * gst_base_sink_set_throttle_time:
1315  * @sink: a #GstBaseSink
1316  * @throttle: the throttle time in nanoseconds
1317  *
1318  * Set the time that will be inserted between rendered buffers. This
1319  * can be used to control the maximum buffers per second that the sink
1320  * will render. 
1321  *
1322  * Since: 0.10.33
1323  */
1324 void
1325 gst_base_sink_set_throttle_time (GstBaseSink * sink, guint64 throttle)
1326 {
1327   g_return_if_fail (GST_IS_BASE_SINK (sink));
1328
1329   GST_OBJECT_LOCK (sink);
1330   sink->priv->throttle_time = throttle;
1331   GST_LOG_OBJECT (sink, "set throttle_time to %" G_GUINT64_FORMAT, throttle);
1332   GST_OBJECT_UNLOCK (sink);
1333 }
1334
1335 /**
1336  * gst_base_sink_get_throttle_time:
1337  * @sink: a #GstBaseSink
1338  *
1339  * Get the time that will be inserted between frames to control the 
1340  * maximum buffers per second.
1341  *
1342  * Returns: the number of nanoseconds @sink will put between frames.
1343  *
1344  * Since: 0.10.33
1345  */
1346 guint64
1347 gst_base_sink_get_throttle_time (GstBaseSink * sink)
1348 {
1349   guint64 res;
1350
1351   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1352
1353   GST_OBJECT_LOCK (sink);
1354   res = sink->priv->throttle_time;
1355   GST_OBJECT_UNLOCK (sink);
1356
1357   return res;
1358 }
1359
1360 static void
1361 gst_base_sink_set_property (GObject * object, guint prop_id,
1362     const GValue * value, GParamSpec * pspec)
1363 {
1364   GstBaseSink *sink = GST_BASE_SINK (object);
1365
1366   switch (prop_id) {
1367     case PROP_PREROLL_QUEUE_LEN:
1368       /* preroll lock necessary to serialize with finish_preroll */
1369       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1370       g_atomic_int_set (&sink->preroll_queue_max_len, g_value_get_uint (value));
1371       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1372       break;
1373     case PROP_SYNC:
1374       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1375       break;
1376     case PROP_MAX_LATENESS:
1377       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1378       break;
1379     case PROP_QOS:
1380       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1381       break;
1382     case PROP_ASYNC:
1383       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1384       break;
1385     case PROP_TS_OFFSET:
1386       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1387       break;
1388     case PROP_BLOCKSIZE:
1389       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1390       break;
1391     case PROP_RENDER_DELAY:
1392       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1393       break;
1394     case PROP_ENABLE_LAST_BUFFER:
1395       gst_base_sink_set_last_buffer_enabled (sink, g_value_get_boolean (value));
1396       break;
1397     case PROP_THROTTLE_TIME:
1398       gst_base_sink_set_throttle_time (sink, g_value_get_uint64 (value));
1399       break;
1400     default:
1401       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1402       break;
1403   }
1404 }
1405
1406 static void
1407 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1408     GParamSpec * pspec)
1409 {
1410   GstBaseSink *sink = GST_BASE_SINK (object);
1411
1412   switch (prop_id) {
1413     case PROP_PREROLL_QUEUE_LEN:
1414       g_value_set_uint (value, g_atomic_int_get (&sink->preroll_queue_max_len));
1415       break;
1416     case PROP_SYNC:
1417       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1418       break;
1419     case PROP_MAX_LATENESS:
1420       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1421       break;
1422     case PROP_QOS:
1423       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1424       break;
1425     case PROP_ASYNC:
1426       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1427       break;
1428     case PROP_TS_OFFSET:
1429       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1430       break;
1431     case PROP_LAST_BUFFER:
1432       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1433       break;
1434     case PROP_ENABLE_LAST_BUFFER:
1435       g_value_set_boolean (value, gst_base_sink_is_last_buffer_enabled (sink));
1436       break;
1437     case PROP_BLOCKSIZE:
1438       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1439       break;
1440     case PROP_RENDER_DELAY:
1441       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1442       break;
1443     case PROP_THROTTLE_TIME:
1444       g_value_set_uint64 (value, gst_base_sink_get_throttle_time (sink));
1445       break;
1446     default:
1447       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1448       break;
1449   }
1450 }
1451
1452
1453 static GstCaps *
1454 gst_base_sink_get_caps (GstBaseSink * sink)
1455 {
1456   return NULL;
1457 }
1458
1459 static gboolean
1460 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1461 {
1462   return TRUE;
1463 }
1464
1465 static GstFlowReturn
1466 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1467     GstCaps * caps, GstBuffer ** buf)
1468 {
1469   *buf = NULL;
1470   return GST_FLOW_OK;
1471 }
1472
1473 /* with PREROLL_LOCK, STREAM_LOCK */
1474 static void
1475 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1476 {
1477   GstMiniObject *obj;
1478
1479   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1480   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1481     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1482     gst_mini_object_unref (obj);
1483   }
1484   /* we can't have EOS anymore now */
1485   basesink->eos = FALSE;
1486   basesink->priv->received_eos = FALSE;
1487   basesink->have_preroll = FALSE;
1488   basesink->priv->step_unlock = FALSE;
1489   basesink->eos_queued = FALSE;
1490   basesink->preroll_queued = 0;
1491   basesink->buffers_queued = 0;
1492   basesink->events_queued = 0;
1493   /* can't report latency anymore until we preroll again */
1494   if (basesink->priv->async_enabled) {
1495     GST_OBJECT_LOCK (basesink);
1496     basesink->priv->have_latency = FALSE;
1497     GST_OBJECT_UNLOCK (basesink);
1498   }
1499   /* and signal any waiters now */
1500   GST_PAD_PREROLL_SIGNAL (pad);
1501 }
1502
1503 /* with STREAM_LOCK, configures given segment with the event information. */
1504 static void
1505 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1506     GstEvent * event, GstSegment * segment)
1507 {
1508   gboolean update;
1509   gdouble rate, arate;
1510   GstFormat format;
1511   gint64 start;
1512   gint64 stop;
1513   gint64 time;
1514
1515   /* the newsegment event is needed to bring the buffer timestamps to the
1516    * stream time and to drop samples outside of the playback segment. */
1517   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1518       &start, &stop, &time);
1519
1520   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1521    * We protect with the OBJECT_LOCK so that we can use the values to
1522    * safely answer a POSITION query. */
1523   GST_OBJECT_LOCK (basesink);
1524   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1525       stop, time);
1526
1527   if (format == GST_FORMAT_TIME) {
1528     GST_DEBUG_OBJECT (basesink,
1529         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1530         "format GST_FORMAT_TIME, "
1531         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1532         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1533         update, rate, arate, GST_TIME_ARGS (segment->start),
1534         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1535         GST_TIME_ARGS (segment->accum));
1536   } else {
1537     GST_DEBUG_OBJECT (basesink,
1538         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1539         "format %d, "
1540         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1541         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1542         segment->format, segment->start, segment->stop, segment->time,
1543         segment->accum);
1544   }
1545   GST_OBJECT_UNLOCK (basesink);
1546 }
1547
1548 /* with PREROLL_LOCK, STREAM_LOCK */
1549 static gboolean
1550 gst_base_sink_commit_state (GstBaseSink * basesink)
1551 {
1552   /* commit state and proceed to next pending state */
1553   GstState current, next, pending, post_pending;
1554   gboolean post_paused = FALSE;
1555   gboolean post_async_done = FALSE;
1556   gboolean post_playing = FALSE;
1557
1558   /* we are certainly not playing async anymore now */
1559   basesink->playing_async = FALSE;
1560
1561   GST_OBJECT_LOCK (basesink);
1562   current = GST_STATE (basesink);
1563   next = GST_STATE_NEXT (basesink);
1564   pending = GST_STATE_PENDING (basesink);
1565   post_pending = pending;
1566
1567   switch (pending) {
1568     case GST_STATE_PLAYING:
1569     {
1570       GstBaseSinkClass *bclass;
1571       GstStateChangeReturn ret;
1572
1573       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1574
1575       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1576
1577       basesink->need_preroll = FALSE;
1578       post_async_done = TRUE;
1579       basesink->priv->commited = TRUE;
1580       post_playing = TRUE;
1581       /* post PAUSED too when we were READY */
1582       if (current == GST_STATE_READY) {
1583         post_paused = TRUE;
1584       }
1585
1586       /* make sure we notify the subclass of async playing */
1587       if (bclass->async_play) {
1588         GST_WARNING_OBJECT (basesink, "deprecated async_play");
1589         ret = bclass->async_play (basesink);
1590         if (ret == GST_STATE_CHANGE_FAILURE)
1591           goto async_failed;
1592       }
1593       break;
1594     }
1595     case GST_STATE_PAUSED:
1596       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1597       post_paused = TRUE;
1598       post_async_done = TRUE;
1599       basesink->priv->commited = TRUE;
1600       post_pending = GST_STATE_VOID_PENDING;
1601       break;
1602     case GST_STATE_READY:
1603     case GST_STATE_NULL:
1604       goto stopping;
1605     case GST_STATE_VOID_PENDING:
1606       goto nothing_pending;
1607     default:
1608       break;
1609   }
1610
1611   /* we can report latency queries now */
1612   basesink->priv->have_latency = TRUE;
1613
1614   GST_STATE (basesink) = pending;
1615   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1616   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1617   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1618   GST_OBJECT_UNLOCK (basesink);
1619
1620   if (post_paused) {
1621     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1622     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1623         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1624             current, next, post_pending));
1625   }
1626   if (post_async_done) {
1627     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1628     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1629         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1630   }
1631   if (post_playing) {
1632     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1633     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1634         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1635             next, pending, GST_STATE_VOID_PENDING));
1636   }
1637
1638   GST_STATE_BROADCAST (basesink);
1639
1640   return TRUE;
1641
1642 nothing_pending:
1643   {
1644     /* Depending on the state, set our vars. We get in this situation when the
1645      * state change function got a change to update the state vars before the
1646      * streaming thread did. This is fine but we need to make sure that we
1647      * update the need_preroll var since it was TRUE when we got here and might
1648      * become FALSE if we got to PLAYING. */
1649     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1650         gst_element_state_get_name (current));
1651     switch (current) {
1652       case GST_STATE_PLAYING:
1653         basesink->need_preroll = FALSE;
1654         break;
1655       case GST_STATE_PAUSED:
1656         basesink->need_preroll = TRUE;
1657         break;
1658       default:
1659         basesink->need_preroll = FALSE;
1660         basesink->flushing = TRUE;
1661         break;
1662     }
1663     /* we can report latency queries now */
1664     basesink->priv->have_latency = TRUE;
1665     GST_OBJECT_UNLOCK (basesink);
1666     return TRUE;
1667   }
1668 stopping:
1669   {
1670     /* app is going to READY */
1671     GST_DEBUG_OBJECT (basesink, "stopping");
1672     basesink->need_preroll = FALSE;
1673     basesink->flushing = TRUE;
1674     GST_OBJECT_UNLOCK (basesink);
1675     return FALSE;
1676   }
1677 async_failed:
1678   {
1679     GST_DEBUG_OBJECT (basesink, "async commit failed");
1680     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1681     GST_OBJECT_UNLOCK (basesink);
1682     return FALSE;
1683   }
1684 }
1685
1686 static void
1687 start_stepping (GstBaseSink * sink, GstSegment * segment,
1688     GstStepInfo * pending, GstStepInfo * current)
1689 {
1690   gint64 end;
1691   GstMessage *message;
1692
1693   GST_DEBUG_OBJECT (sink, "update pending step");
1694
1695   GST_OBJECT_LOCK (sink);
1696   memcpy (current, pending, sizeof (GstStepInfo));
1697   pending->valid = FALSE;
1698   GST_OBJECT_UNLOCK (sink);
1699
1700   /* post message first */
1701   message =
1702       gst_message_new_step_start (GST_OBJECT (sink), TRUE, current->format,
1703       current->amount, current->rate, current->flush, current->intermediate);
1704   gst_message_set_seqnum (message, current->seqnum);
1705   gst_element_post_message (GST_ELEMENT (sink), message);
1706
1707   /* get the running time of where we paused and remember it */
1708   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1709   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1710
1711   /* set the new rate for the remainder of the segment */
1712   current->start_rate = segment->rate;
1713   segment->rate *= current->rate;
1714   segment->abs_rate = ABS (segment->rate);
1715
1716   /* save values */
1717   if (segment->rate > 0.0)
1718     current->start_stop = segment->stop;
1719   else
1720     current->start_start = segment->start;
1721
1722   if (current->format == GST_FORMAT_TIME) {
1723     end = current->start + current->amount;
1724     if (!current->flush) {
1725       /* update the segment clipping regions for non-flushing seeks */
1726       if (segment->rate > 0.0) {
1727         segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1728         segment->last_stop = segment->stop;
1729       } else {
1730         gint64 position;
1731
1732         position = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1733         segment->time = position;
1734         segment->start = position;
1735         segment->last_stop = position;
1736       }
1737     }
1738   }
1739
1740   GST_DEBUG_OBJECT (sink,
1741       "segment now rate %lf, applied rate %lf, "
1742       "format GST_FORMAT_TIME, "
1743       "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1744       ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1745       segment->rate, segment->applied_rate, GST_TIME_ARGS (segment->start),
1746       GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1747       GST_TIME_ARGS (segment->accum));
1748
1749   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1750       GST_TIME_ARGS (current->start));
1751
1752   if (current->amount == -1) {
1753     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1754     current->valid = FALSE;
1755   } else {
1756     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1757         "rate: %f", current->amount, gst_format_get_name (current->format),
1758         current->rate);
1759   }
1760 }
1761
1762 static void
1763 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1764     GstStepInfo * current, gint64 rstart, gint64 rstop, gboolean eos)
1765 {
1766   gint64 stop, position;
1767   GstMessage *message;
1768
1769   GST_DEBUG_OBJECT (sink, "step complete");
1770
1771   if (segment->rate > 0.0)
1772     stop = rstart;
1773   else
1774     stop = rstop;
1775
1776   GST_DEBUG_OBJECT (sink,
1777       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1778
1779   if (stop == -1)
1780     current->duration = current->position;
1781   else
1782     current->duration = stop - current->start;
1783
1784   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1785       GST_TIME_ARGS (current->duration));
1786
1787   position = current->start + current->duration;
1788
1789   /* now move the segment to the new running time */
1790   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1791
1792   if (current->flush) {
1793     /* and remove the accumulated time we flushed, start time did not change */
1794     segment->accum = current->start;
1795   } else {
1796     /* start time is now the stepped position */
1797     gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1798   }
1799
1800   /* restore the previous rate */
1801   segment->rate = current->start_rate;
1802   segment->abs_rate = ABS (segment->rate);
1803
1804   if (segment->rate > 0.0)
1805     segment->stop = current->start_stop;
1806   else
1807     segment->start = current->start_start;
1808
1809   /* the clip segment is used for position report in paused... */
1810   memcpy (sink->abidata.ABI.clip_segment, segment, sizeof (GstSegment));
1811
1812   /* post the step done when we know the stepped duration in TIME */
1813   message =
1814       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1815       current->amount, current->rate, current->flush, current->intermediate,
1816       current->duration, eos);
1817   gst_message_set_seqnum (message, current->seqnum);
1818   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1819
1820   if (!current->intermediate)
1821     sink->need_preroll = current->need_preroll;
1822
1823   /* and the current step info finished and becomes invalid */
1824   current->valid = FALSE;
1825 }
1826
1827 static gboolean
1828 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1829     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1830     gint64 * rstop)
1831 {
1832   gboolean step_end = FALSE;
1833
1834   /* see if we need to skip this buffer because of stepping */
1835   switch (current->format) {
1836     case GST_FORMAT_TIME:
1837     {
1838       guint64 end;
1839       gint64 first, last;
1840
1841       if (segment->rate > 0.0) {
1842         if (segment->stop == *cstop)
1843           *rstop = *rstart + current->amount;
1844
1845         first = *rstart;
1846         last = *rstop;
1847       } else {
1848         if (segment->start == *cstart)
1849           *rstart = *rstop + current->amount;
1850
1851         first = *rstop;
1852         last = *rstart;
1853       }
1854
1855       end = current->start + current->amount;
1856       current->position = first - current->start;
1857
1858       if (G_UNLIKELY (segment->abs_rate != 1.0))
1859         current->position /= segment->abs_rate;
1860
1861       GST_DEBUG_OBJECT (sink,
1862           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1863           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1864       GST_DEBUG_OBJECT (sink,
1865           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1866           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1867           GST_TIME_ARGS (last - current->start),
1868           GST_TIME_ARGS (current->amount));
1869
1870       if ((current->flush && current->position >= current->amount)
1871           || last >= end) {
1872         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1873         step_end = TRUE;
1874         if (segment->rate > 0.0) {
1875           *rstart = end;
1876           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1877         } else {
1878           *rstop = end;
1879           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1880         }
1881       }
1882       GST_DEBUG_OBJECT (sink,
1883           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1884           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1885       GST_DEBUG_OBJECT (sink,
1886           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1887           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1888       break;
1889     }
1890     case GST_FORMAT_BUFFERS:
1891       GST_DEBUG_OBJECT (sink,
1892           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1893           current->position, current->amount);
1894
1895       if (current->position < current->amount) {
1896         current->position++;
1897       } else {
1898         step_end = TRUE;
1899       }
1900       break;
1901     case GST_FORMAT_DEFAULT:
1902     default:
1903       GST_DEBUG_OBJECT (sink,
1904           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1905           current->position, current->amount);
1906       break;
1907   }
1908   return step_end;
1909 }
1910
1911 /* with STREAM_LOCK, PREROLL_LOCK
1912  *
1913  * Returns TRUE if the object needs synchronisation and takes therefore
1914  * part in prerolling.
1915  *
1916  * rsstart/rsstop contain the start/stop in stream time.
1917  * rrstart/rrstop contain the start/stop in running time.
1918  */
1919 static gboolean
1920 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1921     GstClockTime * rsstart, GstClockTime * rsstop,
1922     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1923     gboolean * stepped, GstSegment * segment, GstStepInfo * step,
1924     gboolean * step_end, guint8 obj_type)
1925 {
1926   GstBaseSinkClass *bclass;
1927   GstBuffer *buffer;
1928   GstClockTime start, stop;     /* raw start/stop timestamps */
1929   gint64 cstart, cstop;         /* clipped raw timestamps */
1930   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1931   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1932   GstFormat format;
1933   GstBaseSinkPrivate *priv;
1934   gboolean eos;
1935
1936   priv = basesink->priv;
1937
1938   /* start with nothing */
1939   start = stop = GST_CLOCK_TIME_NONE;
1940
1941   if (G_UNLIKELY (OBJ_IS_EVENT (obj_type))) {
1942     GstEvent *event = GST_EVENT_CAST (obj);
1943
1944     switch (GST_EVENT_TYPE (event)) {
1945         /* EOS event needs syncing */
1946       case GST_EVENT_EOS:
1947       {
1948         if (basesink->segment.rate >= 0.0) {
1949           sstart = sstop = priv->current_sstop;
1950           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1951             /* we have not seen a buffer yet, use the segment values */
1952             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1953                 basesink->segment.format, basesink->segment.stop);
1954           }
1955         } else {
1956           sstart = sstop = priv->current_sstart;
1957           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1958             /* we have not seen a buffer yet, use the segment values */
1959             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1960                 basesink->segment.format, basesink->segment.start);
1961           }
1962         }
1963
1964         rstart = rstop = priv->eos_rtime;
1965         *do_sync = rstart != -1;
1966         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1967             GST_TIME_ARGS (rstart));
1968         /* if we are stepping, we end now */
1969         *step_end = step->valid;
1970         eos = TRUE;
1971         goto eos_done;
1972       }
1973       default:
1974         /* other events do not need syncing */
1975         /* FIXME, maybe NEWSEGMENT might need synchronisation
1976          * since the POSITION query depends on accumulated times and
1977          * we cannot accumulate the current segment before the previous
1978          * one completed.
1979          */
1980         return FALSE;
1981     }
1982   }
1983
1984   eos = FALSE;
1985
1986 again:
1987   /* else do buffer sync code */
1988   buffer = GST_BUFFER_CAST (obj);
1989
1990   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1991
1992   /* just get the times to see if we need syncing, if the start returns -1 we
1993    * don't sync. */
1994   if (bclass->get_times)
1995     bclass->get_times (basesink, buffer, &start, &stop);
1996
1997   if (!GST_CLOCK_TIME_IS_VALID (start)) {
1998     /* we don't need to sync but we still want to get the timestamps for
1999      * tracking the position */
2000     gst_base_sink_get_times (basesink, buffer, &start, &stop);
2001     *do_sync = FALSE;
2002   } else {
2003     *do_sync = TRUE;
2004   }
2005
2006   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
2007       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
2008       GST_TIME_ARGS (stop), *do_sync);
2009
2010   /* collect segment and format for code clarity */
2011   format = segment->format;
2012
2013   /* no timestamp clipping if we did not get a TIME segment format */
2014   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
2015     cstart = start;
2016     cstop = stop;
2017     /* do running and stream time in TIME format */
2018     format = GST_FORMAT_TIME;
2019     GST_LOG_OBJECT (basesink, "not time format, don't clip");
2020     goto do_times;
2021   }
2022
2023   /* clip, only when we know about time */
2024   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
2025               (gint64) start, (gint64) stop, &cstart, &cstop))) {
2026     if (step->valid) {
2027       GST_DEBUG_OBJECT (basesink, "step out of segment");
2028       /* when we are stepping, pretend we're at the end of the segment */
2029       if (segment->rate > 0.0) {
2030         cstart = segment->stop;
2031         cstop = segment->stop;
2032       } else {
2033         cstart = segment->start;
2034         cstop = segment->start;
2035       }
2036       goto do_times;
2037     }
2038     goto out_of_segment;
2039   }
2040
2041   if (G_UNLIKELY (start != cstart || stop != cstop)) {
2042     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
2043         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
2044         GST_TIME_ARGS (cstop));
2045   }
2046
2047   /* set last stop position */
2048   if (G_LIKELY (stop != GST_CLOCK_TIME_NONE && cstop != GST_CLOCK_TIME_NONE))
2049     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
2050   else
2051     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
2052
2053 do_times:
2054   rstart = gst_segment_to_running_time (segment, format, cstart);
2055   rstop = gst_segment_to_running_time (segment, format, cstop);
2056
2057   if (G_UNLIKELY (step->valid)) {
2058     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
2059                 &rstart, &rstop))) {
2060       /* step is still busy, we discard data when we are flushing */
2061       *stepped = step->flush;
2062       GST_DEBUG_OBJECT (basesink, "stepping busy");
2063     }
2064   }
2065   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
2066    * upstream is behaving very badly */
2067   sstart = gst_segment_to_stream_time (segment, format, cstart);
2068   sstop = gst_segment_to_stream_time (segment, format, cstop);
2069
2070 eos_done:
2071   /* eos_done label only called when doing EOS, we also stop stepping then */
2072   if (*step_end && step->flush) {
2073     GST_DEBUG_OBJECT (basesink, "flushing step ended");
2074     stop_stepping (basesink, segment, step, rstart, rstop, eos);
2075     *step_end = FALSE;
2076     /* re-determine running start times for adjusted segment
2077      * (which has a flushed amount of running/accumulated time removed) */
2078     if (!GST_IS_EVENT (obj)) {
2079       GST_DEBUG_OBJECT (basesink, "refresh sync times");
2080       goto again;
2081     }
2082   }
2083
2084   /* save times */
2085   *rsstart = sstart;
2086   *rsstop = sstop;
2087   *rrstart = rstart;
2088   *rrstop = rstop;
2089
2090   /* buffers and EOS always need syncing and preroll */
2091   return TRUE;
2092
2093   /* special cases */
2094 out_of_segment:
2095   {
2096     /* we usually clip in the chain function already but stepping could cause
2097      * the segment to be updated later. we return FALSE so that we don't try
2098      * to sync on it. */
2099     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
2100     return FALSE;
2101   }
2102 }
2103
2104 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
2105  * adjust a timestamp with the latency and timestamp offset. This function does
2106  * not adjust for the render delay. */
2107 static GstClockTime
2108 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
2109 {
2110   GstClockTimeDiff ts_offset;
2111
2112   /* don't do anything funny with invalid timestamps */
2113   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2114     return time;
2115
2116   time += basesink->priv->latency;
2117
2118   /* apply offset, be carefull for underflows */
2119   ts_offset = basesink->priv->ts_offset;
2120   if (ts_offset < 0) {
2121     ts_offset = -ts_offset;
2122     if (ts_offset < time)
2123       time -= ts_offset;
2124     else
2125       time = 0;
2126   } else
2127     time += ts_offset;
2128
2129   /* subtract the render delay again, which was included in the latency */
2130   if (time > basesink->priv->render_delay)
2131     time -= basesink->priv->render_delay;
2132   else
2133     time = 0;
2134
2135   return time;
2136 }
2137
2138 /**
2139  * gst_base_sink_wait_clock:
2140  * @sink: the sink
2141  * @time: the running_time to be reached
2142  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2143  *
2144  * This function will block until @time is reached. It is usually called by
2145  * subclasses that use their own internal synchronisation.
2146  *
2147  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
2148  * returned. Likewise, if synchronisation is disabled in the element or there
2149  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
2150  *
2151  * This function should only be called with the PREROLL_LOCK held, like when
2152  * receiving an EOS event in the #GstBaseSinkClass.event() vmethod or when
2153  * receiving a buffer in
2154  * the #GstBaseSinkClass.render() vmethod.
2155  *
2156  * The @time argument should be the running_time of when this method should
2157  * return and is not adjusted with any latency or offset configured in the
2158  * sink.
2159  *
2160  * Since: 0.10.20
2161  *
2162  * Returns: #GstClockReturn
2163  */
2164 GstClockReturn
2165 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
2166     GstClockTimeDiff * jitter)
2167 {
2168   GstClockReturn ret;
2169   GstClock *clock;
2170   GstClockTime base_time;
2171
2172   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2173     goto invalid_time;
2174
2175   GST_OBJECT_LOCK (sink);
2176   if (G_UNLIKELY (!sink->sync))
2177     goto no_sync;
2178
2179   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
2180     goto no_clock;
2181
2182   base_time = GST_ELEMENT_CAST (sink)->base_time;
2183   GST_LOG_OBJECT (sink,
2184       "time %" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT,
2185       GST_TIME_ARGS (time), GST_TIME_ARGS (base_time));
2186
2187   /* add base_time to running_time to get the time against the clock */
2188   time += base_time;
2189
2190   /* Re-use existing clockid if available */
2191   /* FIXME: Casting to GstClockEntry only works because the types
2192    * are the same */
2193   if (G_LIKELY (sink->priv->cached_clock_id != NULL
2194           && GST_CLOCK_ENTRY_CLOCK ((GstClockEntry *) sink->
2195               priv->cached_clock_id) == clock)) {
2196     if (!gst_clock_single_shot_id_reinit (clock, sink->priv->cached_clock_id,
2197             time)) {
2198       gst_clock_id_unref (sink->priv->cached_clock_id);
2199       sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2200     }
2201   } else {
2202     if (sink->priv->cached_clock_id != NULL)
2203       gst_clock_id_unref (sink->priv->cached_clock_id);
2204     sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2205   }
2206   GST_OBJECT_UNLOCK (sink);
2207
2208   /* A blocking wait is performed on the clock. We save the ClockID
2209    * so we can unlock the entry at any time. While we are blocking, we
2210    * release the PREROLL_LOCK so that other threads can interrupt the
2211    * entry. */
2212   sink->clock_id = sink->priv->cached_clock_id;
2213   /* release the preroll lock while waiting */
2214   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
2215
2216   ret = gst_clock_id_wait (sink->priv->cached_clock_id, jitter);
2217
2218   GST_PAD_PREROLL_LOCK (sink->sinkpad);
2219   sink->clock_id = NULL;
2220
2221   return ret;
2222
2223   /* no syncing needed */
2224 invalid_time:
2225   {
2226     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
2227     return GST_CLOCK_BADTIME;
2228   }
2229 no_sync:
2230   {
2231     GST_DEBUG_OBJECT (sink, "sync disabled");
2232     GST_OBJECT_UNLOCK (sink);
2233     return GST_CLOCK_BADTIME;
2234   }
2235 no_clock:
2236   {
2237     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
2238     GST_OBJECT_UNLOCK (sink);
2239     return GST_CLOCK_BADTIME;
2240   }
2241 }
2242
2243 /**
2244  * gst_base_sink_wait_preroll:
2245  * @sink: the sink
2246  *
2247  * If the #GstBaseSinkClass.render() method performs its own synchronisation
2248  * against the clock it must unblock when going from PLAYING to the PAUSED state
2249  * and call this method before continuing to render the remaining data.
2250  *
2251  * This function will block until a state change to PLAYING happens (in which
2252  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
2253  * to a state change to READY or a FLUSH event (in which case this function
2254  * returns #GST_FLOW_WRONG_STATE).
2255  *
2256  * This function should only be called with the PREROLL_LOCK held, like in the
2257  * render function.
2258  *
2259  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2260  * continue. Any other return value should be returned from the render vmethod.
2261  *
2262  * Since: 0.10.11
2263  */
2264 GstFlowReturn
2265 gst_base_sink_wait_preroll (GstBaseSink * sink)
2266 {
2267   sink->have_preroll = TRUE;
2268   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
2269   /* block until the state changes, or we get a flush, or something */
2270   GST_PAD_PREROLL_WAIT (sink->sinkpad);
2271   sink->have_preroll = FALSE;
2272   if (G_UNLIKELY (sink->flushing))
2273     goto stopping;
2274   if (G_UNLIKELY (sink->priv->step_unlock))
2275     goto step_unlocked;
2276   GST_DEBUG_OBJECT (sink, "continue after preroll");
2277
2278   return GST_FLOW_OK;
2279
2280   /* ERRORS */
2281 stopping:
2282   {
2283     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
2284     return GST_FLOW_WRONG_STATE;
2285   }
2286 step_unlocked:
2287   {
2288     sink->priv->step_unlock = FALSE;
2289     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
2290     return GST_FLOW_STEP;
2291   }
2292 }
2293
2294 static inline guint8
2295 get_object_type (GstMiniObject * obj)
2296 {
2297   guint8 obj_type;
2298
2299   if (G_LIKELY (GST_IS_BUFFER (obj)))
2300     obj_type = _PR_IS_BUFFER;
2301   else if (GST_IS_EVENT (obj))
2302     obj_type = _PR_IS_EVENT;
2303   else if (GST_IS_BUFFER_LIST (obj))
2304     obj_type = _PR_IS_BUFFERLIST;
2305   else
2306     obj_type = _PR_IS_NOTHING;
2307
2308   return obj_type;
2309 }
2310
2311 /**
2312  * gst_base_sink_do_preroll:
2313  * @sink: the sink
2314  * @obj: (transfer none): the mini object that caused the preroll
2315  *
2316  * If the @sink spawns its own thread for pulling buffers from upstream it
2317  * should call this method after it has pulled a buffer. If the element needed
2318  * to preroll, this function will perform the preroll and will then block
2319  * until the element state is changed.
2320  *
2321  * This function should be called with the PREROLL_LOCK held.
2322  *
2323  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2324  * continue. Any other return value should be returned from the render vmethod.
2325  *
2326  * Since: 0.10.22
2327  */
2328 GstFlowReturn
2329 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2330 {
2331   GstFlowReturn ret;
2332
2333   while (G_UNLIKELY (sink->need_preroll)) {
2334     guint8 obj_type;
2335     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2336
2337     obj_type = get_object_type (obj);
2338
2339     ret = gst_base_sink_preroll_object (sink, obj_type, obj);
2340     if (ret != GST_FLOW_OK)
2341       goto preroll_failed;
2342
2343     /* need to recheck here because the commit state could have
2344      * made us not need the preroll anymore */
2345     if (G_LIKELY (sink->need_preroll)) {
2346       /* block until the state changes, or we get a flush, or something */
2347       ret = gst_base_sink_wait_preroll (sink);
2348       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2349         goto preroll_failed;
2350     }
2351   }
2352   return GST_FLOW_OK;
2353
2354   /* ERRORS */
2355 preroll_failed:
2356   {
2357     GST_DEBUG_OBJECT (sink, "preroll failed: %s", gst_flow_get_name (ret));
2358     return ret;
2359   }
2360 }
2361
2362 /**
2363  * gst_base_sink_wait_eos:
2364  * @sink: the sink
2365  * @time: the running_time to be reached
2366  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2367  *
2368  * This function will block until @time is reached. It is usually called by
2369  * subclasses that use their own internal synchronisation but want to let the
2370  * EOS be handled by the base class.
2371  *
2372  * This function should only be called with the PREROLL_LOCK held, like when
2373  * receiving an EOS event in the ::event vmethod.
2374  *
2375  * The @time argument should be the running_time of when the EOS should happen
2376  * and will be adjusted with any latency and offset configured in the sink.
2377  *
2378  * Returns: #GstFlowReturn
2379  *
2380  * Since: 0.10.15
2381  */
2382 GstFlowReturn
2383 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2384     GstClockTimeDiff * jitter)
2385 {
2386   GstClockReturn status;
2387   GstFlowReturn ret;
2388
2389   do {
2390     GstClockTime stime;
2391
2392     GST_DEBUG_OBJECT (sink, "checking preroll");
2393
2394     /* first wait for the playing state before we can continue */
2395     while (G_UNLIKELY (sink->need_preroll)) {
2396       ret = gst_base_sink_wait_preroll (sink);
2397       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2398         goto flushing;
2399     }
2400
2401     /* preroll done, we can sync since we are in PLAYING now. */
2402     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2403         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2404
2405     /* compensate for latency and ts_offset. We don't adjust for render delay
2406      * because we don't interact with the device on EOS normally. */
2407     stime = gst_base_sink_adjust_time (sink, time);
2408
2409     /* wait for the clock, this can be interrupted because we got shut down or
2410      * we PAUSED. */
2411     status = gst_base_sink_wait_clock (sink, stime, jitter);
2412
2413     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2414
2415     /* invalid time, no clock or sync disabled, just continue then */
2416     if (status == GST_CLOCK_BADTIME)
2417       break;
2418
2419     /* waiting could have been interrupted and we can be flushing now */
2420     if (G_UNLIKELY (sink->flushing))
2421       goto flushing;
2422
2423     /* retry if we got unscheduled, which means we did not reach the timeout
2424      * yet. if some other error occures, we continue. */
2425   } while (status == GST_CLOCK_UNSCHEDULED);
2426
2427   GST_DEBUG_OBJECT (sink, "end of stream");
2428
2429   return GST_FLOW_OK;
2430
2431   /* ERRORS */
2432 flushing:
2433   {
2434     GST_DEBUG_OBJECT (sink, "we are flushing");
2435     return GST_FLOW_WRONG_STATE;
2436   }
2437 }
2438
2439 /* with STREAM_LOCK, PREROLL_LOCK
2440  *
2441  * Make sure we are in PLAYING and synchronize an object to the clock.
2442  *
2443  * If we need preroll, we are not in PLAYING. We try to commit the state
2444  * if needed and then block if we still are not PLAYING.
2445  *
2446  * We start waiting on the clock in PLAYING. If we got interrupted, we
2447  * immediatly try to re-preroll.
2448  *
2449  * Some objects do not need synchronisation (most events) and so this function
2450  * immediatly returns GST_FLOW_OK.
2451  *
2452  * for objects that arrive later than max-lateness to be synchronized to the
2453  * clock have the @late boolean set to TRUE.
2454  *
2455  * This function keeps a running average of the jitter (the diff between the
2456  * clock time and the requested sync time). The jitter is negative for
2457  * objects that arrive in time and positive for late buffers.
2458  *
2459  * does not take ownership of obj.
2460  */
2461 static GstFlowReturn
2462 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2463     GstMiniObject * obj, gboolean * late, gboolean * step_end, guint8 obj_type)
2464 {
2465   GstClockTimeDiff jitter = 0;
2466   gboolean syncable;
2467   GstClockReturn status = GST_CLOCK_OK;
2468   GstClockTime rstart, rstop, sstart, sstop, stime;
2469   gboolean do_sync;
2470   GstBaseSinkPrivate *priv;
2471   GstFlowReturn ret;
2472   GstStepInfo *current, *pending;
2473   gboolean stepped;
2474
2475   priv = basesink->priv;
2476
2477 do_step:
2478   sstart = sstop = rstart = rstop = GST_CLOCK_TIME_NONE;
2479   do_sync = TRUE;
2480   stepped = FALSE;
2481
2482   priv->current_rstart = GST_CLOCK_TIME_NONE;
2483
2484   /* get stepping info */
2485   current = &priv->current_step;
2486   pending = &priv->pending_step;
2487
2488   /* get timing information for this object against the render segment */
2489   syncable = gst_base_sink_get_sync_times (basesink, obj,
2490       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2491       current, step_end, obj_type);
2492
2493   if (G_UNLIKELY (stepped))
2494     goto step_skipped;
2495
2496   /* a syncable object needs to participate in preroll and
2497    * clocking. All buffers and EOS are syncable. */
2498   if (G_UNLIKELY (!syncable))
2499     goto not_syncable;
2500
2501   /* store timing info for current object */
2502   priv->current_rstart = rstart;
2503   priv->current_rstop = (GST_CLOCK_TIME_IS_VALID (rstop) ? rstop : rstart);
2504
2505   /* save sync time for eos when the previous object needed sync */
2506   priv->eos_rtime = (do_sync ? priv->current_rstop : GST_CLOCK_TIME_NONE);
2507
2508   /* calculate inter frame spacing */
2509   if (G_UNLIKELY (priv->prev_rstart != -1 && priv->prev_rstart < rstart)) {
2510     GstClockTime in_diff;
2511
2512     in_diff = rstart - priv->prev_rstart;
2513
2514     if (priv->avg_in_diff == -1)
2515       priv->avg_in_diff = in_diff;
2516     else
2517       priv->avg_in_diff = UPDATE_RUNNING_AVG (priv->avg_in_diff, in_diff);
2518
2519     GST_LOG_OBJECT (basesink, "avg frame diff %" GST_TIME_FORMAT,
2520         GST_TIME_ARGS (priv->avg_in_diff));
2521
2522   }
2523   priv->prev_rstart = rstart;
2524
2525   if (G_UNLIKELY (priv->earliest_in_time != -1
2526           && rstart < priv->earliest_in_time))
2527     goto qos_dropped;
2528
2529 again:
2530   /* first do preroll, this makes sure we commit our state
2531    * to PAUSED and can continue to PLAYING. We cannot perform
2532    * any clock sync in PAUSED because there is no clock. */
2533   ret = gst_base_sink_do_preroll (basesink, obj);
2534   if (G_UNLIKELY (ret != GST_FLOW_OK))
2535     goto preroll_failed;
2536
2537   /* update the segment with a pending step if the current one is invalid and we
2538    * have a new pending one. We only accept new step updates after a preroll */
2539   if (G_UNLIKELY (pending->valid && !current->valid)) {
2540     start_stepping (basesink, &basesink->segment, pending, current);
2541     goto do_step;
2542   }
2543
2544   /* After rendering we store the position of the last buffer so that we can use
2545    * it to report the position. We need to take the lock here. */
2546   GST_OBJECT_LOCK (basesink);
2547   priv->current_sstart = sstart;
2548   priv->current_sstop = (GST_CLOCK_TIME_IS_VALID (sstop) ? sstop : sstart);
2549   GST_OBJECT_UNLOCK (basesink);
2550
2551   if (!do_sync)
2552     goto done;
2553
2554   /* adjust for latency */
2555   stime = gst_base_sink_adjust_time (basesink, rstart);
2556
2557   /* adjust for render-delay, avoid underflows */
2558   if (GST_CLOCK_TIME_IS_VALID (stime)) {
2559     if (stime > priv->render_delay)
2560       stime -= priv->render_delay;
2561     else
2562       stime = 0;
2563   }
2564
2565   /* preroll done, we can sync since we are in PLAYING now. */
2566   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2567       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2568       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2569
2570   /* This function will return immediatly if start == -1, no clock
2571    * or sync is disabled with GST_CLOCK_BADTIME. */
2572   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2573
2574   GST_DEBUG_OBJECT (basesink, "clock returned %d, jitter %c%" GST_TIME_FORMAT,
2575       status, (jitter < 0 ? '-' : ' '), GST_TIME_ARGS (ABS (jitter)));
2576
2577   /* invalid time, no clock or sync disabled, just render */
2578   if (status == GST_CLOCK_BADTIME)
2579     goto done;
2580
2581   /* waiting could have been interrupted and we can be flushing now */
2582   if (G_UNLIKELY (basesink->flushing))
2583     goto flushing;
2584
2585   /* check for unlocked by a state change, we are not flushing so
2586    * we can try to preroll on the current buffer. */
2587   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2588     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2589     priv->call_preroll = TRUE;
2590     goto again;
2591   }
2592
2593   /* successful syncing done, record observation */
2594   priv->current_jitter = jitter;
2595
2596   /* check if the object should be dropped */
2597   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2598       status, jitter);
2599
2600 done:
2601   return GST_FLOW_OK;
2602
2603   /* ERRORS */
2604 step_skipped:
2605   {
2606     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2607     *late = TRUE;
2608     return GST_FLOW_OK;
2609   }
2610 not_syncable:
2611   {
2612     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2613     return GST_FLOW_OK;
2614   }
2615 qos_dropped:
2616   {
2617     GST_DEBUG_OBJECT (basesink, "dropped because of QoS %p", obj);
2618     *late = TRUE;
2619     return GST_FLOW_OK;
2620   }
2621 flushing:
2622   {
2623     GST_DEBUG_OBJECT (basesink, "we are flushing");
2624     return GST_FLOW_WRONG_STATE;
2625   }
2626 preroll_failed:
2627   {
2628     GST_DEBUG_OBJECT (basesink, "preroll failed");
2629     *step_end = FALSE;
2630     return ret;
2631   }
2632 }
2633
2634 static gboolean
2635 gst_base_sink_send_qos (GstBaseSink * basesink, GstQOSType type,
2636     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2637 {
2638   GstEvent *event;
2639   gboolean res;
2640
2641   /* generate Quality-of-Service event */
2642   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2643       "qos: type %d, proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2644       GST_TIME_FORMAT, type, proportion, diff, GST_TIME_ARGS (time));
2645
2646   event = gst_event_new_qos_full (type, proportion, diff, time);
2647
2648   /* send upstream */
2649   res = gst_pad_push_event (basesink->sinkpad, event);
2650
2651   return res;
2652 }
2653
2654 static void
2655 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2656 {
2657   GstBaseSinkPrivate *priv;
2658   GstClockTime start, stop;
2659   GstClockTimeDiff jitter;
2660   GstClockTime pt, entered, left;
2661   GstClockTime duration;
2662   gdouble rate;
2663
2664   priv = sink->priv;
2665
2666   start = priv->current_rstart;
2667
2668   if (priv->current_step.valid)
2669     return;
2670
2671   /* if Quality-of-Service disabled, do nothing */
2672   if (!g_atomic_int_get (&priv->qos_enabled) ||
2673       !GST_CLOCK_TIME_IS_VALID (start))
2674     return;
2675
2676   stop = priv->current_rstop;
2677   jitter = priv->current_jitter;
2678
2679   if (jitter < 0) {
2680     /* this is the time the buffer entered the sink */
2681     if (start < -jitter)
2682       entered = 0;
2683     else
2684       entered = start + jitter;
2685     left = start;
2686   } else {
2687     /* this is the time the buffer entered the sink */
2688     entered = start + jitter;
2689     /* this is the time the buffer left the sink */
2690     left = start + jitter;
2691   }
2692
2693   /* calculate duration of the buffer */
2694   if (GST_CLOCK_TIME_IS_VALID (stop) && stop != start)
2695     duration = stop - start;
2696   else
2697     duration = priv->avg_in_diff;
2698
2699   /* if we have the time when the last buffer left us, calculate
2700    * processing time */
2701   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2702     if (entered > priv->last_left) {
2703       pt = entered - priv->last_left;
2704     } else {
2705       pt = 0;
2706     }
2707   } else {
2708     pt = priv->avg_pt;
2709   }
2710
2711   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2712       ", stop %" GST_TIME_FORMAT ", entered %" GST_TIME_FORMAT ", left %"
2713       GST_TIME_FORMAT ", pt: %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
2714       ",jitter %" G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
2715       GST_TIME_ARGS (entered), GST_TIME_ARGS (left), GST_TIME_ARGS (pt),
2716       GST_TIME_ARGS (duration), jitter);
2717
2718   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2719       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2720       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2721       priv->avg_rate);
2722
2723   /* collect running averages. for first observations, we copy the
2724    * values */
2725   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_duration))
2726     priv->avg_duration = duration;
2727   else
2728     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2729
2730   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_pt))
2731     priv->avg_pt = pt;
2732   else
2733     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2734
2735   if (priv->avg_duration != 0)
2736     rate =
2737         gst_guint64_to_gdouble (priv->avg_pt) /
2738         gst_guint64_to_gdouble (priv->avg_duration);
2739   else
2740     rate = 1.0;
2741
2742   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2743     if (dropped || priv->avg_rate < 0.0) {
2744       priv->avg_rate = rate;
2745     } else {
2746       if (rate > 1.0)
2747         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2748       else
2749         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2750     }
2751   }
2752
2753   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2754       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2755       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2756       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2757
2758
2759   if (priv->avg_rate >= 0.0) {
2760     GstQOSType type;
2761     GstClockTimeDiff diff;
2762
2763     /* if we have a valid rate, start sending QoS messages */
2764     if (priv->current_jitter < 0) {
2765       /* make sure we never go below 0 when adding the jitter to the
2766        * timestamp. */
2767       if (priv->current_rstart < -priv->current_jitter)
2768         priv->current_jitter = -priv->current_rstart;
2769     }
2770
2771     if (priv->throttle_time > 0) {
2772       diff = priv->throttle_time;
2773       type = GST_QOS_TYPE_THROTTLE;
2774     } else {
2775       diff = priv->current_jitter;
2776       if (diff <= 0)
2777         type = GST_QOS_TYPE_OVERFLOW;
2778       else
2779         type = GST_QOS_TYPE_UNDERFLOW;
2780     }
2781
2782     gst_base_sink_send_qos (sink, type, priv->avg_rate, priv->current_rstart,
2783         diff);
2784   }
2785
2786   /* record when this buffer will leave us */
2787   priv->last_left = left;
2788 }
2789
2790 /* reset all qos measuring */
2791 static void
2792 gst_base_sink_reset_qos (GstBaseSink * sink)
2793 {
2794   GstBaseSinkPrivate *priv;
2795
2796   priv = sink->priv;
2797
2798   priv->last_render_time = GST_CLOCK_TIME_NONE;
2799   priv->prev_rstart = GST_CLOCK_TIME_NONE;
2800   priv->earliest_in_time = GST_CLOCK_TIME_NONE;
2801   priv->last_left = GST_CLOCK_TIME_NONE;
2802   priv->avg_duration = GST_CLOCK_TIME_NONE;
2803   priv->avg_pt = GST_CLOCK_TIME_NONE;
2804   priv->avg_rate = -1.0;
2805   priv->avg_render = GST_CLOCK_TIME_NONE;
2806   priv->avg_in_diff = GST_CLOCK_TIME_NONE;
2807   priv->rendered = 0;
2808   priv->dropped = 0;
2809
2810 }
2811
2812 /* Checks if the object was scheduled too late.
2813  *
2814  * rstart/rstop contain the running_time start and stop values
2815  * of the object.
2816  *
2817  * status and jitter contain the return values from the clock wait.
2818  *
2819  * returns TRUE if the buffer was too late.
2820  */
2821 static gboolean
2822 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2823     GstClockTime rstart, GstClockTime rstop,
2824     GstClockReturn status, GstClockTimeDiff jitter)
2825 {
2826   gboolean late;
2827   gint64 max_lateness;
2828   GstBaseSinkPrivate *priv;
2829
2830   priv = basesink->priv;
2831
2832   late = FALSE;
2833
2834   /* only for objects that were too late */
2835   if (G_LIKELY (status != GST_CLOCK_EARLY))
2836     goto in_time;
2837
2838   max_lateness = basesink->abidata.ABI.max_lateness;
2839
2840   /* check if frame dropping is enabled */
2841   if (max_lateness == -1)
2842     goto no_drop;
2843
2844   /* only check for buffers */
2845   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2846     goto not_buffer;
2847
2848   /* can't do check if we don't have a timestamp */
2849   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (rstart)))
2850     goto no_timestamp;
2851
2852   /* we can add a valid stop time */
2853   if (GST_CLOCK_TIME_IS_VALID (rstop))
2854     max_lateness += rstop;
2855   else {
2856     max_lateness += rstart;
2857     /* no stop time, use avg frame diff */
2858     if (priv->avg_in_diff != -1)
2859       max_lateness += priv->avg_in_diff;
2860   }
2861
2862   /* if the jitter bigger than duration and lateness we are too late */
2863   if ((late = rstart + jitter > max_lateness)) {
2864     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2865         "buffer is too late %" GST_TIME_FORMAT
2866         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (rstart + jitter),
2867         GST_TIME_ARGS (max_lateness));
2868     /* !!emergency!!, if we did not receive anything valid for more than a
2869      * second, render it anyway so the user sees something */
2870     if (GST_CLOCK_TIME_IS_VALID (priv->last_render_time) &&
2871         rstart - priv->last_render_time > GST_SECOND) {
2872       late = FALSE;
2873       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2874           (_("A lot of buffers are being dropped.")),
2875           ("There may be a timestamping problem, or this computer is too slow."));
2876       GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2877           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2878           GST_TIME_ARGS (priv->last_render_time));
2879     }
2880   }
2881
2882 done:
2883   if (!late || !GST_CLOCK_TIME_IS_VALID (priv->last_render_time)) {
2884     priv->last_render_time = rstart;
2885     /* the next allowed input timestamp */
2886     if (priv->throttle_time > 0)
2887       priv->earliest_in_time = rstart + priv->throttle_time;
2888   }
2889   return late;
2890
2891   /* all is fine */
2892 in_time:
2893   {
2894     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2895     goto done;
2896   }
2897 no_drop:
2898   {
2899     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2900     goto done;
2901   }
2902 not_buffer:
2903   {
2904     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2905     return FALSE;
2906   }
2907 no_timestamp:
2908   {
2909     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2910     return FALSE;
2911   }
2912 }
2913
2914 /* called before and after calling the render vmethod. It keeps track of how
2915  * much time was spent in the render method and is used to check if we are
2916  * flooded */
2917 static void
2918 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2919 {
2920   GstBaseSinkPrivate *priv;
2921
2922   priv = basesink->priv;
2923
2924   if (start) {
2925     priv->start = gst_util_get_timestamp ();
2926   } else {
2927     GstClockTime elapsed;
2928
2929     priv->stop = gst_util_get_timestamp ();
2930
2931     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2932
2933     if (!GST_CLOCK_TIME_IS_VALID (priv->avg_render))
2934       priv->avg_render = elapsed;
2935     else
2936       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2937
2938     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2939         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2940   }
2941 }
2942
2943 /* with STREAM_LOCK, PREROLL_LOCK,
2944  *
2945  * Synchronize the object on the clock and then render it.
2946  *
2947  * takes ownership of obj.
2948  */
2949 static GstFlowReturn
2950 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2951     guint8 obj_type, gpointer obj)
2952 {
2953   GstFlowReturn ret;
2954   GstBaseSinkClass *bclass;
2955   gboolean late, step_end;
2956   gpointer sync_obj;
2957   GstBaseSinkPrivate *priv;
2958
2959   priv = basesink->priv;
2960
2961   if (OBJ_IS_BUFFERLIST (obj_type)) {
2962     /*
2963      * If buffer list, use the first group buffer within the list
2964      * for syncing
2965      */
2966     sync_obj = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
2967     g_assert (NULL != sync_obj);
2968   } else {
2969     sync_obj = obj;
2970   }
2971
2972 again:
2973   late = FALSE;
2974   step_end = FALSE;
2975
2976   /* synchronize this object, non syncable objects return OK
2977    * immediatly. */
2978   ret =
2979       gst_base_sink_do_sync (basesink, pad, sync_obj, &late, &step_end,
2980       obj_type);
2981   if (G_UNLIKELY (ret != GST_FLOW_OK))
2982     goto sync_failed;
2983
2984   /* and now render, event or buffer/buffer list. */
2985   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type))) {
2986     /* drop late buffers unconditionally, let's hope it's unlikely */
2987     if (G_UNLIKELY (late))
2988       goto dropped;
2989
2990     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2991
2992     if (G_LIKELY ((OBJ_IS_BUFFERLIST (obj_type) && bclass->render_list) ||
2993             (!OBJ_IS_BUFFERLIST (obj_type) && bclass->render))) {
2994       gint do_qos;
2995
2996       /* read once, to get same value before and after */
2997       do_qos = g_atomic_int_get (&priv->qos_enabled);
2998
2999       GST_DEBUG_OBJECT (basesink, "rendering object %p", obj);
3000
3001       /* record rendering time for QoS and stats */
3002       if (do_qos)
3003         gst_base_sink_do_render_stats (basesink, TRUE);
3004
3005       if (!OBJ_IS_BUFFERLIST (obj_type)) {
3006         GstBuffer *buf;
3007
3008         /* For buffer lists do not set last buffer. Creating buffer
3009          * with meaningful data can be done only with memcpy which will
3010          * significantly affect performance */
3011         buf = GST_BUFFER_CAST (obj);
3012         gst_base_sink_set_last_buffer (basesink, buf);
3013
3014         ret = bclass->render (basesink, buf);
3015       } else {
3016         GstBufferList *buflist;
3017
3018         buflist = GST_BUFFER_LIST_CAST (obj);
3019
3020         ret = bclass->render_list (basesink, buflist);
3021       }
3022
3023       if (do_qos)
3024         gst_base_sink_do_render_stats (basesink, FALSE);
3025
3026       if (ret == GST_FLOW_STEP)
3027         goto again;
3028
3029       if (G_UNLIKELY (basesink->flushing))
3030         goto flushing;
3031
3032       priv->rendered++;
3033     }
3034   } else if (G_LIKELY (OBJ_IS_EVENT (obj_type))) {
3035     GstEvent *event = GST_EVENT_CAST (obj);
3036     gboolean event_res = TRUE;
3037     GstEventType type;
3038
3039     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3040
3041     type = GST_EVENT_TYPE (event);
3042
3043     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
3044         gst_event_type_get_name (type));
3045
3046     if (bclass->event)
3047       event_res = bclass->event (basesink, event);
3048
3049     /* when we get here we could be flushing again when the event handler calls
3050      * _wait_eos(). We have to ignore this object in that case. */
3051     if (G_UNLIKELY (basesink->flushing))
3052       goto flushing;
3053
3054     if (G_LIKELY (event_res)) {
3055       guint32 seqnum;
3056
3057       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
3058       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
3059
3060       switch (type) {
3061         case GST_EVENT_EOS:
3062         {
3063           GstMessage *message;
3064
3065           /* the EOS event is completely handled so we mark
3066            * ourselves as being in the EOS state. eos is also
3067            * protected by the object lock so we can read it when
3068            * answering the POSITION query. */
3069           GST_OBJECT_LOCK (basesink);
3070           basesink->eos = TRUE;
3071           GST_OBJECT_UNLOCK (basesink);
3072
3073           /* ok, now we can post the message */
3074           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
3075
3076           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
3077           gst_message_set_seqnum (message, seqnum);
3078           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
3079           break;
3080         }
3081         case GST_EVENT_NEWSEGMENT:
3082           /* configure the segment */
3083           gst_base_sink_configure_segment (basesink, pad, event,
3084               &basesink->segment);
3085           break;
3086         case GST_EVENT_SINK_MESSAGE:{
3087           GstMessage *msg = NULL;
3088
3089           gst_event_parse_sink_message (event, &msg);
3090
3091           if (msg)
3092             gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
3093         }
3094         default:
3095           break;
3096       }
3097     }
3098   } else {
3099     g_return_val_if_reached (GST_FLOW_ERROR);
3100   }
3101
3102 done:
3103   if (step_end) {
3104     /* the step ended, check if we need to activate a new step */
3105     GST_DEBUG_OBJECT (basesink, "step ended");
3106     stop_stepping (basesink, &basesink->segment, &priv->current_step,
3107         priv->current_rstart, priv->current_rstop, basesink->eos);
3108     goto again;
3109   }
3110
3111   gst_base_sink_perform_qos (basesink, late);
3112
3113   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
3114   gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3115   return ret;
3116
3117   /* ERRORS */
3118 sync_failed:
3119   {
3120     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
3121     goto done;
3122   }
3123 dropped:
3124   {
3125     priv->dropped++;
3126     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
3127
3128     if (g_atomic_int_get (&priv->qos_enabled)) {
3129       GstMessage *qos_msg;
3130       GstClockTime timestamp, duration;
3131
3132       timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (sync_obj));
3133       duration = GST_BUFFER_DURATION (GST_BUFFER_CAST (sync_obj));
3134
3135       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3136           "qos: dropped buffer rt %" GST_TIME_FORMAT ", st %" GST_TIME_FORMAT
3137           ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT,
3138           GST_TIME_ARGS (priv->current_rstart),
3139           GST_TIME_ARGS (priv->current_sstart), GST_TIME_ARGS (timestamp),
3140           GST_TIME_ARGS (duration));
3141       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3142           "qos: rendered %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
3143           priv->rendered, priv->dropped);
3144
3145       qos_msg =
3146           gst_message_new_qos (GST_OBJECT_CAST (basesink), basesink->sync,
3147           priv->current_rstart, priv->current_sstart, timestamp, duration);
3148       gst_message_set_qos_values (qos_msg, priv->current_jitter, priv->avg_rate,
3149           1000000);
3150       gst_message_set_qos_stats (qos_msg, GST_FORMAT_BUFFERS, priv->rendered,
3151           priv->dropped);
3152       gst_element_post_message (GST_ELEMENT_CAST (basesink), qos_msg);
3153     }
3154     goto done;
3155   }
3156 flushing:
3157   {
3158     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
3159     gst_mini_object_unref (obj);
3160     return GST_FLOW_WRONG_STATE;
3161   }
3162 }
3163
3164 /* with STREAM_LOCK, PREROLL_LOCK
3165  *
3166  * Perform preroll on the given object. For buffers this means
3167  * calling the preroll subclass method.
3168  * If that succeeds, the state will be commited.
3169  *
3170  * function does not take ownership of obj.
3171  */
3172 static GstFlowReturn
3173 gst_base_sink_preroll_object (GstBaseSink * basesink, guint8 obj_type,
3174     GstMiniObject * obj)
3175 {
3176   GstFlowReturn ret;
3177
3178   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
3179
3180   /* if it's a buffer, we need to call the preroll method */
3181   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type) && basesink->priv->call_preroll)) {
3182     GstBaseSinkClass *bclass;
3183     GstBuffer *buf;
3184     GstClockTime timestamp;
3185
3186     if (OBJ_IS_BUFFERLIST (obj_type)) {
3187       buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3188       g_assert (NULL != buf);
3189     } else {
3190       buf = GST_BUFFER_CAST (obj);
3191     }
3192
3193     timestamp = GST_BUFFER_TIMESTAMP (buf);
3194
3195     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
3196         GST_TIME_ARGS (timestamp));
3197
3198     /*
3199      * For buffer lists do not set last buffer. Creating buffer
3200      * with meaningful data can be done only with memcpy which will
3201      * significantly affect performance
3202      */
3203     if (!OBJ_IS_BUFFERLIST (obj_type)) {
3204       gst_base_sink_set_last_buffer (basesink, buf);
3205     }
3206
3207     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3208     if (bclass->preroll)
3209       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
3210         goto preroll_failed;
3211
3212     basesink->priv->call_preroll = FALSE;
3213   }
3214
3215   /* commit state */
3216   if (G_LIKELY (basesink->playing_async)) {
3217     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
3218       goto stopping;
3219   }
3220
3221   return GST_FLOW_OK;
3222
3223   /* ERRORS */
3224 preroll_failed:
3225   {
3226     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
3227     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
3228     return ret;
3229   }
3230 stopping:
3231   {
3232     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
3233     return GST_FLOW_WRONG_STATE;
3234   }
3235 }
3236
3237 /* with STREAM_LOCK, PREROLL_LOCK
3238  *
3239  * Queue an object for rendering.
3240  * The first prerollable object queued will complete the preroll. If the
3241  * preroll queue is filled, we render all the objects in the queue.
3242  *
3243  * This function takes ownership of the object.
3244  */
3245 static GstFlowReturn
3246 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
3247     guint8 obj_type, gpointer obj, gboolean prerollable)
3248 {
3249   GstFlowReturn ret = GST_FLOW_OK;
3250   gint length;
3251   GQueue *q;
3252
3253   if (G_UNLIKELY (basesink->need_preroll)) {
3254     if (G_LIKELY (prerollable))
3255       basesink->preroll_queued++;
3256
3257     length = basesink->preroll_queued;
3258
3259     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
3260
3261     /* first prerollable item needs to finish the preroll */
3262     if (length == 1) {
3263       ret = gst_base_sink_preroll_object (basesink, obj_type, obj);
3264       if (G_UNLIKELY (ret != GST_FLOW_OK))
3265         goto preroll_failed;
3266     }
3267     /* need to recheck if we need preroll, commmit state during preroll
3268      * could have made us not need more preroll. */
3269     if (G_UNLIKELY (basesink->need_preroll)) {
3270       /* see if we can render now, if we can't add the object to the preroll
3271        * queue. */
3272       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
3273         goto more_preroll;
3274     }
3275   }
3276   /* we can start rendering (or blocking) the queued object
3277    * if any. */
3278   q = basesink->preroll_queue;
3279   while (G_UNLIKELY (!g_queue_is_empty (q))) {
3280     GstMiniObject *o;
3281     guint8 ot;
3282
3283     o = g_queue_pop_head (q);
3284     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
3285
3286     ot = get_object_type (o);
3287
3288     /* do something with the return value */
3289     ret = gst_base_sink_render_object (basesink, pad, ot, o);
3290     if (ret != GST_FLOW_OK)
3291       goto dequeue_failed;
3292   }
3293
3294   /* now render the object */
3295   ret = gst_base_sink_render_object (basesink, pad, obj_type, obj);
3296   basesink->preroll_queued = 0;
3297
3298   return ret;
3299
3300   /* special cases */
3301 preroll_failed:
3302   {
3303     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
3304         gst_flow_get_name (ret));
3305     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3306     return ret;
3307   }
3308 more_preroll:
3309   {
3310     /* add object to the queue and return */
3311     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
3312         length, basesink->preroll_queue_max_len);
3313     g_queue_push_tail (basesink->preroll_queue, obj);
3314     return GST_FLOW_OK;
3315   }
3316 dequeue_failed:
3317   {
3318     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
3319         gst_flow_get_name (ret));
3320     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3321     return ret;
3322   }
3323 }
3324
3325 /* with STREAM_LOCK
3326  *
3327  * This function grabs the PREROLL_LOCK and adds the object to
3328  * the queue.
3329  *
3330  * This function takes ownership of obj.
3331  *
3332  * Note: Only GstEvent seem to be passed to this private method
3333  */
3334 static GstFlowReturn
3335 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
3336     GstMiniObject * obj, gboolean prerollable)
3337 {
3338   GstFlowReturn ret;
3339
3340   GST_PAD_PREROLL_LOCK (pad);
3341   if (G_UNLIKELY (basesink->flushing))
3342     goto flushing;
3343
3344   if (G_UNLIKELY (basesink->priv->received_eos))
3345     goto was_eos;
3346
3347   ret =
3348       gst_base_sink_queue_object_unlocked (basesink, pad, _PR_IS_EVENT, obj,
3349       prerollable);
3350   GST_PAD_PREROLL_UNLOCK (pad);
3351
3352   return ret;
3353
3354   /* ERRORS */
3355 flushing:
3356   {
3357     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3358     GST_PAD_PREROLL_UNLOCK (pad);
3359     gst_mini_object_unref (obj);
3360     return GST_FLOW_WRONG_STATE;
3361   }
3362 was_eos:
3363   {
3364     GST_DEBUG_OBJECT (basesink,
3365         "we are EOS, dropping object, return UNEXPECTED");
3366     GST_PAD_PREROLL_UNLOCK (pad);
3367     gst_mini_object_unref (obj);
3368     return GST_FLOW_UNEXPECTED;
3369   }
3370 }
3371
3372 static void
3373 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
3374 {
3375   /* make sure we are not blocked on the clock also clear any pending
3376    * eos state. */
3377   gst_base_sink_set_flushing (basesink, pad, TRUE);
3378
3379   /* we grab the stream lock but that is not needed since setting the
3380    * sink to flushing would make sure no state commit is being done
3381    * anymore */
3382   GST_PAD_STREAM_LOCK (pad);
3383   gst_base_sink_reset_qos (basesink);
3384   /* and we need to commit our state again on the next
3385    * prerolled buffer */
3386   basesink->playing_async = TRUE;
3387   if (basesink->priv->async_enabled) {
3388     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
3389   } else {
3390     basesink->priv->have_latency = TRUE;
3391   }
3392   gst_base_sink_set_last_buffer (basesink, NULL);
3393   GST_PAD_STREAM_UNLOCK (pad);
3394 }
3395
3396 static void
3397 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
3398 {
3399   /* unset flushing so we can accept new data, this also flushes out any EOS
3400    * event. */
3401   gst_base_sink_set_flushing (basesink, pad, FALSE);
3402
3403   /* for position reporting */
3404   GST_OBJECT_LOCK (basesink);
3405   basesink->priv->current_sstart = GST_CLOCK_TIME_NONE;
3406   basesink->priv->current_sstop = GST_CLOCK_TIME_NONE;
3407   basesink->priv->eos_rtime = GST_CLOCK_TIME_NONE;
3408   basesink->priv->call_preroll = TRUE;
3409   basesink->priv->current_step.valid = FALSE;
3410   basesink->priv->pending_step.valid = FALSE;
3411   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
3412     /* we need new segment info after the flush. */
3413     basesink->have_newsegment = FALSE;
3414     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3415     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
3416   }
3417   GST_OBJECT_UNLOCK (basesink);
3418 }
3419
3420 static gboolean
3421 gst_base_sink_event (GstPad * pad, GstEvent * event)
3422 {
3423   GstBaseSink *basesink;
3424   gboolean result = TRUE;
3425   GstBaseSinkClass *bclass;
3426
3427   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3428   if (G_UNLIKELY (basesink == NULL)) {
3429     gst_event_unref (event);
3430     return FALSE;
3431   }
3432
3433   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3434
3435   GST_DEBUG_OBJECT (basesink, "received event %p %" GST_PTR_FORMAT, event,
3436       event);
3437
3438   switch (GST_EVENT_TYPE (event)) {
3439     case GST_EVENT_EOS:
3440     {
3441       GstFlowReturn ret;
3442
3443       GST_PAD_PREROLL_LOCK (pad);
3444       if (G_UNLIKELY (basesink->flushing))
3445         goto flushing;
3446
3447       if (G_UNLIKELY (basesink->priv->received_eos)) {
3448         /* we can't accept anything when we are EOS */
3449         result = FALSE;
3450         gst_event_unref (event);
3451       } else {
3452         /* we set the received EOS flag here so that we can use it when testing if
3453          * we are prerolled and to refuse more buffers. */
3454         basesink->priv->received_eos = TRUE;
3455
3456         /* EOS is a prerollable object, we call the unlocked version because it
3457          * does not check the received_eos flag. */
3458         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
3459             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), TRUE);
3460         if (G_UNLIKELY (ret != GST_FLOW_OK))
3461           result = FALSE;
3462       }
3463       GST_PAD_PREROLL_UNLOCK (pad);
3464       break;
3465     }
3466     case GST_EVENT_NEWSEGMENT:
3467     {
3468       GstFlowReturn ret;
3469       gboolean update;
3470
3471       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
3472
3473       GST_PAD_PREROLL_LOCK (pad);
3474       if (G_UNLIKELY (basesink->flushing))
3475         goto flushing;
3476
3477       gst_event_parse_new_segment_full (event, &update, NULL, NULL, NULL, NULL,
3478           NULL, NULL);
3479
3480       if (G_UNLIKELY (basesink->priv->received_eos && !update)) {
3481         /* we can't accept anything when we are EOS */
3482         result = FALSE;
3483         gst_event_unref (event);
3484       } else {
3485         /* the new segment is a non prerollable item and does not block anything,
3486          * we need to configure the current clipping segment and insert the event
3487          * in the queue to serialize it with the buffers for rendering. */
3488         gst_base_sink_configure_segment (basesink, pad, event,
3489             basesink->abidata.ABI.clip_segment);
3490
3491         ret =
3492             gst_base_sink_queue_object_unlocked (basesink, pad,
3493             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), FALSE);
3494         if (G_UNLIKELY (ret != GST_FLOW_OK))
3495           result = FALSE;
3496         else {
3497           GST_OBJECT_LOCK (basesink);
3498           basesink->have_newsegment = TRUE;
3499           GST_OBJECT_UNLOCK (basesink);
3500         }
3501       }
3502       GST_PAD_PREROLL_UNLOCK (pad);
3503       break;
3504     }
3505     case GST_EVENT_FLUSH_START:
3506       if (bclass->event)
3507         bclass->event (basesink, event);
3508
3509       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3510
3511       gst_base_sink_flush_start (basesink, pad);
3512
3513       gst_event_unref (event);
3514       break;
3515     case GST_EVENT_FLUSH_STOP:
3516       if (bclass->event)
3517         bclass->event (basesink, event);
3518
3519       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3520
3521       gst_base_sink_flush_stop (basesink, pad);
3522
3523       gst_event_unref (event);
3524       break;
3525     default:
3526       /* other events are sent to queue or subclass depending on if they
3527        * are serialized. */
3528       if (GST_EVENT_IS_SERIALIZED (event)) {
3529         gst_base_sink_queue_object (basesink, pad,
3530             GST_MINI_OBJECT_CAST (event), FALSE);
3531       } else {
3532         if (bclass->event)
3533           bclass->event (basesink, event);
3534         gst_event_unref (event);
3535       }
3536       break;
3537   }
3538 done:
3539   gst_object_unref (basesink);
3540
3541   return result;
3542
3543   /* ERRORS */
3544 flushing:
3545   {
3546     GST_DEBUG_OBJECT (basesink, "we are flushing");
3547     GST_PAD_PREROLL_UNLOCK (pad);
3548     result = FALSE;
3549     gst_event_unref (event);
3550     goto done;
3551   }
3552 }
3553
3554 /* default implementation to calculate the start and end
3555  * timestamps on a buffer, subclasses can override
3556  */
3557 static void
3558 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3559     GstClockTime * start, GstClockTime * end)
3560 {
3561   GstClockTime timestamp, duration;
3562
3563   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3564   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3565
3566     /* get duration to calculate end time */
3567     duration = GST_BUFFER_DURATION (buffer);
3568     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3569       *end = timestamp + duration;
3570     }
3571     *start = timestamp;
3572   }
3573 }
3574
3575 /* must be called with PREROLL_LOCK */
3576 static gboolean
3577 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3578 {
3579   gboolean is_prerolled, res;
3580
3581   /* we have 2 cases where the PREROLL_LOCK is released:
3582    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3583    *  2) we are syncing on the clock
3584    */
3585   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3586   res = !is_prerolled;
3587
3588   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3589       basesink->have_preroll, basesink->priv->received_eos, res);
3590
3591   return res;
3592 }
3593
3594 /* with STREAM_LOCK, PREROLL_LOCK
3595  *
3596  * Takes a buffer and compare the timestamps with the last segment.
3597  * If the buffer falls outside of the segment boundaries, drop it.
3598  * Else queue the buffer for preroll and rendering.
3599  *
3600  * This function takes ownership of the buffer.
3601  */
3602 static GstFlowReturn
3603 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3604     guint8 obj_type, gpointer obj)
3605 {
3606   GstBaseSinkClass *bclass;
3607   GstFlowReturn result;
3608   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3609   GstSegment *clip_segment;
3610   GstBuffer *time_buf;
3611
3612   if (G_UNLIKELY (basesink->flushing))
3613     goto flushing;
3614
3615   if (G_UNLIKELY (basesink->priv->received_eos))
3616     goto was_eos;
3617
3618   if (OBJ_IS_BUFFERLIST (obj_type)) {
3619     time_buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3620     g_assert (NULL != time_buf);
3621   } else {
3622     time_buf = GST_BUFFER_CAST (obj);
3623   }
3624
3625   /* for code clarity */
3626   clip_segment = basesink->abidata.ABI.clip_segment;
3627
3628   if (G_UNLIKELY (!basesink->have_newsegment)) {
3629     gboolean sync;
3630
3631     sync = gst_base_sink_get_sync (basesink);
3632     if (sync) {
3633       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3634           (_("Internal data flow problem.")),
3635           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3636     }
3637
3638     /* this means this sink will assume timestamps start from 0 */
3639     GST_OBJECT_LOCK (basesink);
3640     clip_segment->start = 0;
3641     clip_segment->stop = -1;
3642     basesink->segment.start = 0;
3643     basesink->segment.stop = -1;
3644     basesink->have_newsegment = TRUE;
3645     GST_OBJECT_UNLOCK (basesink);
3646   }
3647
3648   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3649
3650   /* check if the buffer needs to be dropped, we first ask the subclass for the
3651    * start and end */
3652   if (bclass->get_times)
3653     bclass->get_times (basesink, time_buf, &start, &end);
3654
3655   if (!GST_CLOCK_TIME_IS_VALID (start)) {
3656     /* if the subclass does not want sync, we use our own values so that we at
3657      * least clip the buffer to the segment */
3658     gst_base_sink_get_times (basesink, time_buf, &start, &end);
3659   }
3660
3661   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3662       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3663
3664   /* a dropped buffer does not participate in anything */
3665   if (GST_CLOCK_TIME_IS_VALID (start) &&
3666       (clip_segment->format == GST_FORMAT_TIME)) {
3667     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3668                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3669       goto out_of_segment;
3670   }
3671
3672   /* now we can process the buffer in the queue, this function takes ownership
3673    * of the buffer */
3674   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3675       obj_type, obj, TRUE);
3676   return result;
3677
3678   /* ERRORS */
3679 flushing:
3680   {
3681     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3682     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3683     return GST_FLOW_WRONG_STATE;
3684   }
3685 was_eos:
3686   {
3687     GST_DEBUG_OBJECT (basesink,
3688         "we are EOS, dropping object, return UNEXPECTED");
3689     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3690     return GST_FLOW_UNEXPECTED;
3691   }
3692 out_of_segment:
3693   {
3694     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3695     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3696     return GST_FLOW_OK;
3697   }
3698 }
3699
3700 /* with STREAM_LOCK
3701  */
3702 static GstFlowReturn
3703 gst_base_sink_chain_main (GstBaseSink * basesink, GstPad * pad,
3704     guint8 obj_type, gpointer obj)
3705 {
3706   GstFlowReturn result;
3707
3708   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3709     goto wrong_mode;
3710
3711   GST_PAD_PREROLL_LOCK (pad);
3712   result = gst_base_sink_chain_unlocked (basesink, pad, obj_type, obj);
3713   GST_PAD_PREROLL_UNLOCK (pad);
3714
3715 done:
3716   return result;
3717
3718   /* ERRORS */
3719 wrong_mode:
3720   {
3721     GST_OBJECT_LOCK (pad);
3722     GST_WARNING_OBJECT (basesink,
3723         "Push on pad %s:%s, but it was not activated in push mode",
3724         GST_DEBUG_PAD_NAME (pad));
3725     GST_OBJECT_UNLOCK (pad);
3726     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3727     /* we don't post an error message this will signal to the peer
3728      * pushing that EOS is reached. */
3729     result = GST_FLOW_UNEXPECTED;
3730     goto done;
3731   }
3732 }
3733
3734 static GstFlowReturn
3735 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3736 {
3737   GstBaseSink *basesink;
3738
3739   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3740
3741   return gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, buf);
3742 }
3743
3744 static GstFlowReturn
3745 gst_base_sink_chain_list (GstPad * pad, GstBufferList * list)
3746 {
3747   GstBaseSink *basesink;
3748   GstBaseSinkClass *bclass;
3749   GstFlowReturn result;
3750
3751   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3752   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3753
3754   if (G_LIKELY (bclass->render_list)) {
3755     result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFERLIST, list);
3756   } else {
3757     GstBufferListIterator *it;
3758     GstBuffer *group;
3759
3760     GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
3761
3762     it = gst_buffer_list_iterate (list);
3763
3764     if (gst_buffer_list_iterator_next_group (it)) {
3765       do {
3766         group = gst_buffer_list_iterator_merge_group (it);
3767         if (group == NULL) {
3768           group = gst_buffer_new ();
3769           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3770         } else {
3771           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining group");
3772         }
3773         result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, group);
3774       } while (result == GST_FLOW_OK
3775           && gst_buffer_list_iterator_next_group (it));
3776     } else {
3777       GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3778       result =
3779           gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER,
3780           gst_buffer_new ());
3781     }
3782     gst_buffer_list_iterator_free (it);
3783     gst_buffer_list_unref (list);
3784   }
3785   return result;
3786 }
3787
3788
3789 static gboolean
3790 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3791 {
3792   gboolean res = TRUE;
3793
3794   /* update our offset if the start/stop position was updated */
3795   if (segment->format == GST_FORMAT_BYTES) {
3796     segment->time = segment->start;
3797   } else if (segment->start == 0) {
3798     /* seek to start, we can implement a default for this. */
3799     segment->time = 0;
3800   } else {
3801     res = FALSE;
3802     GST_INFO_OBJECT (sink, "Can't do a default seek");
3803   }
3804
3805   return res;
3806 }
3807
3808 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3809
3810 static gboolean
3811 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3812     GstEvent * event, GstSegment * segment)
3813 {
3814   /* By default, we try one of 2 things:
3815    *   - For absolute seek positions, convert the requested position to our
3816    *     configured processing format and place it in the output segment \
3817    *   - For relative seek positions, convert our current (input) values to the
3818    *     seek format, adjust by the relative seek offset and then convert back to
3819    *     the processing format
3820    */
3821   GstSeekType cur_type, stop_type;
3822   gint64 cur, stop;
3823   GstSeekFlags flags;
3824   GstFormat seek_format, dest_format;
3825   gdouble rate;
3826   gboolean update;
3827   gboolean res = TRUE;
3828
3829   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3830       &cur_type, &cur, &stop_type, &stop);
3831   dest_format = segment->format;
3832
3833   if (seek_format == dest_format) {
3834     gst_segment_set_seek (segment, rate, seek_format, flags,
3835         cur_type, cur, stop_type, stop, &update);
3836     return TRUE;
3837   }
3838
3839   if (cur_type != GST_SEEK_TYPE_NONE) {
3840     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3841     res =
3842         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3843         &cur);
3844     cur_type = GST_SEEK_TYPE_SET;
3845   }
3846
3847   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3848     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3849     res =
3850         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3851         &stop);
3852     stop_type = GST_SEEK_TYPE_SET;
3853   }
3854
3855   /* And finally, configure our output segment in the desired format */
3856   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3857       stop_type, stop, &update);
3858
3859   if (!res)
3860     goto no_format;
3861
3862   return res;
3863
3864 no_format:
3865   {
3866     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3867     return FALSE;
3868   }
3869 }
3870
3871 /* perform a seek, only executed in pull mode */
3872 static gboolean
3873 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3874 {
3875   gboolean flush;
3876   gdouble rate;
3877   GstFormat seek_format, dest_format;
3878   GstSeekFlags flags;
3879   GstSeekType cur_type, stop_type;
3880   gboolean seekseg_configured = FALSE;
3881   gint64 cur, stop;
3882   gboolean update, res = TRUE;
3883   GstSegment seeksegment;
3884
3885   dest_format = sink->segment.format;
3886
3887   if (event) {
3888     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3889     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3890         &cur_type, &cur, &stop_type, &stop);
3891
3892     flush = flags & GST_SEEK_FLAG_FLUSH;
3893   } else {
3894     GST_DEBUG_OBJECT (sink, "performing seek without event");
3895     flush = FALSE;
3896   }
3897
3898   if (flush) {
3899     GST_DEBUG_OBJECT (sink, "flushing upstream");
3900     gst_pad_push_event (pad, gst_event_new_flush_start ());
3901     gst_base_sink_flush_start (sink, pad);
3902   } else {
3903     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3904   }
3905
3906   GST_PAD_STREAM_LOCK (pad);
3907
3908   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3909    * copy the current segment info into the temp segment that we can actually
3910    * attempt the seek with. We only update the real segment if the seek suceeds. */
3911   if (!seekseg_configured) {
3912     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3913
3914     /* now configure the final seek segment */
3915     if (event) {
3916       if (sink->segment.format != seek_format) {
3917         /* OK, here's where we give the subclass a chance to convert the relative
3918          * seek into an absolute one in the processing format. We set up any
3919          * absolute seek above, before taking the stream lock. */
3920         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3921                 &seeksegment)) {
3922           GST_DEBUG_OBJECT (sink,
3923               "Preparing the seek failed after flushing. " "Aborting seek");
3924           res = FALSE;
3925         }
3926       } else {
3927         /* The seek format matches our processing format, no need to ask the
3928          * the subclass to configure the segment. */
3929         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3930             cur_type, cur, stop_type, stop, &update);
3931       }
3932     }
3933     /* Else, no seek event passed, so we're just (re)starting the
3934        current segment. */
3935   }
3936
3937   if (res) {
3938     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3939         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3940         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3941
3942     /* do the seek, segment.last_stop contains the new position. */
3943     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3944   }
3945
3946
3947   if (flush) {
3948     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3949     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3950     gst_base_sink_flush_stop (sink, pad);
3951   } else if (res && sink->abidata.ABI.running) {
3952     /* we are running the current segment and doing a non-flushing seek,
3953      * close the segment first based on the last_stop. */
3954     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3955         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3956   }
3957
3958   /* The subclass must have converted the segment to the processing format
3959    * by now */
3960   if (res && seeksegment.format != dest_format) {
3961     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3962         "in the correct format. Aborting seek.");
3963     res = FALSE;
3964   }
3965
3966   /* if successfull seek, we update our real segment and push
3967    * out the new segment. */
3968   if (res) {
3969     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3970
3971     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3972       gst_element_post_message (GST_ELEMENT (sink),
3973           gst_message_new_segment_start (GST_OBJECT (sink),
3974               sink->segment.format, sink->segment.last_stop));
3975     }
3976   }
3977
3978   sink->priv->discont = TRUE;
3979   sink->abidata.ABI.running = TRUE;
3980
3981   GST_PAD_STREAM_UNLOCK (pad);
3982
3983   return res;
3984 }
3985
3986 static void
3987 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
3988     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
3989     gboolean flush, gboolean intermediate)
3990 {
3991   GST_OBJECT_LOCK (sink);
3992   pending->seqnum = seqnum;
3993   pending->format = format;
3994   pending->amount = amount;
3995   pending->position = 0;
3996   pending->rate = rate;
3997   pending->flush = flush;
3998   pending->intermediate = intermediate;
3999   pending->valid = TRUE;
4000   /* flush invalidates the current stepping segment */
4001   if (flush)
4002     current->valid = FALSE;
4003   GST_OBJECT_UNLOCK (sink);
4004 }
4005
4006 static gboolean
4007 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
4008 {
4009   GstBaseSinkPrivate *priv;
4010   GstBaseSinkClass *bclass;
4011   gboolean flush, intermediate;
4012   gdouble rate;
4013   GstFormat format;
4014   guint64 amount;
4015   guint seqnum;
4016   GstStepInfo *pending, *current;
4017   GstMessage *message;
4018
4019   bclass = GST_BASE_SINK_GET_CLASS (sink);
4020   priv = sink->priv;
4021
4022   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
4023
4024   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
4025   seqnum = gst_event_get_seqnum (event);
4026
4027   pending = &priv->pending_step;
4028   current = &priv->current_step;
4029
4030   /* post message first */
4031   message = gst_message_new_step_start (GST_OBJECT (sink), FALSE, format,
4032       amount, rate, flush, intermediate);
4033   gst_message_set_seqnum (message, seqnum);
4034   gst_element_post_message (GST_ELEMENT (sink), message);
4035
4036   if (flush) {
4037     /* we need to call ::unlock before locking PREROLL_LOCK
4038      * since we lock it before going into ::render */
4039     if (bclass->unlock)
4040       bclass->unlock (sink);
4041
4042     GST_PAD_PREROLL_LOCK (sink->sinkpad);
4043     /* now that we have the PREROLL lock, clear our unlock request */
4044     if (bclass->unlock_stop)
4045       bclass->unlock_stop (sink);
4046
4047     /* update the stepinfo and make it valid */
4048     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4049         intermediate);
4050
4051     if (sink->priv->async_enabled) {
4052       /* and we need to commit our state again on the next
4053        * prerolled buffer */
4054       sink->playing_async = TRUE;
4055       priv->pending_step.need_preroll = TRUE;
4056       sink->need_preroll = FALSE;
4057       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
4058     } else {
4059       sink->priv->have_latency = TRUE;
4060       sink->need_preroll = FALSE;
4061     }
4062     priv->current_sstart = GST_CLOCK_TIME_NONE;
4063     priv->current_sstop = GST_CLOCK_TIME_NONE;
4064     priv->eos_rtime = GST_CLOCK_TIME_NONE;
4065     priv->call_preroll = TRUE;
4066     gst_base_sink_set_last_buffer (sink, NULL);
4067     gst_base_sink_reset_qos (sink);
4068
4069     if (sink->clock_id) {
4070       gst_clock_id_unschedule (sink->clock_id);
4071     }
4072
4073     if (sink->have_preroll) {
4074       GST_DEBUG_OBJECT (sink, "signal waiter");
4075       priv->step_unlock = TRUE;
4076       GST_PAD_PREROLL_SIGNAL (sink->sinkpad);
4077     }
4078     GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
4079   } else {
4080     /* update the stepinfo and make it valid */
4081     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4082         intermediate);
4083   }
4084
4085   return TRUE;
4086 }
4087
4088 /* with STREAM_LOCK
4089  */
4090 static void
4091 gst_base_sink_loop (GstPad * pad)
4092 {
4093   GstBaseSink *basesink;
4094   GstBuffer *buf = NULL;
4095   GstFlowReturn result;
4096   guint blocksize;
4097   guint64 offset;
4098
4099   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
4100
4101   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
4102
4103   if ((blocksize = basesink->priv->blocksize) == 0)
4104     blocksize = -1;
4105
4106   offset = basesink->segment.last_stop;
4107
4108   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
4109       offset, blocksize);
4110
4111   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
4112   if (G_UNLIKELY (result != GST_FLOW_OK))
4113     goto paused;
4114
4115   if (G_UNLIKELY (buf == NULL))
4116     goto no_buffer;
4117
4118   offset += GST_BUFFER_SIZE (buf);
4119
4120   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
4121
4122   GST_PAD_PREROLL_LOCK (pad);
4123   result = gst_base_sink_chain_unlocked (basesink, pad, _PR_IS_BUFFER, buf);
4124   GST_PAD_PREROLL_UNLOCK (pad);
4125   if (G_UNLIKELY (result != GST_FLOW_OK))
4126     goto paused;
4127
4128   return;
4129
4130   /* ERRORS */
4131 paused:
4132   {
4133     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
4134         gst_flow_get_name (result));
4135     gst_pad_pause_task (pad);
4136     if (result == GST_FLOW_UNEXPECTED) {
4137       /* perform EOS logic */
4138       if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
4139         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4140             gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
4141                 basesink->segment.format, basesink->segment.last_stop));
4142       } else {
4143         gst_base_sink_event (pad, gst_event_new_eos ());
4144       }
4145     } else if (result == GST_FLOW_NOT_LINKED || result <= GST_FLOW_UNEXPECTED) {
4146       /* for fatal errors we post an error message, post the error
4147        * first so the app knows about the error first. 
4148        * wrong-state is not a fatal error because it happens due to
4149        * flushing and posting an error message in that case is the
4150        * wrong thing to do, e.g. when basesrc is doing a flushing
4151        * seek. */
4152       GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4153           (_("Internal data stream error.")),
4154           ("stream stopped, reason %s", gst_flow_get_name (result)));
4155       gst_base_sink_event (pad, gst_event_new_eos ());
4156     }
4157     return;
4158   }
4159 no_buffer:
4160   {
4161     GST_LOG_OBJECT (basesink, "no buffer, pausing");
4162     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4163         (_("Internal data flow error.")), ("element returned NULL buffer"));
4164     result = GST_FLOW_ERROR;
4165     goto paused;
4166   }
4167 }
4168
4169 static gboolean
4170 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
4171     gboolean flushing)
4172 {
4173   GstBaseSinkClass *bclass;
4174
4175   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4176
4177   if (flushing) {
4178     /* unlock any subclasses, we need to do this before grabbing the
4179      * PREROLL_LOCK since we hold this lock before going into ::render. */
4180     if (bclass->unlock)
4181       bclass->unlock (basesink);
4182   }
4183
4184   GST_PAD_PREROLL_LOCK (pad);
4185   basesink->flushing = flushing;
4186   if (flushing) {
4187     /* step 1, now that we have the PREROLL lock, clear our unlock request */
4188     if (bclass->unlock_stop)
4189       bclass->unlock_stop (basesink);
4190
4191     /* set need_preroll before we unblock the clock. If the clock is unblocked
4192      * before timing out, we can reuse the buffer for preroll. */
4193     basesink->need_preroll = TRUE;
4194
4195     /* step 2, unblock clock sync (if any) or any other blocking thing */
4196     if (basesink->clock_id) {
4197       gst_clock_id_unschedule (basesink->clock_id);
4198     }
4199
4200     /* flush out the data thread if it's locked in finish_preroll, this will
4201      * also flush out the EOS state */
4202     GST_DEBUG_OBJECT (basesink,
4203         "flushing out data thread, need preroll to TRUE");
4204     gst_base_sink_preroll_queue_flush (basesink, pad);
4205   }
4206   GST_PAD_PREROLL_UNLOCK (pad);
4207
4208   return TRUE;
4209 }
4210
4211 static gboolean
4212 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
4213 {
4214   gboolean result;
4215
4216   if (active) {
4217     /* start task */
4218     result = gst_pad_start_task (basesink->sinkpad,
4219         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
4220   } else {
4221     /* step 2, make sure streaming finishes */
4222     result = gst_pad_stop_task (basesink->sinkpad);
4223   }
4224
4225   return result;
4226 }
4227
4228 static gboolean
4229 gst_base_sink_pad_activate (GstPad * pad)
4230 {
4231   gboolean result = FALSE;
4232   GstBaseSink *basesink;
4233
4234   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4235
4236   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
4237
4238   gst_base_sink_set_flushing (basesink, pad, FALSE);
4239
4240   /* we need to have the pull mode enabled */
4241   if (!basesink->can_activate_pull) {
4242     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
4243     goto fallback;
4244   }
4245
4246   /* check if downstreams supports pull mode at all */
4247   if (!gst_pad_check_pull_range (pad)) {
4248     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
4249     goto fallback;
4250   }
4251
4252   /* set the pad mode before starting the task so that it's in the
4253    * correct state for the new thread. also the sink set_caps and get_caps
4254    * function checks this */
4255   basesink->pad_mode = GST_ACTIVATE_PULL;
4256
4257   /* we first try to negotiate a format so that when we try to activate
4258    * downstream, it knows about our format */
4259   if (!gst_base_sink_negotiate_pull (basesink)) {
4260     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
4261     goto fallback;
4262   }
4263
4264   /* ok activate now */
4265   if (!gst_pad_activate_pull (pad, TRUE)) {
4266     /* clear any pending caps */
4267     GST_OBJECT_LOCK (basesink);
4268     gst_caps_replace (&basesink->priv->pull_caps, NULL);
4269     GST_OBJECT_UNLOCK (basesink);
4270     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
4271     goto fallback;
4272   }
4273
4274   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
4275   result = TRUE;
4276   goto done;
4277
4278   /* push mode fallback */
4279 fallback:
4280   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
4281   if ((result = gst_pad_activate_push (pad, TRUE))) {
4282     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
4283   }
4284
4285 done:
4286   if (!result) {
4287     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
4288     gst_base_sink_set_flushing (basesink, pad, TRUE);
4289   }
4290
4291   gst_object_unref (basesink);
4292
4293   return result;
4294 }
4295
4296 static gboolean
4297 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
4298 {
4299   gboolean result;
4300   GstBaseSink *basesink;
4301
4302   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4303
4304   if (active) {
4305     if (!basesink->can_activate_push) {
4306       result = FALSE;
4307       basesink->pad_mode = GST_ACTIVATE_NONE;
4308     } else {
4309       result = TRUE;
4310       basesink->pad_mode = GST_ACTIVATE_PUSH;
4311     }
4312   } else {
4313     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
4314       g_warning ("Internal GStreamer activation error!!!");
4315       result = FALSE;
4316     } else {
4317       gst_base_sink_set_flushing (basesink, pad, TRUE);
4318       result = TRUE;
4319       basesink->pad_mode = GST_ACTIVATE_NONE;
4320     }
4321   }
4322
4323   gst_object_unref (basesink);
4324
4325   return result;
4326 }
4327
4328 static gboolean
4329 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
4330 {
4331   GstCaps *caps;
4332   gboolean result;
4333
4334   result = FALSE;
4335
4336   /* this returns the intersection between our caps and the peer caps. If there
4337    * is no peer, it returns NULL and we can't operate in pull mode so we can
4338    * fail the negotiation. */
4339   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
4340   if (caps == NULL || gst_caps_is_empty (caps))
4341     goto no_caps_possible;
4342
4343   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
4344
4345   caps = gst_caps_make_writable (caps);
4346   /* get the first (prefered) format */
4347   gst_caps_truncate (caps);
4348   /* try to fixate */
4349   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
4350
4351   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
4352
4353   if (gst_caps_is_any (caps)) {
4354     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
4355         "allowing pull()");
4356     /* neither side has template caps in this case, so they are prepared for
4357        pull() without setcaps() */
4358     result = TRUE;
4359   } else if (gst_caps_is_fixed (caps)) {
4360     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
4361       goto could_not_set_caps;
4362
4363     GST_OBJECT_LOCK (basesink);
4364     gst_caps_replace (&basesink->priv->pull_caps, caps);
4365     GST_OBJECT_UNLOCK (basesink);
4366
4367     result = TRUE;
4368   }
4369
4370   gst_caps_unref (caps);
4371
4372   return result;
4373
4374 no_caps_possible:
4375   {
4376     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
4377     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
4378     if (caps)
4379       gst_caps_unref (caps);
4380     return FALSE;
4381   }
4382 could_not_set_caps:
4383   {
4384     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
4385     gst_caps_unref (caps);
4386     return FALSE;
4387   }
4388 }
4389
4390 /* this won't get called until we implement an activate function */
4391 static gboolean
4392 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
4393 {
4394   gboolean result = FALSE;
4395   GstBaseSink *basesink;
4396   GstBaseSinkClass *bclass;
4397
4398   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4399   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4400
4401   if (active) {
4402     GstFormat format;
4403     gint64 duration;
4404
4405     /* we mark we have a newsegment here because pull based
4406      * mode works just fine without having a newsegment before the
4407      * first buffer */
4408     format = GST_FORMAT_BYTES;
4409
4410     gst_segment_init (&basesink->segment, format);
4411     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
4412     GST_OBJECT_LOCK (basesink);
4413     basesink->have_newsegment = TRUE;
4414     GST_OBJECT_UNLOCK (basesink);
4415
4416     /* get the peer duration in bytes */
4417     result = gst_pad_query_peer_duration (pad, &format, &duration);
4418     if (result) {
4419       GST_DEBUG_OBJECT (basesink,
4420           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
4421       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
4422           duration);
4423       gst_segment_set_duration (&basesink->segment, format, duration);
4424     } else {
4425       GST_DEBUG_OBJECT (basesink, "unknown duration");
4426     }
4427
4428     if (bclass->activate_pull)
4429       result = bclass->activate_pull (basesink, TRUE);
4430     else
4431       result = FALSE;
4432
4433     if (!result)
4434       goto activate_failed;
4435
4436   } else {
4437     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
4438       g_warning ("Internal GStreamer activation error!!!");
4439       result = FALSE;
4440     } else {
4441       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
4442       if (bclass->activate_pull)
4443         result &= bclass->activate_pull (basesink, FALSE);
4444       basesink->pad_mode = GST_ACTIVATE_NONE;
4445       /* clear any pending caps */
4446       GST_OBJECT_LOCK (basesink);
4447       gst_caps_replace (&basesink->priv->pull_caps, NULL);
4448       GST_OBJECT_UNLOCK (basesink);
4449     }
4450   }
4451   gst_object_unref (basesink);
4452
4453   return result;
4454
4455   /* ERRORS */
4456 activate_failed:
4457   {
4458     /* reset, as starting the thread failed */
4459     basesink->pad_mode = GST_ACTIVATE_NONE;
4460
4461     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
4462     return FALSE;
4463   }
4464 }
4465
4466 /* send an event to our sinkpad peer. */
4467 static gboolean
4468 gst_base_sink_send_event (GstElement * element, GstEvent * event)
4469 {
4470   GstPad *pad;
4471   GstBaseSink *basesink = GST_BASE_SINK (element);
4472   gboolean forward, result = TRUE;
4473   GstActivateMode mode;
4474
4475   GST_OBJECT_LOCK (element);
4476   /* get the pad and the scheduling mode */
4477   pad = gst_object_ref (basesink->sinkpad);
4478   mode = basesink->pad_mode;
4479   GST_OBJECT_UNLOCK (element);
4480
4481   /* only push UPSTREAM events upstream */
4482   forward = GST_EVENT_IS_UPSTREAM (event);
4483
4484   GST_DEBUG_OBJECT (basesink, "handling event %p %" GST_PTR_FORMAT, event,
4485       event);
4486
4487   switch (GST_EVENT_TYPE (event)) {
4488     case GST_EVENT_LATENCY:
4489     {
4490       GstClockTime latency;
4491
4492       gst_event_parse_latency (event, &latency);
4493
4494       /* store the latency. We use this to adjust the running_time before syncing
4495        * it to the clock. */
4496       GST_OBJECT_LOCK (element);
4497       basesink->priv->latency = latency;
4498       if (!basesink->priv->have_latency)
4499         forward = FALSE;
4500       GST_OBJECT_UNLOCK (element);
4501       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
4502           GST_TIME_ARGS (latency));
4503
4504       /* We forward this event so that all elements know about the global pipeline
4505        * latency. This is interesting for an element when it wants to figure out
4506        * when a particular piece of data will be rendered. */
4507       break;
4508     }
4509     case GST_EVENT_SEEK:
4510       /* in pull mode we will execute the seek */
4511       if (mode == GST_ACTIVATE_PULL)
4512         result = gst_base_sink_perform_seek (basesink, pad, event);
4513       break;
4514     case GST_EVENT_STEP:
4515       result = gst_base_sink_perform_step (basesink, pad, event);
4516       forward = FALSE;
4517       break;
4518     default:
4519       break;
4520   }
4521
4522   if (forward) {
4523     result = gst_pad_push_event (pad, event);
4524   } else {
4525     /* not forwarded, unref the event */
4526     gst_event_unref (event);
4527   }
4528
4529   gst_object_unref (pad);
4530   return result;
4531 }
4532
4533 static gboolean
4534 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4535     gint64 * cur, gboolean * upstream)
4536 {
4537   GstClock *clock = NULL;
4538   gboolean res = FALSE;
4539   GstFormat oformat, tformat;
4540   GstSegment *segment;
4541   GstClockTime now, latency;
4542   GstClockTimeDiff base;
4543   gint64 time, accum, duration;
4544   gdouble rate;
4545   gint64 last;
4546   gboolean last_seen, with_clock, in_paused;
4547
4548   GST_OBJECT_LOCK (basesink);
4549   /* we can only get the segment when we are not NULL or READY */
4550   if (!basesink->have_newsegment)
4551     goto wrong_state;
4552
4553   in_paused = FALSE;
4554   /* when not in PLAYING or when we're busy with a state change, we
4555    * cannot read from the clock so we report time based on the
4556    * last seen timestamp. */
4557   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4558       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING) {
4559     in_paused = TRUE;
4560   }
4561
4562   /* we don't use the clip segment in pull mode, when seeking we update the
4563    * main segment directly with the new segment values without it having to be
4564    * activated by the rendering after preroll */
4565   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4566     segment = basesink->abidata.ABI.clip_segment;
4567   else
4568     segment = &basesink->segment;
4569
4570   /* our intermediate time format */
4571   tformat = GST_FORMAT_TIME;
4572   /* get the format in the segment */
4573   oformat = segment->format;
4574
4575   /* report with last seen position when EOS */
4576   last_seen = basesink->eos;
4577
4578   /* assume we will use the clock for getting the current position */
4579   with_clock = TRUE;
4580   if (basesink->sync == FALSE)
4581     with_clock = FALSE;
4582
4583   /* and we need a clock */
4584   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4585     with_clock = FALSE;
4586   else
4587     gst_object_ref (clock);
4588
4589   /* mainloop might be querying position when going to playing async,
4590    * while (audio) rendering might be quickly advancing stream position,
4591    * so use clock asap rather than last reported position */
4592   if (in_paused && with_clock && g_atomic_int_get (&basesink->priv->to_playing)) {
4593     GST_DEBUG_OBJECT (basesink, "going to PLAYING, so not PAUSED");
4594     in_paused = FALSE;
4595   }
4596
4597   /* collect all data we need holding the lock */
4598   if (GST_CLOCK_TIME_IS_VALID (segment->time))
4599     time = segment->time;
4600   else
4601     time = 0;
4602
4603   if (GST_CLOCK_TIME_IS_VALID (segment->stop))
4604     duration = segment->stop - segment->start;
4605   else
4606     duration = 0;
4607
4608   accum = segment->accum;
4609   rate = segment->rate * segment->applied_rate;
4610   latency = basesink->priv->latency;
4611
4612   if (oformat == GST_FORMAT_TIME) {
4613     gint64 start, stop;
4614
4615     start = basesink->priv->current_sstart;
4616     stop = basesink->priv->current_sstop;
4617
4618     if (in_paused) {
4619       /* in paused we use the last position as a lower bound */
4620       if (stop == -1 || segment->rate > 0.0)
4621         last = start;
4622       else
4623         last = stop;
4624     } else {
4625       /* in playing, use last stop time as upper bound */
4626       if (start == -1 || segment->rate > 0.0)
4627         last = stop;
4628       else
4629         last = start;
4630     }
4631   } else {
4632     /* convert last stop to stream time */
4633     last = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4634   }
4635
4636   if (in_paused) {
4637     /* in paused, use start_time */
4638     base = GST_ELEMENT_START_TIME (basesink);
4639     GST_DEBUG_OBJECT (basesink, "in paused, using start time %" GST_TIME_FORMAT,
4640         GST_TIME_ARGS (base));
4641   } else if (with_clock) {
4642     /* else use clock when needed */
4643     base = GST_ELEMENT_CAST (basesink)->base_time;
4644     GST_DEBUG_OBJECT (basesink, "using clock and base time %" GST_TIME_FORMAT,
4645         GST_TIME_ARGS (base));
4646   } else {
4647     /* else, no sync or clock -> no base time */
4648     GST_DEBUG_OBJECT (basesink, "no sync or no clock");
4649     base = -1;
4650   }
4651
4652   /* no base, we can't calculate running_time, use last seem timestamp to report
4653    * time */
4654   if (base == -1)
4655     last_seen = TRUE;
4656
4657   /* need to release the object lock before we can get the time,
4658    * a clock might take the LOCK of the provider, which could be
4659    * a basesink subclass. */
4660   GST_OBJECT_UNLOCK (basesink);
4661
4662   if (last_seen) {
4663     /* in EOS or when no valid stream_time, report the value of last seen
4664      * timestamp */
4665     if (last == -1) {
4666       /* no timestamp, we need to ask upstream */
4667       GST_DEBUG_OBJECT (basesink, "no last seen timestamp, asking upstream");
4668       res = FALSE;
4669       *upstream = TRUE;
4670       goto done;
4671     }
4672     GST_DEBUG_OBJECT (basesink, "using last seen timestamp %" GST_TIME_FORMAT,
4673         GST_TIME_ARGS (last));
4674     *cur = last;
4675   } else {
4676     if (oformat != tformat) {
4677       /* convert accum, time and duration to time */
4678       if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4679               &accum))
4680         goto convert_failed;
4681       if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration,
4682               &tformat, &duration))
4683         goto convert_failed;
4684       if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4685               &time))
4686         goto convert_failed;
4687       if (!gst_pad_query_convert (basesink->sinkpad, oformat, last, &tformat,
4688               &last))
4689         goto convert_failed;
4690
4691       /* assume time format from now on */
4692       oformat = tformat;
4693     }
4694
4695     if (!in_paused && with_clock) {
4696       now = gst_clock_get_time (clock);
4697     } else {
4698       now = base;
4699       base = 0;
4700     }
4701
4702     /* subtract base time and accumulated time from the clock time.
4703      * Make sure we don't go negative. This is the current time in
4704      * the segment which we need to scale with the combined
4705      * rate and applied rate. */
4706     base += accum;
4707     base += latency;
4708     if (GST_CLOCK_DIFF (base, now) < 0)
4709       base = now;
4710
4711     /* for negative rates we need to count back from the segment
4712      * duration. */
4713     if (rate < 0.0)
4714       time += duration;
4715
4716     *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4717
4718     if (in_paused) {
4719       /* never report less than segment values in paused */
4720       if (last != -1)
4721         *cur = MAX (last, *cur);
4722     } else {
4723       /* never report more than last seen position in playing */
4724       if (last != -1)
4725         *cur = MIN (last, *cur);
4726     }
4727
4728     GST_DEBUG_OBJECT (basesink,
4729         "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4730         GST_TIME_FORMAT " + time %" GST_TIME_FORMAT "  last %" GST_TIME_FORMAT,
4731         GST_TIME_ARGS (now), GST_TIME_ARGS (base), GST_TIME_ARGS (accum),
4732         GST_TIME_ARGS (time), GST_TIME_ARGS (last));
4733   }
4734
4735   if (oformat != format) {
4736     /* convert to final format */
4737     if (!gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur))
4738       goto convert_failed;
4739   }
4740
4741   res = TRUE;
4742
4743 done:
4744   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4745       res, GST_TIME_ARGS (*cur));
4746
4747   if (clock)
4748     gst_object_unref (clock);
4749
4750   return res;
4751
4752   /* special cases */
4753 wrong_state:
4754   {
4755     /* in NULL or READY we always return FALSE and -1 */
4756     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4757     res = FALSE;
4758     *cur = -1;
4759     GST_OBJECT_UNLOCK (basesink);
4760     goto done;
4761   }
4762 convert_failed:
4763   {
4764     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4765     *upstream = TRUE;
4766     res = FALSE;
4767     goto done;
4768   }
4769 }
4770
4771 static gboolean
4772 gst_base_sink_get_duration (GstBaseSink * basesink, GstFormat format,
4773     gint64 * dur, gboolean * upstream)
4774 {
4775   gboolean res = FALSE;
4776
4777   if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4778     GstFormat uformat = GST_FORMAT_BYTES;
4779     gint64 uduration;
4780
4781     /* get the duration in bytes, in pull mode that's all we are sure to
4782      * know. We have to explicitly get this value from upstream instead of
4783      * using our cached value because it might change. Duration caching
4784      * should be done at a higher level. */
4785     res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat, &uduration);
4786     if (res) {
4787       gst_segment_set_duration (&basesink->segment, uformat, uduration);
4788       if (format != uformat) {
4789         /* convert to the requested format */
4790         res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4791             &format, dur);
4792       } else {
4793         *dur = uduration;
4794       }
4795     }
4796     *upstream = FALSE;
4797   } else {
4798     *upstream = TRUE;
4799   }
4800
4801   return res;
4802 }
4803
4804 static const GstQueryType *
4805 gst_base_sink_get_query_types (GstElement * element)
4806 {
4807   static const GstQueryType query_types[] = {
4808     GST_QUERY_DURATION,
4809     GST_QUERY_POSITION,
4810     GST_QUERY_SEGMENT,
4811     GST_QUERY_LATENCY,
4812     0
4813   };
4814
4815   return query_types;
4816 }
4817
4818 static gboolean
4819 gst_base_sink_query (GstElement * element, GstQuery * query)
4820 {
4821   gboolean res = FALSE;
4822
4823   GstBaseSink *basesink = GST_BASE_SINK (element);
4824
4825   switch (GST_QUERY_TYPE (query)) {
4826     case GST_QUERY_POSITION:
4827     {
4828       gint64 cur = 0;
4829       GstFormat format;
4830       gboolean upstream = FALSE;
4831
4832       gst_query_parse_position (query, &format, NULL);
4833
4834       GST_DEBUG_OBJECT (basesink, "position query in format %s",
4835           gst_format_get_name (format));
4836
4837       /* first try to get the position based on the clock */
4838       if ((res =
4839               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4840         gst_query_set_position (query, format, cur);
4841       } else if (upstream) {
4842         /* fallback to peer query */
4843         res = gst_pad_peer_query (basesink->sinkpad, query);
4844       }
4845       if (!res) {
4846         /* we can handle a few things if upstream failed */
4847         if (format == GST_FORMAT_PERCENT) {
4848           gint64 dur = 0;
4849           GstFormat uformat = GST_FORMAT_TIME;
4850
4851           res = gst_base_sink_get_position (basesink, GST_FORMAT_TIME, &cur,
4852               &upstream);
4853           if (!res && upstream) {
4854             res = gst_pad_query_peer_position (basesink->sinkpad, &uformat,
4855                 &cur);
4856           }
4857           if (res) {
4858             res = gst_base_sink_get_duration (basesink, GST_FORMAT_TIME, &dur,
4859                 &upstream);
4860             if (!res && upstream) {
4861               res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4862                   &dur);
4863             }
4864           }
4865           if (res) {
4866             gint64 pos;
4867
4868             pos = gst_util_uint64_scale (100 * GST_FORMAT_PERCENT_SCALE, cur,
4869                 dur);
4870             gst_query_set_position (query, GST_FORMAT_PERCENT, pos);
4871           }
4872         }
4873       }
4874       break;
4875     }
4876     case GST_QUERY_DURATION:
4877     {
4878       gint64 dur = 0;
4879       GstFormat format;
4880       gboolean upstream = FALSE;
4881
4882       gst_query_parse_duration (query, &format, NULL);
4883
4884       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4885           gst_format_get_name (format));
4886
4887       if ((res =
4888               gst_base_sink_get_duration (basesink, format, &dur, &upstream))) {
4889         gst_query_set_duration (query, format, dur);
4890       } else if (upstream) {
4891         /* fallback to peer query */
4892         res = gst_pad_peer_query (basesink->sinkpad, query);
4893       }
4894       if (!res) {
4895         /* we can handle a few things if upstream failed */
4896         if (format == GST_FORMAT_PERCENT) {
4897           gst_query_set_duration (query, GST_FORMAT_PERCENT,
4898               GST_FORMAT_PERCENT_MAX);
4899           res = TRUE;
4900         }
4901       }
4902       break;
4903     }
4904     case GST_QUERY_LATENCY:
4905     {
4906       gboolean live, us_live;
4907       GstClockTime min, max;
4908
4909       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4910                   &max))) {
4911         gst_query_set_latency (query, live, min, max);
4912       }
4913       break;
4914     }
4915     case GST_QUERY_JITTER:
4916       break;
4917     case GST_QUERY_RATE:
4918       /* gst_query_set_rate (query, basesink->segment_rate); */
4919       res = TRUE;
4920       break;
4921     case GST_QUERY_SEGMENT:
4922     {
4923       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4924         gst_query_set_segment (query, basesink->segment.rate,
4925             GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4926         res = TRUE;
4927       } else {
4928         res = gst_pad_peer_query (basesink->sinkpad, query);
4929       }
4930       break;
4931     }
4932     case GST_QUERY_SEEKING:
4933     case GST_QUERY_CONVERT:
4934     case GST_QUERY_FORMATS:
4935     default:
4936       res = gst_pad_peer_query (basesink->sinkpad, query);
4937       break;
4938   }
4939   GST_DEBUG_OBJECT (basesink, "query %s returns %d",
4940       GST_QUERY_TYPE_NAME (query), res);
4941   return res;
4942 }
4943
4944 static GstStateChangeReturn
4945 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4946 {
4947   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4948   GstBaseSink *basesink = GST_BASE_SINK (element);
4949   GstBaseSinkClass *bclass;
4950   GstBaseSinkPrivate *priv;
4951
4952   priv = basesink->priv;
4953
4954   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4955
4956   switch (transition) {
4957     case GST_STATE_CHANGE_NULL_TO_READY:
4958       if (bclass->start)
4959         if (!bclass->start (basesink))
4960           goto start_failed;
4961       break;
4962     case GST_STATE_CHANGE_READY_TO_PAUSED:
4963       /* need to complete preroll before this state change completes, there
4964        * is no data flow in READY so we can safely assume we need to preroll. */
4965       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4966       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4967       basesink->have_newsegment = FALSE;
4968       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4969       gst_segment_init (basesink->abidata.ABI.clip_segment,
4970           GST_FORMAT_UNDEFINED);
4971       basesink->offset = 0;
4972       basesink->have_preroll = FALSE;
4973       priv->step_unlock = FALSE;
4974       basesink->need_preroll = TRUE;
4975       basesink->playing_async = TRUE;
4976       priv->current_sstart = GST_CLOCK_TIME_NONE;
4977       priv->current_sstop = GST_CLOCK_TIME_NONE;
4978       priv->eos_rtime = GST_CLOCK_TIME_NONE;
4979       priv->latency = 0;
4980       basesink->eos = FALSE;
4981       priv->received_eos = FALSE;
4982       gst_base_sink_reset_qos (basesink);
4983       priv->commited = FALSE;
4984       priv->call_preroll = TRUE;
4985       priv->current_step.valid = FALSE;
4986       priv->pending_step.valid = FALSE;
4987       if (priv->async_enabled) {
4988         GST_DEBUG_OBJECT (basesink, "doing async state change");
4989         /* when async enabled, post async-start message and return ASYNC from
4990          * the state change function */
4991         ret = GST_STATE_CHANGE_ASYNC;
4992         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4993             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4994       } else {
4995         priv->have_latency = TRUE;
4996       }
4997       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4998       break;
4999     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
5000       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5001       g_atomic_int_set (&basesink->priv->to_playing, TRUE);
5002       if (!gst_base_sink_needs_preroll (basesink)) {
5003         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
5004         /* no preroll needed anymore now. */
5005         basesink->playing_async = FALSE;
5006         basesink->need_preroll = FALSE;
5007         if (basesink->eos) {
5008           GstMessage *message;
5009
5010           /* need to post EOS message here */
5011           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
5012           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
5013           gst_message_set_seqnum (message, basesink->priv->seqnum);
5014           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
5015         } else {
5016           GST_DEBUG_OBJECT (basesink, "signal preroll");
5017           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
5018         }
5019       } else {
5020         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
5021         basesink->need_preroll = TRUE;
5022         basesink->playing_async = TRUE;
5023         priv->call_preroll = TRUE;
5024         priv->commited = FALSE;
5025         if (priv->async_enabled) {
5026           GST_DEBUG_OBJECT (basesink, "doing async state change");
5027           ret = GST_STATE_CHANGE_ASYNC;
5028           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5029               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
5030         }
5031       }
5032       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5033       break;
5034     default:
5035       break;
5036   }
5037
5038   {
5039     GstStateChangeReturn bret;
5040
5041     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
5042     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
5043       goto activate_failed;
5044   }
5045
5046   switch (transition) {
5047     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
5048       g_atomic_int_set (&basesink->priv->to_playing, FALSE);
5049       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
5050       /* FIXME, make sure we cannot enter _render first */
5051
5052       /* we need to call ::unlock before locking PREROLL_LOCK
5053        * since we lock it before going into ::render */
5054       if (bclass->unlock)
5055         bclass->unlock (basesink);
5056
5057       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5058       GST_DEBUG_OBJECT (basesink, "got preroll lock");
5059       /* now that we have the PREROLL lock, clear our unlock request */
5060       if (bclass->unlock_stop)
5061         bclass->unlock_stop (basesink);
5062
5063       /* we need preroll again and we set the flag before unlocking the clockid
5064        * because if the clockid is unlocked before a current buffer expired, we
5065        * can use that buffer to preroll with */
5066       basesink->need_preroll = TRUE;
5067
5068       if (basesink->clock_id) {
5069         GST_DEBUG_OBJECT (basesink, "unschedule clock");
5070         gst_clock_id_unschedule (basesink->clock_id);
5071       }
5072
5073       /* if we don't have a preroll buffer we need to wait for a preroll and
5074        * return ASYNC. */
5075       if (!gst_base_sink_needs_preroll (basesink)) {
5076         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
5077         basesink->playing_async = FALSE;
5078       } else {
5079         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
5080           GST_DEBUG_OBJECT (basesink, "element is <= READY");
5081           ret = GST_STATE_CHANGE_SUCCESS;
5082         } else {
5083           GST_DEBUG_OBJECT (basesink,
5084               "PLAYING to PAUSED, we are not prerolled");
5085           basesink->playing_async = TRUE;
5086           priv->commited = FALSE;
5087           priv->call_preroll = TRUE;
5088           if (priv->async_enabled) {
5089             GST_DEBUG_OBJECT (basesink, "doing async state change");
5090             ret = GST_STATE_CHANGE_ASYNC;
5091             gst_element_post_message (GST_ELEMENT_CAST (basesink),
5092                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
5093                     FALSE));
5094           }
5095         }
5096       }
5097       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
5098           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
5099
5100       gst_base_sink_reset_qos (basesink);
5101       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5102       break;
5103     case GST_STATE_CHANGE_PAUSED_TO_READY:
5104       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5105       /* start by reseting our position state with the object lock so that the
5106        * position query gets the right idea. We do this before we post the
5107        * messages so that the message handlers pick this up. */
5108       GST_OBJECT_LOCK (basesink);
5109       basesink->have_newsegment = FALSE;
5110       priv->current_sstart = GST_CLOCK_TIME_NONE;
5111       priv->current_sstop = GST_CLOCK_TIME_NONE;
5112       priv->have_latency = FALSE;
5113       if (priv->cached_clock_id) {
5114         gst_clock_id_unref (priv->cached_clock_id);
5115         priv->cached_clock_id = NULL;
5116       }
5117       GST_OBJECT_UNLOCK (basesink);
5118
5119       gst_base_sink_set_last_buffer (basesink, NULL);
5120       priv->call_preroll = FALSE;
5121
5122       if (!priv->commited) {
5123         if (priv->async_enabled) {
5124           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
5125
5126           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5127               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
5128                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
5129
5130           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5131               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
5132         }
5133         priv->commited = TRUE;
5134       } else {
5135         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
5136       }
5137       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5138       break;
5139     case GST_STATE_CHANGE_READY_TO_NULL:
5140       if (bclass->stop) {
5141         if (!bclass->stop (basesink)) {
5142           GST_WARNING_OBJECT (basesink, "failed to stop");
5143         }
5144       }
5145       gst_base_sink_set_last_buffer (basesink, NULL);
5146       priv->call_preroll = FALSE;
5147       break;
5148     default:
5149       break;
5150   }
5151
5152   return ret;
5153
5154   /* ERRORS */
5155 start_failed:
5156   {
5157     GST_DEBUG_OBJECT (basesink, "failed to start");
5158     return GST_STATE_CHANGE_FAILURE;
5159   }
5160 activate_failed:
5161   {
5162     GST_DEBUG_OBJECT (basesink,
5163         "element failed to change states -- activation problem?");
5164     return GST_STATE_CHANGE_FAILURE;
5165   }
5166 }