basesink: ensure start_time reset upon flush
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSrc
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * |[
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * ]|
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSinkClass.preroll() vmethod with this preroll buffer and will then
59  * commit the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from #GstBaseSinkClass.get_times(). If this
63  * function returns #GST_CLOCK_TIME_NONE for the start time, no synchronisation
64  * will be done. Synchronisation can be disabled entirely by setting the object
65  * #GstBaseSink:sync property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSinkClass.render() will be
68  * called. Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the
71  * #GstBaseSinkClass.render() method are supported as well. These classes
72  * typically receive a buffer in the render method and can then potentially
73  * block on the clock while rendering. A typical example is an audiosink.
74  * Since 0.10.11 these subclasses can use gst_base_sink_wait_preroll() to
75  * perform the blocking wait.
76  *
77  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
78  * for the clock to reach the time indicated by the stop time of the last
79  * #GstBaseSinkClass.get_times() call before posting an EOS message. When the
80  * element receives EOS in PAUSED, preroll completes, the event is queued and an
81  * EOS message is posted when going to PLAYING.
82  *
83  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
84  * synchronisation and clipping of buffers. Buffers that fall completely outside
85  * of the current segment are dropped. Buffers that fall partially in the
86  * segment are rendered (and prerolled). Subclasses should do any subbuffer
87  * clipping themselves when needed.
88  *
89  * #GstBaseSink will by default report the current playback position in
90  * #GST_FORMAT_TIME based on the current clock time and segment information.
91  * If no clock has been set on the element, the query will be forwarded
92  * upstream.
93  *
94  * The #GstBaseSinkClass.set_caps() function will be called when the subclass
95  * should configure itself to process a specific media type.
96  *
97  * The #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() virtual methods
98  * will be called when resources should be allocated. Any 
99  * #GstBaseSinkClass.preroll(), #GstBaseSinkClass.render() and
100  * #GstBaseSinkClass.set_caps() function will be called between the
101  * #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() calls.
102  *
103  * The #GstBaseSinkClass.event() virtual method will be called when an event is
104  * received by #GstBaseSink. Normally this method should only be overriden by
105  * very specific elements (such as file sinks) which need to handle the
106  * newsegment event specially.
107  *
108  * #GstBaseSink provides an overridable #GstBaseSinkClass.buffer_alloc()
109  * function that can be used by sinks that want to do reverse negotiation or to
110  * provide custom buffers (hardware buffers for example) to upstream elements.
111  *
112  * The #GstBaseSinkClass.unlock() method is called when the elements should
113  * unblock any blocking operations they perform in the
114  * #GstBaseSinkClass.render() method. This is mostly useful when the
115  * #GstBaseSinkClass.render() method performs a blocking write on a file
116  * descriptor, for example.
117  *
118  * The #GstBaseSink:max-lateness property affects how the sink deals with
119  * buffers that arrive too late in the sink. A buffer arrives too late in the
120  * sink when the presentation time (as a combination of the last segment, buffer
121  * timestamp and element base_time) plus the duration is before the current
122  * time of the clock.
123  * If the frame is later than max-lateness, the sink will drop the buffer
124  * without calling the render method.
125  * This feature is disabled if sync is disabled, the
126  * #GstBaseSinkClass.get_times() method does not return a valid start time or
127  * max-lateness is set to -1 (the default).
128  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
129  * max-lateness value.
130  *
131  * The #GstBaseSink:qos property will enable the quality-of-service features of
132  * the basesink which gather statistics about the real-time performance of the
133  * clock synchronisation. For each buffer received in the sink, statistics are
134  * gathered and a QOS event is sent upstream with these numbers. This
135  * information can then be used by upstream elements to reduce their processing
136  * rate, for example.
137  *
138  * Since 0.10.15 the #GstBaseSink:async property can be used to instruct the
139  * sink to never perform an ASYNC state change. This feature is mostly usable
140  * when dealing with non-synchronized streams or sparse streams.
141  *
142  * Last reviewed on 2007-08-29 (0.10.15)
143  */
144
145 #ifdef HAVE_CONFIG_H
146 #  include "config.h"
147 #endif
148
149 #include <gst/gst_private.h>
150
151 #include "gstbasesink.h"
152 #include <gst/gstmarshal.h>
153 #include <gst/gst-i18n-lib.h>
154
155 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
156 #define GST_CAT_DEFAULT gst_base_sink_debug
157
158 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
159    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
160
161 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
162
163 typedef struct
164 {
165   gboolean valid;               /* if this info is valid */
166   guint32 seqnum;               /* the seqnum of the STEP event */
167   GstFormat format;             /* the format of the amount */
168   guint64 amount;               /* the total amount of data to skip */
169   guint64 position;             /* the position in the stepped data */
170   guint64 duration;             /* the duration in time of the skipped data */
171   guint64 start;                /* running_time of the start */
172   gdouble rate;                 /* rate of skipping */
173   gdouble start_rate;           /* rate before skipping */
174   guint64 start_start;          /* start position skipping */
175   guint64 start_stop;           /* stop position skipping */
176   gboolean flush;               /* if this was a flushing step */
177   gboolean intermediate;        /* if this is an intermediate step */
178   gboolean need_preroll;        /* if we need preroll after this step */
179 } GstStepInfo;
180
181 /* FIXME, some stuff in ABI.data and other in Private...
182  * Make up your mind please.
183  */
184 struct _GstBaseSinkPrivate
185 {
186   gint qos_enabled;             /* ATOMIC */
187   gboolean async_enabled;
188   GstClockTimeDiff ts_offset;
189   GstClockTime render_delay;
190
191   /* start, stop of current buffer, stream time, used to report position */
192   GstClockTime current_sstart;
193   GstClockTime current_sstop;
194
195   /* start, stop and jitter of current buffer, running time */
196   GstClockTime current_rstart;
197   GstClockTime current_rstop;
198   GstClockTimeDiff current_jitter;
199   /* the running time of the previous buffer */
200   GstClockTime prev_rstart;
201
202   /* EOS sync time in running time */
203   GstClockTime eos_rtime;
204
205   /* last buffer that arrived in time, running time */
206   GstClockTime last_render_time;
207   /* when the last buffer left the sink, running time */
208   GstClockTime last_left;
209
210   /* running averages go here these are done on running time */
211   GstClockTime avg_pt;
212   GstClockTime avg_duration;
213   gdouble avg_rate;
214   GstClockTime avg_in_diff;
215
216   /* these are done on system time. avg_jitter and avg_render are
217    * compared to eachother to see if the rendering time takes a
218    * huge amount of the processing, If so we are flooded with
219    * buffers. */
220   GstClockTime last_left_systime;
221   GstClockTime avg_jitter;
222   GstClockTime start, stop;
223   GstClockTime avg_render;
224
225   /* number of rendered and dropped frames */
226   guint64 rendered;
227   guint64 dropped;
228
229   /* latency stuff */
230   GstClockTime latency;
231
232   /* if we already commited the state */
233   gboolean commited;
234   /* state change to playing ongoing */
235   gboolean to_playing;
236
237   /* when we received EOS */
238   gboolean received_eos;
239
240   /* when we are prerolled and able to report latency */
241   gboolean have_latency;
242
243   /* the last buffer we prerolled or rendered. Useful for making snapshots */
244   gint enable_last_buffer;      /* atomic */
245   GstBuffer *last_buffer;
246
247   /* caps for pull based scheduling */
248   GstCaps *pull_caps;
249
250   /* blocksize for pulling */
251   guint blocksize;
252
253   gboolean discont;
254
255   /* seqnum of the stream */
256   guint32 seqnum;
257
258   gboolean call_preroll;
259   gboolean step_unlock;
260
261   /* we have a pending and a current step operation */
262   GstStepInfo current_step;
263   GstStepInfo pending_step;
264
265   /* Cached GstClockID */
266   GstClockID cached_clock_id;
267
268   /* for throttling and QoS */
269   GstClockTime earliest_in_time;
270   GstClockTime throttle_time;
271 };
272
273 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
274
275 /* generic running average, this has a neutral window size */
276 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
277
278 /* the windows for these running averages are experimentally obtained.
279  * possitive values get averaged more while negative values use a small
280  * window so we can react faster to badness. */
281 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
282 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
283
284 enum
285 {
286   _PR_IS_NOTHING = 1 << 0,
287   _PR_IS_BUFFER = 1 << 1,
288   _PR_IS_BUFFERLIST = 1 << 2,
289   _PR_IS_EVENT = 1 << 3
290 } PrivateObjectType;
291
292 #define OBJ_IS_BUFFER(a) ((a) & _PR_IS_BUFFER)
293 #define OBJ_IS_BUFFERLIST(a) ((a) & _PR_IS_BUFFERLIST)
294 #define OBJ_IS_EVENT(a) ((a) & _PR_IS_EVENT)
295 #define OBJ_IS_BUFFERFULL(a) ((a) & (_PR_IS_BUFFER | _PR_IS_BUFFERLIST))
296
297 /* BaseSink properties */
298
299 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
300 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
301
302 #define DEFAULT_PREROLL_QUEUE_LEN   0
303 #define DEFAULT_SYNC                TRUE
304 #define DEFAULT_MAX_LATENESS        -1
305 #define DEFAULT_QOS                 FALSE
306 #define DEFAULT_ASYNC               TRUE
307 #define DEFAULT_TS_OFFSET           0
308 #define DEFAULT_BLOCKSIZE           4096
309 #define DEFAULT_RENDER_DELAY        0
310 #define DEFAULT_ENABLE_LAST_BUFFER  TRUE
311 #define DEFAULT_THROTTLE_TIME       0
312
313 enum
314 {
315   PROP_0,
316   PROP_PREROLL_QUEUE_LEN,
317   PROP_SYNC,
318   PROP_MAX_LATENESS,
319   PROP_QOS,
320   PROP_ASYNC,
321   PROP_TS_OFFSET,
322   PROP_ENABLE_LAST_BUFFER,
323   PROP_LAST_BUFFER,
324   PROP_BLOCKSIZE,
325   PROP_RENDER_DELAY,
326   PROP_THROTTLE_TIME,
327   PROP_LAST
328 };
329
330 static GstElementClass *parent_class = NULL;
331
332 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
333 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
334 static void gst_base_sink_finalize (GObject * object);
335
336 GType
337 gst_base_sink_get_type (void)
338 {
339   static volatile gsize base_sink_type = 0;
340
341   if (g_once_init_enter (&base_sink_type)) {
342     GType _type;
343     static const GTypeInfo base_sink_info = {
344       sizeof (GstBaseSinkClass),
345       NULL,
346       NULL,
347       (GClassInitFunc) gst_base_sink_class_init,
348       NULL,
349       NULL,
350       sizeof (GstBaseSink),
351       0,
352       (GInstanceInitFunc) gst_base_sink_init,
353     };
354
355     _type = g_type_register_static (GST_TYPE_ELEMENT,
356         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
357     g_once_init_leave (&base_sink_type, _type);
358   }
359   return base_sink_type;
360 }
361
362 static void gst_base_sink_set_property (GObject * object, guint prop_id,
363     const GValue * value, GParamSpec * pspec);
364 static void gst_base_sink_get_property (GObject * object, guint prop_id,
365     GValue * value, GParamSpec * pspec);
366
367 static gboolean gst_base_sink_send_event (GstElement * element,
368     GstEvent * event);
369 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
370 static const GstQueryType *gst_base_sink_get_query_types (GstElement * element);
371
372 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
373 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
374 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
375     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
376 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
377     GstClockTime * start, GstClockTime * end);
378 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
379     GstPad * pad, gboolean flushing);
380 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
381     gboolean active);
382 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
383     GstSegment * segment);
384 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
385     GstEvent * event, GstSegment * segment);
386
387 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
388     GstStateChange transition);
389
390 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
391 static GstFlowReturn gst_base_sink_chain_list (GstPad * pad,
392     GstBufferList * list);
393
394 static void gst_base_sink_loop (GstPad * pad);
395 static gboolean gst_base_sink_pad_activate (GstPad * pad);
396 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
397 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
398 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
399
400 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
401 static GstCaps *gst_base_sink_pad_getcaps (GstPad * pad);
402 static gboolean gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps);
403 static void gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps);
404 static GstFlowReturn gst_base_sink_pad_buffer_alloc (GstPad * pad,
405     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
406
407
408 /* check if an object was too late */
409 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
410     GstMiniObject * obj, GstClockTime rstart, GstClockTime rstop,
411     GstClockReturn status, GstClockTimeDiff jitter);
412 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
413     guint8 obj_type, GstMiniObject * obj);
414
415 static void
416 gst_base_sink_class_init (GstBaseSinkClass * klass)
417 {
418   GObjectClass *gobject_class;
419   GstElementClass *gstelement_class;
420
421   gobject_class = G_OBJECT_CLASS (klass);
422   gstelement_class = GST_ELEMENT_CLASS (klass);
423
424   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
425       "basesink element");
426
427   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
428
429   parent_class = g_type_class_peek_parent (klass);
430
431   gobject_class->finalize = gst_base_sink_finalize;
432   gobject_class->set_property = gst_base_sink_set_property;
433   gobject_class->get_property = gst_base_sink_get_property;
434
435   /* FIXME, this next value should be configured using an event from the
436    * upstream element, ie, the BUFFER_SIZE event. */
437   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
438       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
439           "Number of buffers to queue during preroll", 0, G_MAXUINT,
440           DEFAULT_PREROLL_QUEUE_LEN,
441           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
442
443   g_object_class_install_property (gobject_class, PROP_SYNC,
444       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
445           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446
447   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
448       g_param_spec_int64 ("max-lateness", "Max Lateness",
449           "Maximum number of nanoseconds that a buffer can be late before it "
450           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
451           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452
453   g_object_class_install_property (gobject_class, PROP_QOS,
454       g_param_spec_boolean ("qos", "Qos",
455           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
456           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
457   /**
458    * GstBaseSink:async
459    *
460    * If set to #TRUE, the basesink will perform asynchronous state changes.
461    * When set to #FALSE, the sink will not signal the parent when it prerolls.
462    * Use this option when dealing with sparse streams or when synchronisation is
463    * not required.
464    *
465    * Since: 0.10.15
466    */
467   g_object_class_install_property (gobject_class, PROP_ASYNC,
468       g_param_spec_boolean ("async", "Async",
469           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
470           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
471   /**
472    * GstBaseSink:ts-offset
473    *
474    * Controls the final synchronisation, a negative value will render the buffer
475    * earlier while a positive value delays playback. This property can be
476    * used to fix synchronisation in bad files.
477    *
478    * Since: 0.10.15
479    */
480   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
481       g_param_spec_int64 ("ts-offset", "TS Offset",
482           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
483           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
484
485   /**
486    * GstBaseSink:enable-last-buffer
487    *
488    * Enable the last-buffer property. If FALSE, basesink doesn't keep a
489    * reference to the last buffer arrived and the last-buffer property is always
490    * set to NULL. This can be useful if you need buffers to be released as soon
491    * as possible, eg. if you're using a buffer pool.
492    *
493    * Since: 0.10.30
494    */
495   g_object_class_install_property (gobject_class, PROP_ENABLE_LAST_BUFFER,
496       g_param_spec_boolean ("enable-last-buffer", "Enable Last Buffer",
497           "Enable the last-buffer property", DEFAULT_ENABLE_LAST_BUFFER,
498           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
499
500   /**
501    * GstBaseSink:last-buffer
502    *
503    * The last buffer that arrived in the sink and was used for preroll or for
504    * rendering. This property can be used to generate thumbnails. This property
505    * can be NULL when the sink has not yet received a bufer.
506    *
507    * Since: 0.10.15
508    */
509   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
510       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
511           "The last buffer received in the sink", GST_TYPE_BUFFER,
512           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
513   /**
514    * GstBaseSink:blocksize
515    *
516    * The amount of bytes to pull when operating in pull mode.
517    *
518    * Since: 0.10.22
519    */
520   /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
521   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
522       g_param_spec_uint ("blocksize", "Block size",
523           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
524           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
525   /**
526    * GstBaseSink:render-delay
527    *
528    * The additional delay between synchronisation and actual rendering of the
529    * media. This property will add additional latency to the device in order to
530    * make other sinks compensate for the delay.
531    *
532    * Since: 0.10.22
533    */
534   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
535       g_param_spec_uint64 ("render-delay", "Render Delay",
536           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
537           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
538   /**
539    * GstBaseSink:throttle-time
540    *
541    * The time to insert between buffers. This property can be used to control
542    * the maximum amount of buffers per second to render. Setting this property
543    * to a value bigger than 0 will make the sink create THROTTLE QoS events.
544    *
545    * Since: 0.10.33
546    */
547   g_object_class_install_property (gobject_class, PROP_THROTTLE_TIME,
548       g_param_spec_uint64 ("throttle-time", "Throttle time",
549           "The time to keep between rendered buffers (unused)", 0, G_MAXUINT64,
550           DEFAULT_THROTTLE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
551
552   gstelement_class->change_state =
553       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
554   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
555   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
556   gstelement_class->get_query_types =
557       GST_DEBUG_FUNCPTR (gst_base_sink_get_query_types);
558
559   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
560   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
561   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
562   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
563   klass->activate_pull =
564       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
565
566   /* Registering debug symbols for function pointers */
567   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_getcaps);
568   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_setcaps);
569   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_fixate);
570   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_buffer_alloc);
571   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate);
572   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_push);
573   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_pull);
574   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_event);
575   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain);
576   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain_list);
577 }
578
579 static GstCaps *
580 gst_base_sink_pad_getcaps (GstPad * pad)
581 {
582   GstBaseSinkClass *bclass;
583   GstBaseSink *bsink;
584   GstCaps *caps = NULL;
585
586   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
587   bclass = GST_BASE_SINK_GET_CLASS (bsink);
588
589   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
590     /* if we are operating in pull mode we only accept the negotiated caps */
591     GST_OBJECT_LOCK (pad);
592     if ((caps = GST_PAD_CAPS (pad)))
593       gst_caps_ref (caps);
594     GST_OBJECT_UNLOCK (pad);
595   }
596   if (caps == NULL) {
597     if (bclass->get_caps)
598       caps = bclass->get_caps (bsink);
599
600     if (caps == NULL) {
601       GstPadTemplate *pad_template;
602
603       pad_template =
604           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
605           "sink");
606       if (pad_template != NULL) {
607         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
608       }
609     }
610   }
611   gst_object_unref (bsink);
612
613   return caps;
614 }
615
616 static gboolean
617 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
618 {
619   GstBaseSinkClass *bclass;
620   GstBaseSink *bsink;
621   gboolean res = TRUE;
622
623   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
624   bclass = GST_BASE_SINK_GET_CLASS (bsink);
625
626   if (res && bclass->set_caps)
627     res = bclass->set_caps (bsink, caps);
628
629   gst_object_unref (bsink);
630
631   return res;
632 }
633
634 static void
635 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
636 {
637   GstBaseSinkClass *bclass;
638   GstBaseSink *bsink;
639
640   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
641   bclass = GST_BASE_SINK_GET_CLASS (bsink);
642
643   if (bclass->fixate)
644     bclass->fixate (bsink, caps);
645
646   gst_object_unref (bsink);
647 }
648
649 static GstFlowReturn
650 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
651     GstCaps * caps, GstBuffer ** buf)
652 {
653   GstBaseSinkClass *bclass;
654   GstBaseSink *bsink;
655   GstFlowReturn result = GST_FLOW_OK;
656
657   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
658   if (G_UNLIKELY (bsink == NULL))
659     return GST_FLOW_WRONG_STATE;
660   bclass = GST_BASE_SINK_GET_CLASS (bsink);
661
662   if (bclass->buffer_alloc)
663     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
664   else
665     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
666
667   gst_object_unref (bsink);
668
669   return result;
670 }
671
672 static void
673 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
674 {
675   GstPadTemplate *pad_template;
676   GstBaseSinkPrivate *priv;
677
678   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
679
680   pad_template =
681       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
682   g_return_if_fail (pad_template != NULL);
683
684   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
685
686   gst_pad_set_getcaps_function (basesink->sinkpad, gst_base_sink_pad_getcaps);
687   gst_pad_set_setcaps_function (basesink->sinkpad, gst_base_sink_pad_setcaps);
688   gst_pad_set_fixatecaps_function (basesink->sinkpad, gst_base_sink_pad_fixate);
689   gst_pad_set_bufferalloc_function (basesink->sinkpad,
690       gst_base_sink_pad_buffer_alloc);
691   gst_pad_set_activate_function (basesink->sinkpad, gst_base_sink_pad_activate);
692   gst_pad_set_activatepush_function (basesink->sinkpad,
693       gst_base_sink_pad_activate_push);
694   gst_pad_set_activatepull_function (basesink->sinkpad,
695       gst_base_sink_pad_activate_pull);
696   gst_pad_set_event_function (basesink->sinkpad, gst_base_sink_event);
697   gst_pad_set_chain_function (basesink->sinkpad, gst_base_sink_chain);
698   gst_pad_set_chain_list_function (basesink->sinkpad, gst_base_sink_chain_list);
699   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
700
701   basesink->pad_mode = GST_ACTIVATE_NONE;
702   basesink->preroll_queue = g_queue_new ();
703   basesink->abidata.ABI.clip_segment = gst_segment_new ();
704   priv->have_latency = FALSE;
705
706   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
707   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
708
709   basesink->sync = DEFAULT_SYNC;
710   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
711   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
712   priv->async_enabled = DEFAULT_ASYNC;
713   priv->ts_offset = DEFAULT_TS_OFFSET;
714   priv->render_delay = DEFAULT_RENDER_DELAY;
715   priv->blocksize = DEFAULT_BLOCKSIZE;
716   priv->cached_clock_id = NULL;
717   g_atomic_int_set (&priv->enable_last_buffer, DEFAULT_ENABLE_LAST_BUFFER);
718   priv->throttle_time = DEFAULT_THROTTLE_TIME;
719
720   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
721 }
722
723 static void
724 gst_base_sink_finalize (GObject * object)
725 {
726   GstBaseSink *basesink;
727
728   basesink = GST_BASE_SINK (object);
729
730   g_queue_free (basesink->preroll_queue);
731   gst_segment_free (basesink->abidata.ABI.clip_segment);
732
733   G_OBJECT_CLASS (parent_class)->finalize (object);
734 }
735
736 /**
737  * gst_base_sink_set_sync:
738  * @sink: the sink
739  * @sync: the new sync value.
740  *
741  * Configures @sink to synchronize on the clock or not. When
742  * @sync is FALSE, incomming samples will be played as fast as
743  * possible. If @sync is TRUE, the timestamps of the incomming
744  * buffers will be used to schedule the exact render time of its
745  * contents.
746  *
747  * Since: 0.10.4
748  */
749 void
750 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
751 {
752   g_return_if_fail (GST_IS_BASE_SINK (sink));
753
754   GST_OBJECT_LOCK (sink);
755   sink->sync = sync;
756   GST_OBJECT_UNLOCK (sink);
757 }
758
759 /**
760  * gst_base_sink_get_sync:
761  * @sink: the sink
762  *
763  * Checks if @sink is currently configured to synchronize against the
764  * clock.
765  *
766  * Returns: TRUE if the sink is configured to synchronize against the clock.
767  *
768  * Since: 0.10.4
769  */
770 gboolean
771 gst_base_sink_get_sync (GstBaseSink * sink)
772 {
773   gboolean res;
774
775   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
776
777   GST_OBJECT_LOCK (sink);
778   res = sink->sync;
779   GST_OBJECT_UNLOCK (sink);
780
781   return res;
782 }
783
784 /**
785  * gst_base_sink_set_max_lateness:
786  * @sink: the sink
787  * @max_lateness: the new max lateness value.
788  *
789  * Sets the new max lateness value to @max_lateness. This value is
790  * used to decide if a buffer should be dropped or not based on the
791  * buffer timestamp and the current clock time. A value of -1 means
792  * an unlimited time.
793  *
794  * Since: 0.10.4
795  */
796 void
797 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
798 {
799   g_return_if_fail (GST_IS_BASE_SINK (sink));
800
801   GST_OBJECT_LOCK (sink);
802   sink->abidata.ABI.max_lateness = max_lateness;
803   GST_OBJECT_UNLOCK (sink);
804 }
805
806 /**
807  * gst_base_sink_get_max_lateness:
808  * @sink: the sink
809  *
810  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
811  * more details.
812  *
813  * Returns: The maximum time in nanoseconds that a buffer can be late
814  * before it is dropped and not rendered. A value of -1 means an
815  * unlimited time.
816  *
817  * Since: 0.10.4
818  */
819 gint64
820 gst_base_sink_get_max_lateness (GstBaseSink * sink)
821 {
822   gint64 res;
823
824   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
825
826   GST_OBJECT_LOCK (sink);
827   res = sink->abidata.ABI.max_lateness;
828   GST_OBJECT_UNLOCK (sink);
829
830   return res;
831 }
832
833 /**
834  * gst_base_sink_set_qos_enabled:
835  * @sink: the sink
836  * @enabled: the new qos value.
837  *
838  * Configures @sink to send Quality-of-Service events upstream.
839  *
840  * Since: 0.10.5
841  */
842 void
843 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
844 {
845   g_return_if_fail (GST_IS_BASE_SINK (sink));
846
847   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
848 }
849
850 /**
851  * gst_base_sink_is_qos_enabled:
852  * @sink: the sink
853  *
854  * Checks if @sink is currently configured to send Quality-of-Service events
855  * upstream.
856  *
857  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
858  *
859  * Since: 0.10.5
860  */
861 gboolean
862 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
863 {
864   gboolean res;
865
866   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
867
868   res = g_atomic_int_get (&sink->priv->qos_enabled);
869
870   return res;
871 }
872
873 /**
874  * gst_base_sink_set_async_enabled:
875  * @sink: the sink
876  * @enabled: the new async value.
877  *
878  * Configures @sink to perform all state changes asynchronusly. When async is
879  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
880  * preroll buffer. This feature is usefull if the sink does not synchronize
881  * against the clock or when it is dealing with sparse streams.
882  *
883  * Since: 0.10.15
884  */
885 void
886 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
887 {
888   g_return_if_fail (GST_IS_BASE_SINK (sink));
889
890   GST_PAD_PREROLL_LOCK (sink->sinkpad);
891   g_atomic_int_set (&sink->priv->async_enabled, enabled);
892   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
893   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
894 }
895
896 /**
897  * gst_base_sink_is_async_enabled:
898  * @sink: the sink
899  *
900  * Checks if @sink is currently configured to perform asynchronous state
901  * changes to PAUSED.
902  *
903  * Returns: TRUE if the sink is configured to perform asynchronous state
904  * changes.
905  *
906  * Since: 0.10.15
907  */
908 gboolean
909 gst_base_sink_is_async_enabled (GstBaseSink * sink)
910 {
911   gboolean res;
912
913   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
914
915   res = g_atomic_int_get (&sink->priv->async_enabled);
916
917   return res;
918 }
919
920 /**
921  * gst_base_sink_set_ts_offset:
922  * @sink: the sink
923  * @offset: the new offset
924  *
925  * Adjust the synchronisation of @sink with @offset. A negative value will
926  * render buffers earlier than their timestamp. A positive value will delay
927  * rendering. This function can be used to fix playback of badly timestamped
928  * buffers.
929  *
930  * Since: 0.10.15
931  */
932 void
933 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
934 {
935   g_return_if_fail (GST_IS_BASE_SINK (sink));
936
937   GST_OBJECT_LOCK (sink);
938   sink->priv->ts_offset = offset;
939   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
940   GST_OBJECT_UNLOCK (sink);
941 }
942
943 /**
944  * gst_base_sink_get_ts_offset:
945  * @sink: the sink
946  *
947  * Get the synchronisation offset of @sink.
948  *
949  * Returns: The synchronisation offset.
950  *
951  * Since: 0.10.15
952  */
953 GstClockTimeDiff
954 gst_base_sink_get_ts_offset (GstBaseSink * sink)
955 {
956   GstClockTimeDiff res;
957
958   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
959
960   GST_OBJECT_LOCK (sink);
961   res = sink->priv->ts_offset;
962   GST_OBJECT_UNLOCK (sink);
963
964   return res;
965 }
966
967 /**
968  * gst_base_sink_get_last_buffer:
969  * @sink: the sink
970  *
971  * Get the last buffer that arrived in the sink and was used for preroll or for
972  * rendering. This property can be used to generate thumbnails.
973  *
974  * The #GstCaps on the buffer can be used to determine the type of the buffer.
975  *
976  * Free-function: gst_buffer_unref
977  *
978  * Returns: (transfer full): a #GstBuffer. gst_buffer_unref() after usage.
979  *     This function returns NULL when no buffer has arrived in the sink yet
980  *     or when the sink is not in PAUSED or PLAYING.
981  *
982  * Since: 0.10.15
983  */
984 GstBuffer *
985 gst_base_sink_get_last_buffer (GstBaseSink * sink)
986 {
987   GstBuffer *res;
988
989   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
990
991   GST_OBJECT_LOCK (sink);
992   if ((res = sink->priv->last_buffer))
993     gst_buffer_ref (res);
994   GST_OBJECT_UNLOCK (sink);
995
996   return res;
997 }
998
999 /* with OBJECT_LOCK */
1000 static void
1001 gst_base_sink_set_last_buffer_unlocked (GstBaseSink * sink, GstBuffer * buffer)
1002 {
1003   GstBuffer *old;
1004
1005   old = sink->priv->last_buffer;
1006   if (G_LIKELY (old != buffer)) {
1007     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
1008     if (G_LIKELY (buffer))
1009       gst_buffer_ref (buffer);
1010     sink->priv->last_buffer = buffer;
1011   } else {
1012     old = NULL;
1013   }
1014   /* avoid unreffing with the lock because cleanup code might want to take the
1015    * lock too */
1016   if (G_LIKELY (old)) {
1017     GST_OBJECT_UNLOCK (sink);
1018     gst_buffer_unref (old);
1019     GST_OBJECT_LOCK (sink);
1020   }
1021 }
1022
1023 static void
1024 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
1025 {
1026   if (!g_atomic_int_get (&sink->priv->enable_last_buffer))
1027     return;
1028
1029   GST_OBJECT_LOCK (sink);
1030   gst_base_sink_set_last_buffer_unlocked (sink, buffer);
1031   GST_OBJECT_UNLOCK (sink);
1032 }
1033
1034 /**
1035  * gst_base_sink_set_last_buffer_enabled:
1036  * @sink: the sink
1037  * @enabled: the new enable-last-buffer value.
1038  *
1039  * Configures @sink to store the last received buffer in the last-buffer
1040  * property.
1041  *
1042  * Since: 0.10.30
1043  */
1044 void
1045 gst_base_sink_set_last_buffer_enabled (GstBaseSink * sink, gboolean enabled)
1046 {
1047   g_return_if_fail (GST_IS_BASE_SINK (sink));
1048
1049   /* Only take lock if we change the value */
1050   if (g_atomic_int_compare_and_exchange (&sink->priv->enable_last_buffer,
1051           !enabled, enabled) && !enabled) {
1052     GST_OBJECT_LOCK (sink);
1053     gst_base_sink_set_last_buffer_unlocked (sink, NULL);
1054     GST_OBJECT_UNLOCK (sink);
1055   }
1056 }
1057
1058 /**
1059  * gst_base_sink_is_last_buffer_enabled:
1060  * @sink: the sink
1061  *
1062  * Checks if @sink is currently configured to store the last received buffer in
1063  * the last-buffer property.
1064  *
1065  * Returns: TRUE if the sink is configured to store the last received buffer.
1066  *
1067  * Since: 0.10.30
1068  */
1069 gboolean
1070 gst_base_sink_is_last_buffer_enabled (GstBaseSink * sink)
1071 {
1072   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
1073
1074   return g_atomic_int_get (&sink->priv->enable_last_buffer);
1075 }
1076
1077 /**
1078  * gst_base_sink_get_latency:
1079  * @sink: the sink
1080  *
1081  * Get the currently configured latency.
1082  *
1083  * Returns: The configured latency.
1084  *
1085  * Since: 0.10.12
1086  */
1087 GstClockTime
1088 gst_base_sink_get_latency (GstBaseSink * sink)
1089 {
1090   GstClockTime res;
1091
1092   GST_OBJECT_LOCK (sink);
1093   res = sink->priv->latency;
1094   GST_OBJECT_UNLOCK (sink);
1095
1096   return res;
1097 }
1098
1099 /**
1100  * gst_base_sink_query_latency:
1101  * @sink: the sink
1102  * @live: (out) (allow-none): if the sink is live
1103  * @upstream_live: (out) (allow-none): if an upstream element is live
1104  * @min_latency: (out) (allow-none): the min latency of the upstream elements
1105  * @max_latency: (out) (allow-none): the max latency of the upstream elements
1106  *
1107  * Query the sink for the latency parameters. The latency will be queried from
1108  * the upstream elements. @live will be TRUE if @sink is configured to
1109  * synchronize against the clock. @upstream_live will be TRUE if an upstream
1110  * element is live.
1111  *
1112  * If both @live and @upstream_live are TRUE, the sink will want to compensate
1113  * for the latency introduced by the upstream elements by setting the
1114  * @min_latency to a strictly possitive value.
1115  *
1116  * This function is mostly used by subclasses.
1117  *
1118  * Returns: TRUE if the query succeeded.
1119  *
1120  * Since: 0.10.12
1121  */
1122 gboolean
1123 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
1124     gboolean * upstream_live, GstClockTime * min_latency,
1125     GstClockTime * max_latency)
1126 {
1127   gboolean l, us_live, res, have_latency;
1128   GstClockTime min, max, render_delay;
1129   GstQuery *query;
1130   GstClockTime us_min, us_max;
1131
1132   /* we are live when we sync to the clock */
1133   GST_OBJECT_LOCK (sink);
1134   l = sink->sync;
1135   have_latency = sink->priv->have_latency;
1136   render_delay = sink->priv->render_delay;
1137   GST_OBJECT_UNLOCK (sink);
1138
1139   /* assume no latency */
1140   min = 0;
1141   max = -1;
1142   us_live = FALSE;
1143
1144   if (have_latency) {
1145     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1146     /* we are ready for a latency query this is when we preroll or when we are
1147      * not async. */
1148     query = gst_query_new_latency ();
1149
1150     /* ask the peer for the latency */
1151     if ((res = gst_pad_peer_query (sink->sinkpad, query))) {
1152       /* get upstream min and max latency */
1153       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1154
1155       if (us_live) {
1156         /* upstream live, use its latency, subclasses should use these
1157          * values to create the complete latency. */
1158         min = us_min;
1159         max = us_max;
1160       }
1161       if (l) {
1162         /* we need to add the render delay if we are live */
1163         if (min != -1)
1164           min += render_delay;
1165         if (max != -1)
1166           max += render_delay;
1167       }
1168     }
1169     gst_query_unref (query);
1170   } else {
1171     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1172     res = FALSE;
1173   }
1174
1175   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1176   if (!res) {
1177     if (!l) {
1178       res = TRUE;
1179       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1180     } else {
1181       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1182     }
1183   }
1184
1185   if (res) {
1186     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1187         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1188         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1189
1190     if (live)
1191       *live = l;
1192     if (upstream_live)
1193       *upstream_live = us_live;
1194     if (min_latency)
1195       *min_latency = min;
1196     if (max_latency)
1197       *max_latency = max;
1198   }
1199   return res;
1200 }
1201
1202 /**
1203  * gst_base_sink_set_render_delay:
1204  * @sink: a #GstBaseSink
1205  * @delay: the new delay
1206  *
1207  * Set the render delay in @sink to @delay. The render delay is the time
1208  * between actual rendering of a buffer and its synchronisation time. Some
1209  * devices might delay media rendering which can be compensated for with this
1210  * function.
1211  *
1212  * After calling this function, this sink will report additional latency and
1213  * other sinks will adjust their latency to delay the rendering of their media.
1214  *
1215  * This function is usually called by subclasses.
1216  *
1217  * Since: 0.10.21
1218  */
1219 void
1220 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1221 {
1222   GstClockTime old_render_delay;
1223
1224   g_return_if_fail (GST_IS_BASE_SINK (sink));
1225
1226   GST_OBJECT_LOCK (sink);
1227   old_render_delay = sink->priv->render_delay;
1228   sink->priv->render_delay = delay;
1229   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1230       GST_TIME_ARGS (delay));
1231   GST_OBJECT_UNLOCK (sink);
1232
1233   if (delay != old_render_delay) {
1234     GST_DEBUG_OBJECT (sink, "posting latency changed");
1235     gst_element_post_message (GST_ELEMENT_CAST (sink),
1236         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1237   }
1238 }
1239
1240 /**
1241  * gst_base_sink_get_render_delay:
1242  * @sink: a #GstBaseSink
1243  *
1244  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1245  * information about the render delay.
1246  *
1247  * Returns: the render delay of @sink.
1248  *
1249  * Since: 0.10.21
1250  */
1251 GstClockTime
1252 gst_base_sink_get_render_delay (GstBaseSink * sink)
1253 {
1254   GstClockTimeDiff res;
1255
1256   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1257
1258   GST_OBJECT_LOCK (sink);
1259   res = sink->priv->render_delay;
1260   GST_OBJECT_UNLOCK (sink);
1261
1262   return res;
1263 }
1264
1265 /**
1266  * gst_base_sink_set_blocksize:
1267  * @sink: a #GstBaseSink
1268  * @blocksize: the blocksize in bytes
1269  *
1270  * Set the number of bytes that the sink will pull when it is operating in pull
1271  * mode.
1272  *
1273  * Since: 0.10.22
1274  */
1275 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1276 void
1277 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1278 {
1279   g_return_if_fail (GST_IS_BASE_SINK (sink));
1280
1281   GST_OBJECT_LOCK (sink);
1282   sink->priv->blocksize = blocksize;
1283   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1284   GST_OBJECT_UNLOCK (sink);
1285 }
1286
1287 /**
1288  * gst_base_sink_get_blocksize:
1289  * @sink: a #GstBaseSink
1290  *
1291  * Get the number of bytes that the sink will pull when it is operating in pull
1292  * mode.
1293  *
1294  * Returns: the number of bytes @sink will pull in pull mode.
1295  *
1296  * Since: 0.10.22
1297  */
1298 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1299 guint
1300 gst_base_sink_get_blocksize (GstBaseSink * sink)
1301 {
1302   guint res;
1303
1304   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1305
1306   GST_OBJECT_LOCK (sink);
1307   res = sink->priv->blocksize;
1308   GST_OBJECT_UNLOCK (sink);
1309
1310   return res;
1311 }
1312
1313 /**
1314  * gst_base_sink_set_throttle_time:
1315  * @sink: a #GstBaseSink
1316  * @throttle: the throttle time in nanoseconds
1317  *
1318  * Set the time that will be inserted between rendered buffers. This
1319  * can be used to control the maximum buffers per second that the sink
1320  * will render. 
1321  *
1322  * Since: 0.10.33
1323  */
1324 void
1325 gst_base_sink_set_throttle_time (GstBaseSink * sink, guint64 throttle)
1326 {
1327   g_return_if_fail (GST_IS_BASE_SINK (sink));
1328
1329   GST_OBJECT_LOCK (sink);
1330   sink->priv->throttle_time = throttle;
1331   GST_LOG_OBJECT (sink, "set throttle_time to %" G_GUINT64_FORMAT, throttle);
1332   GST_OBJECT_UNLOCK (sink);
1333 }
1334
1335 /**
1336  * gst_base_sink_get_throttle_time:
1337  * @sink: a #GstBaseSink
1338  *
1339  * Get the time that will be inserted between frames to control the 
1340  * maximum buffers per second.
1341  *
1342  * Returns: the number of nanoseconds @sink will put between frames.
1343  *
1344  * Since: 0.10.33
1345  */
1346 guint64
1347 gst_base_sink_get_throttle_time (GstBaseSink * sink)
1348 {
1349   guint64 res;
1350
1351   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1352
1353   GST_OBJECT_LOCK (sink);
1354   res = sink->priv->throttle_time;
1355   GST_OBJECT_UNLOCK (sink);
1356
1357   return res;
1358 }
1359
1360 static void
1361 gst_base_sink_set_property (GObject * object, guint prop_id,
1362     const GValue * value, GParamSpec * pspec)
1363 {
1364   GstBaseSink *sink = GST_BASE_SINK (object);
1365
1366   switch (prop_id) {
1367     case PROP_PREROLL_QUEUE_LEN:
1368       /* preroll lock necessary to serialize with finish_preroll */
1369       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1370       g_atomic_int_set (&sink->preroll_queue_max_len, g_value_get_uint (value));
1371       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1372       break;
1373     case PROP_SYNC:
1374       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1375       break;
1376     case PROP_MAX_LATENESS:
1377       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1378       break;
1379     case PROP_QOS:
1380       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1381       break;
1382     case PROP_ASYNC:
1383       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1384       break;
1385     case PROP_TS_OFFSET:
1386       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1387       break;
1388     case PROP_BLOCKSIZE:
1389       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1390       break;
1391     case PROP_RENDER_DELAY:
1392       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1393       break;
1394     case PROP_ENABLE_LAST_BUFFER:
1395       gst_base_sink_set_last_buffer_enabled (sink, g_value_get_boolean (value));
1396       break;
1397     case PROP_THROTTLE_TIME:
1398       gst_base_sink_set_throttle_time (sink, g_value_get_uint64 (value));
1399       break;
1400     default:
1401       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1402       break;
1403   }
1404 }
1405
1406 static void
1407 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1408     GParamSpec * pspec)
1409 {
1410   GstBaseSink *sink = GST_BASE_SINK (object);
1411
1412   switch (prop_id) {
1413     case PROP_PREROLL_QUEUE_LEN:
1414       g_value_set_uint (value, g_atomic_int_get (&sink->preroll_queue_max_len));
1415       break;
1416     case PROP_SYNC:
1417       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1418       break;
1419     case PROP_MAX_LATENESS:
1420       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1421       break;
1422     case PROP_QOS:
1423       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1424       break;
1425     case PROP_ASYNC:
1426       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1427       break;
1428     case PROP_TS_OFFSET:
1429       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1430       break;
1431     case PROP_LAST_BUFFER:
1432       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1433       break;
1434     case PROP_ENABLE_LAST_BUFFER:
1435       g_value_set_boolean (value, gst_base_sink_is_last_buffer_enabled (sink));
1436       break;
1437     case PROP_BLOCKSIZE:
1438       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1439       break;
1440     case PROP_RENDER_DELAY:
1441       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1442       break;
1443     case PROP_THROTTLE_TIME:
1444       g_value_set_uint64 (value, gst_base_sink_get_throttle_time (sink));
1445       break;
1446     default:
1447       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1448       break;
1449   }
1450 }
1451
1452
1453 static GstCaps *
1454 gst_base_sink_get_caps (GstBaseSink * sink)
1455 {
1456   return NULL;
1457 }
1458
1459 static gboolean
1460 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1461 {
1462   return TRUE;
1463 }
1464
1465 static GstFlowReturn
1466 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1467     GstCaps * caps, GstBuffer ** buf)
1468 {
1469   *buf = NULL;
1470   return GST_FLOW_OK;
1471 }
1472
1473 /* with PREROLL_LOCK, STREAM_LOCK */
1474 static void
1475 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1476 {
1477   GstMiniObject *obj;
1478
1479   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1480   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1481     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1482     gst_mini_object_unref (obj);
1483   }
1484   /* we can't have EOS anymore now */
1485   basesink->eos = FALSE;
1486   basesink->priv->received_eos = FALSE;
1487   basesink->have_preroll = FALSE;
1488   basesink->priv->step_unlock = FALSE;
1489   basesink->eos_queued = FALSE;
1490   basesink->preroll_queued = 0;
1491   basesink->buffers_queued = 0;
1492   basesink->events_queued = 0;
1493   /* can't report latency anymore until we preroll again */
1494   if (basesink->priv->async_enabled) {
1495     GST_OBJECT_LOCK (basesink);
1496     basesink->priv->have_latency = FALSE;
1497     GST_OBJECT_UNLOCK (basesink);
1498   }
1499   /* and signal any waiters now */
1500   GST_PAD_PREROLL_SIGNAL (pad);
1501 }
1502
1503 /* with STREAM_LOCK, configures given segment with the event information. */
1504 static void
1505 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1506     GstEvent * event, GstSegment * segment)
1507 {
1508   gboolean update;
1509   gdouble rate, arate;
1510   GstFormat format;
1511   gint64 start;
1512   gint64 stop;
1513   gint64 time;
1514
1515   /* the newsegment event is needed to bring the buffer timestamps to the
1516    * stream time and to drop samples outside of the playback segment. */
1517   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1518       &start, &stop, &time);
1519
1520   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1521    * We protect with the OBJECT_LOCK so that we can use the values to
1522    * safely answer a POSITION query. */
1523   GST_OBJECT_LOCK (basesink);
1524   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1525       stop, time);
1526
1527   if (format == GST_FORMAT_TIME) {
1528     GST_DEBUG_OBJECT (basesink,
1529         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1530         "format GST_FORMAT_TIME, "
1531         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1532         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1533         update, rate, arate, GST_TIME_ARGS (segment->start),
1534         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1535         GST_TIME_ARGS (segment->accum));
1536   } else {
1537     GST_DEBUG_OBJECT (basesink,
1538         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1539         "format %d, "
1540         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1541         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1542         segment->format, segment->start, segment->stop, segment->time,
1543         segment->accum);
1544   }
1545   GST_OBJECT_UNLOCK (basesink);
1546 }
1547
1548 /* with PREROLL_LOCK, STREAM_LOCK */
1549 static gboolean
1550 gst_base_sink_commit_state (GstBaseSink * basesink)
1551 {
1552   /* commit state and proceed to next pending state */
1553   GstState current, next, pending, post_pending;
1554   gboolean post_paused = FALSE;
1555   gboolean post_async_done = FALSE;
1556   gboolean post_playing = FALSE;
1557
1558   /* we are certainly not playing async anymore now */
1559   basesink->playing_async = FALSE;
1560
1561   GST_OBJECT_LOCK (basesink);
1562   current = GST_STATE (basesink);
1563   next = GST_STATE_NEXT (basesink);
1564   pending = GST_STATE_PENDING (basesink);
1565   post_pending = pending;
1566
1567   switch (pending) {
1568     case GST_STATE_PLAYING:
1569     {
1570       GstBaseSinkClass *bclass;
1571       GstStateChangeReturn ret;
1572
1573       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1574
1575       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1576
1577       basesink->need_preroll = FALSE;
1578       post_async_done = TRUE;
1579       basesink->priv->commited = TRUE;
1580       post_playing = TRUE;
1581       /* post PAUSED too when we were READY */
1582       if (current == GST_STATE_READY) {
1583         post_paused = TRUE;
1584       }
1585
1586       /* make sure we notify the subclass of async playing */
1587       if (bclass->async_play) {
1588         GST_WARNING_OBJECT (basesink, "deprecated async_play");
1589         ret = bclass->async_play (basesink);
1590         if (ret == GST_STATE_CHANGE_FAILURE)
1591           goto async_failed;
1592       }
1593       break;
1594     }
1595     case GST_STATE_PAUSED:
1596       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1597       post_paused = TRUE;
1598       post_async_done = TRUE;
1599       basesink->priv->commited = TRUE;
1600       post_pending = GST_STATE_VOID_PENDING;
1601       break;
1602     case GST_STATE_READY:
1603     case GST_STATE_NULL:
1604       goto stopping;
1605     case GST_STATE_VOID_PENDING:
1606       goto nothing_pending;
1607     default:
1608       break;
1609   }
1610
1611   /* we can report latency queries now */
1612   basesink->priv->have_latency = TRUE;
1613
1614   GST_STATE (basesink) = pending;
1615   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1616   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1617   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1618   GST_OBJECT_UNLOCK (basesink);
1619
1620   if (post_paused) {
1621     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1622     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1623         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1624             current, next, post_pending));
1625   }
1626   if (post_async_done) {
1627     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1628     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1629         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1630   }
1631   if (post_playing) {
1632     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1633     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1634         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1635             next, pending, GST_STATE_VOID_PENDING));
1636   }
1637
1638   GST_STATE_BROADCAST (basesink);
1639
1640   return TRUE;
1641
1642 nothing_pending:
1643   {
1644     /* Depending on the state, set our vars. We get in this situation when the
1645      * state change function got a change to update the state vars before the
1646      * streaming thread did. This is fine but we need to make sure that we
1647      * update the need_preroll var since it was TRUE when we got here and might
1648      * become FALSE if we got to PLAYING. */
1649     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1650         gst_element_state_get_name (current));
1651     switch (current) {
1652       case GST_STATE_PLAYING:
1653         basesink->need_preroll = FALSE;
1654         break;
1655       case GST_STATE_PAUSED:
1656         basesink->need_preroll = TRUE;
1657         break;
1658       default:
1659         basesink->need_preroll = FALSE;
1660         basesink->flushing = TRUE;
1661         break;
1662     }
1663     /* we can report latency queries now */
1664     basesink->priv->have_latency = TRUE;
1665     GST_OBJECT_UNLOCK (basesink);
1666     return TRUE;
1667   }
1668 stopping:
1669   {
1670     /* app is going to READY */
1671     GST_DEBUG_OBJECT (basesink, "stopping");
1672     basesink->need_preroll = FALSE;
1673     basesink->flushing = TRUE;
1674     GST_OBJECT_UNLOCK (basesink);
1675     return FALSE;
1676   }
1677 async_failed:
1678   {
1679     GST_DEBUG_OBJECT (basesink, "async commit failed");
1680     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1681     GST_OBJECT_UNLOCK (basesink);
1682     return FALSE;
1683   }
1684 }
1685
1686 static void
1687 start_stepping (GstBaseSink * sink, GstSegment * segment,
1688     GstStepInfo * pending, GstStepInfo * current)
1689 {
1690   gint64 end;
1691   GstMessage *message;
1692
1693   GST_DEBUG_OBJECT (sink, "update pending step");
1694
1695   GST_OBJECT_LOCK (sink);
1696   memcpy (current, pending, sizeof (GstStepInfo));
1697   pending->valid = FALSE;
1698   GST_OBJECT_UNLOCK (sink);
1699
1700   /* post message first */
1701   message =
1702       gst_message_new_step_start (GST_OBJECT (sink), TRUE, current->format,
1703       current->amount, current->rate, current->flush, current->intermediate);
1704   gst_message_set_seqnum (message, current->seqnum);
1705   gst_element_post_message (GST_ELEMENT (sink), message);
1706
1707   /* get the running time of where we paused and remember it */
1708   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1709   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1710
1711   /* set the new rate for the remainder of the segment */
1712   current->start_rate = segment->rate;
1713   segment->rate *= current->rate;
1714   segment->abs_rate = ABS (segment->rate);
1715
1716   /* save values */
1717   if (segment->rate > 0.0)
1718     current->start_stop = segment->stop;
1719   else
1720     current->start_start = segment->start;
1721
1722   if (current->format == GST_FORMAT_TIME) {
1723     end = current->start + current->amount;
1724     if (!current->flush) {
1725       /* update the segment clipping regions for non-flushing seeks */
1726       if (segment->rate > 0.0) {
1727         segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1728         segment->last_stop = segment->stop;
1729       } else {
1730         gint64 position;
1731
1732         position = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1733         segment->time = position;
1734         segment->start = position;
1735         segment->last_stop = position;
1736       }
1737     }
1738   }
1739
1740   GST_DEBUG_OBJECT (sink,
1741       "segment now rate %lf, applied rate %lf, "
1742       "format GST_FORMAT_TIME, "
1743       "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1744       ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1745       segment->rate, segment->applied_rate, GST_TIME_ARGS (segment->start),
1746       GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1747       GST_TIME_ARGS (segment->accum));
1748
1749   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1750       GST_TIME_ARGS (current->start));
1751
1752   if (current->amount == -1) {
1753     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1754     current->valid = FALSE;
1755   } else {
1756     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1757         "rate: %f", current->amount, gst_format_get_name (current->format),
1758         current->rate);
1759   }
1760 }
1761
1762 static void
1763 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1764     GstStepInfo * current, gint64 rstart, gint64 rstop, gboolean eos)
1765 {
1766   gint64 stop, position;
1767   GstMessage *message;
1768
1769   GST_DEBUG_OBJECT (sink, "step complete");
1770
1771   if (segment->rate > 0.0)
1772     stop = rstart;
1773   else
1774     stop = rstop;
1775
1776   GST_DEBUG_OBJECT (sink,
1777       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1778
1779   if (stop == -1)
1780     current->duration = current->position;
1781   else
1782     current->duration = stop - current->start;
1783
1784   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1785       GST_TIME_ARGS (current->duration));
1786
1787   position = current->start + current->duration;
1788
1789   /* now move the segment to the new running time */
1790   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1791
1792   if (current->flush) {
1793     /* and remove the accumulated time we flushed, start time did not change */
1794     segment->accum = current->start;
1795   } else {
1796     /* start time is now the stepped position */
1797     gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1798   }
1799
1800   /* restore the previous rate */
1801   segment->rate = current->start_rate;
1802   segment->abs_rate = ABS (segment->rate);
1803
1804   if (segment->rate > 0.0)
1805     segment->stop = current->start_stop;
1806   else
1807     segment->start = current->start_start;
1808
1809   /* the clip segment is used for position report in paused... */
1810   memcpy (sink->abidata.ABI.clip_segment, segment, sizeof (GstSegment));
1811
1812   /* post the step done when we know the stepped duration in TIME */
1813   message =
1814       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1815       current->amount, current->rate, current->flush, current->intermediate,
1816       current->duration, eos);
1817   gst_message_set_seqnum (message, current->seqnum);
1818   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1819
1820   if (!current->intermediate)
1821     sink->need_preroll = current->need_preroll;
1822
1823   /* and the current step info finished and becomes invalid */
1824   current->valid = FALSE;
1825 }
1826
1827 static gboolean
1828 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1829     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1830     gint64 * rstop)
1831 {
1832   gboolean step_end = FALSE;
1833
1834   /* see if we need to skip this buffer because of stepping */
1835   switch (current->format) {
1836     case GST_FORMAT_TIME:
1837     {
1838       guint64 end;
1839       gint64 first, last;
1840
1841       if (segment->rate > 0.0) {
1842         if (segment->stop == *cstop)
1843           *rstop = *rstart + current->amount;
1844
1845         first = *rstart;
1846         last = *rstop;
1847       } else {
1848         if (segment->start == *cstart)
1849           *rstart = *rstop + current->amount;
1850
1851         first = *rstop;
1852         last = *rstart;
1853       }
1854
1855       end = current->start + current->amount;
1856       current->position = first - current->start;
1857
1858       if (G_UNLIKELY (segment->abs_rate != 1.0))
1859         current->position /= segment->abs_rate;
1860
1861       GST_DEBUG_OBJECT (sink,
1862           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1863           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1864       GST_DEBUG_OBJECT (sink,
1865           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1866           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1867           GST_TIME_ARGS (last - current->start),
1868           GST_TIME_ARGS (current->amount));
1869
1870       if ((current->flush && current->position >= current->amount)
1871           || last >= end) {
1872         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1873         step_end = TRUE;
1874         if (segment->rate > 0.0) {
1875           *rstart = end;
1876           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1877         } else {
1878           *rstop = end;
1879           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1880         }
1881       }
1882       GST_DEBUG_OBJECT (sink,
1883           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1884           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1885       GST_DEBUG_OBJECT (sink,
1886           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1887           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1888       break;
1889     }
1890     case GST_FORMAT_BUFFERS:
1891       GST_DEBUG_OBJECT (sink,
1892           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1893           current->position, current->amount);
1894
1895       if (current->position < current->amount) {
1896         current->position++;
1897       } else {
1898         step_end = TRUE;
1899       }
1900       break;
1901     case GST_FORMAT_DEFAULT:
1902     default:
1903       GST_DEBUG_OBJECT (sink,
1904           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1905           current->position, current->amount);
1906       break;
1907   }
1908   return step_end;
1909 }
1910
1911 /* with STREAM_LOCK, PREROLL_LOCK
1912  *
1913  * Returns TRUE if the object needs synchronisation and takes therefore
1914  * part in prerolling.
1915  *
1916  * rsstart/rsstop contain the start/stop in stream time.
1917  * rrstart/rrstop contain the start/stop in running time.
1918  */
1919 static gboolean
1920 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1921     GstClockTime * rsstart, GstClockTime * rsstop,
1922     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1923     gboolean * stepped, GstSegment * segment, GstStepInfo * step,
1924     gboolean * step_end, guint8 obj_type)
1925 {
1926   GstBaseSinkClass *bclass;
1927   GstBuffer *buffer;
1928   GstClockTime start, stop;     /* raw start/stop timestamps */
1929   gint64 cstart, cstop;         /* clipped raw timestamps */
1930   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1931   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1932   GstFormat format;
1933   GstBaseSinkPrivate *priv;
1934   gboolean eos;
1935
1936   priv = basesink->priv;
1937
1938   /* start with nothing */
1939   start = stop = GST_CLOCK_TIME_NONE;
1940
1941   if (G_UNLIKELY (OBJ_IS_EVENT (obj_type))) {
1942     GstEvent *event = GST_EVENT_CAST (obj);
1943
1944     switch (GST_EVENT_TYPE (event)) {
1945         /* EOS event needs syncing */
1946       case GST_EVENT_EOS:
1947       {
1948         if (basesink->segment.rate >= 0.0) {
1949           sstart = sstop = priv->current_sstop;
1950           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1951             /* we have not seen a buffer yet, use the segment values */
1952             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1953                 basesink->segment.format, basesink->segment.stop);
1954           }
1955         } else {
1956           sstart = sstop = priv->current_sstart;
1957           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1958             /* we have not seen a buffer yet, use the segment values */
1959             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1960                 basesink->segment.format, basesink->segment.start);
1961           }
1962         }
1963
1964         rstart = rstop = priv->eos_rtime;
1965         *do_sync = rstart != -1;
1966         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1967             GST_TIME_ARGS (rstart));
1968         /* if we are stepping, we end now */
1969         *step_end = step->valid;
1970         eos = TRUE;
1971         goto eos_done;
1972       }
1973       default:
1974         /* other events do not need syncing */
1975         /* FIXME, maybe NEWSEGMENT might need synchronisation
1976          * since the POSITION query depends on accumulated times and
1977          * we cannot accumulate the current segment before the previous
1978          * one completed.
1979          */
1980         return FALSE;
1981     }
1982   }
1983
1984   eos = FALSE;
1985
1986 again:
1987   /* else do buffer sync code */
1988   buffer = GST_BUFFER_CAST (obj);
1989
1990   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1991
1992   /* just get the times to see if we need syncing, if the start returns -1 we
1993    * don't sync. */
1994   if (bclass->get_times)
1995     bclass->get_times (basesink, buffer, &start, &stop);
1996
1997   if (!GST_CLOCK_TIME_IS_VALID (start)) {
1998     /* we don't need to sync but we still want to get the timestamps for
1999      * tracking the position */
2000     gst_base_sink_get_times (basesink, buffer, &start, &stop);
2001     *do_sync = FALSE;
2002   } else {
2003     *do_sync = TRUE;
2004   }
2005
2006   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
2007       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
2008       GST_TIME_ARGS (stop), *do_sync);
2009
2010   /* collect segment and format for code clarity */
2011   format = segment->format;
2012
2013   /* no timestamp clipping if we did not get a TIME segment format */
2014   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
2015     cstart = start;
2016     cstop = stop;
2017     /* do running and stream time in TIME format */
2018     format = GST_FORMAT_TIME;
2019     GST_LOG_OBJECT (basesink, "not time format, don't clip");
2020     goto do_times;
2021   }
2022
2023   /* clip, only when we know about time */
2024   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
2025               (gint64) start, (gint64) stop, &cstart, &cstop))) {
2026     if (step->valid) {
2027       GST_DEBUG_OBJECT (basesink, "step out of segment");
2028       /* when we are stepping, pretend we're at the end of the segment */
2029       if (segment->rate > 0.0) {
2030         cstart = segment->stop;
2031         cstop = segment->stop;
2032       } else {
2033         cstart = segment->start;
2034         cstop = segment->start;
2035       }
2036       goto do_times;
2037     }
2038     goto out_of_segment;
2039   }
2040
2041   if (G_UNLIKELY (start != cstart || stop != cstop)) {
2042     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
2043         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
2044         GST_TIME_ARGS (cstop));
2045   }
2046
2047   /* set last stop position */
2048   if (G_LIKELY (stop != GST_CLOCK_TIME_NONE && cstop != GST_CLOCK_TIME_NONE))
2049     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
2050   else
2051     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
2052
2053 do_times:
2054   rstart = gst_segment_to_running_time (segment, format, cstart);
2055   rstop = gst_segment_to_running_time (segment, format, cstop);
2056
2057   if (G_UNLIKELY (step->valid)) {
2058     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
2059                 &rstart, &rstop))) {
2060       /* step is still busy, we discard data when we are flushing */
2061       *stepped = step->flush;
2062       GST_DEBUG_OBJECT (basesink, "stepping busy");
2063     }
2064   }
2065   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
2066    * upstream is behaving very badly */
2067   sstart = gst_segment_to_stream_time (segment, format, cstart);
2068   sstop = gst_segment_to_stream_time (segment, format, cstop);
2069
2070 eos_done:
2071   /* eos_done label only called when doing EOS, we also stop stepping then */
2072   if (*step_end && step->flush) {
2073     GST_DEBUG_OBJECT (basesink, "flushing step ended");
2074     stop_stepping (basesink, segment, step, rstart, rstop, eos);
2075     *step_end = FALSE;
2076     /* re-determine running start times for adjusted segment
2077      * (which has a flushed amount of running/accumulated time removed) */
2078     if (!GST_IS_EVENT (obj)) {
2079       GST_DEBUG_OBJECT (basesink, "refresh sync times");
2080       goto again;
2081     }
2082   }
2083
2084   /* save times */
2085   *rsstart = sstart;
2086   *rsstop = sstop;
2087   *rrstart = rstart;
2088   *rrstop = rstop;
2089
2090   /* buffers and EOS always need syncing and preroll */
2091   return TRUE;
2092
2093   /* special cases */
2094 out_of_segment:
2095   {
2096     /* we usually clip in the chain function already but stepping could cause
2097      * the segment to be updated later. we return FALSE so that we don't try
2098      * to sync on it. */
2099     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
2100     return FALSE;
2101   }
2102 }
2103
2104 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
2105  * adjust a timestamp with the latency and timestamp offset. This function does
2106  * not adjust for the render delay. */
2107 static GstClockTime
2108 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
2109 {
2110   GstClockTimeDiff ts_offset;
2111
2112   /* don't do anything funny with invalid timestamps */
2113   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2114     return time;
2115
2116   time += basesink->priv->latency;
2117
2118   /* apply offset, be carefull for underflows */
2119   ts_offset = basesink->priv->ts_offset;
2120   if (ts_offset < 0) {
2121     ts_offset = -ts_offset;
2122     if (ts_offset < time)
2123       time -= ts_offset;
2124     else
2125       time = 0;
2126   } else
2127     time += ts_offset;
2128
2129   /* subtract the render delay again, which was included in the latency */
2130   if (time > basesink->priv->render_delay)
2131     time -= basesink->priv->render_delay;
2132   else
2133     time = 0;
2134
2135   return time;
2136 }
2137
2138 /**
2139  * gst_base_sink_wait_clock:
2140  * @sink: the sink
2141  * @time: the running_time to be reached
2142  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2143  *
2144  * This function will block until @time is reached. It is usually called by
2145  * subclasses that use their own internal synchronisation.
2146  *
2147  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
2148  * returned. Likewise, if synchronisation is disabled in the element or there
2149  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
2150  *
2151  * This function should only be called with the PREROLL_LOCK held, like when
2152  * receiving an EOS event in the #GstBaseSinkClass.event() vmethod or when
2153  * receiving a buffer in
2154  * the #GstBaseSinkClass.render() vmethod.
2155  *
2156  * The @time argument should be the running_time of when this method should
2157  * return and is not adjusted with any latency or offset configured in the
2158  * sink.
2159  *
2160  * Since: 0.10.20
2161  *
2162  * Returns: #GstClockReturn
2163  */
2164 GstClockReturn
2165 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
2166     GstClockTimeDiff * jitter)
2167 {
2168   GstClockReturn ret;
2169   GstClock *clock;
2170   GstClockTime base_time;
2171
2172   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2173     goto invalid_time;
2174
2175   GST_OBJECT_LOCK (sink);
2176   if (G_UNLIKELY (!sink->sync))
2177     goto no_sync;
2178
2179   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
2180     goto no_clock;
2181
2182   base_time = GST_ELEMENT_CAST (sink)->base_time;
2183   GST_LOG_OBJECT (sink,
2184       "time %" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT,
2185       GST_TIME_ARGS (time), GST_TIME_ARGS (base_time));
2186
2187   /* add base_time to running_time to get the time against the clock */
2188   time += base_time;
2189
2190   /* Re-use existing clockid if available */
2191   /* FIXME: Casting to GstClockEntry only works because the types
2192    * are the same */
2193   if (G_LIKELY (sink->priv->cached_clock_id != NULL
2194           && GST_CLOCK_ENTRY_CLOCK ((GstClockEntry *) sink->
2195               priv->cached_clock_id) == clock)) {
2196     if (!gst_clock_single_shot_id_reinit (clock, sink->priv->cached_clock_id,
2197             time)) {
2198       gst_clock_id_unref (sink->priv->cached_clock_id);
2199       sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2200     }
2201   } else {
2202     if (sink->priv->cached_clock_id != NULL)
2203       gst_clock_id_unref (sink->priv->cached_clock_id);
2204     sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2205   }
2206   GST_OBJECT_UNLOCK (sink);
2207
2208   /* A blocking wait is performed on the clock. We save the ClockID
2209    * so we can unlock the entry at any time. While we are blocking, we
2210    * release the PREROLL_LOCK so that other threads can interrupt the
2211    * entry. */
2212   sink->clock_id = sink->priv->cached_clock_id;
2213   /* release the preroll lock while waiting */
2214   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
2215
2216   ret = gst_clock_id_wait (sink->priv->cached_clock_id, jitter);
2217
2218   GST_PAD_PREROLL_LOCK (sink->sinkpad);
2219   sink->clock_id = NULL;
2220
2221   return ret;
2222
2223   /* no syncing needed */
2224 invalid_time:
2225   {
2226     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
2227     return GST_CLOCK_BADTIME;
2228   }
2229 no_sync:
2230   {
2231     GST_DEBUG_OBJECT (sink, "sync disabled");
2232     GST_OBJECT_UNLOCK (sink);
2233     return GST_CLOCK_BADTIME;
2234   }
2235 no_clock:
2236   {
2237     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
2238     GST_OBJECT_UNLOCK (sink);
2239     return GST_CLOCK_BADTIME;
2240   }
2241 }
2242
2243 /**
2244  * gst_base_sink_wait_preroll:
2245  * @sink: the sink
2246  *
2247  * If the #GstBaseSinkClass.render() method performs its own synchronisation
2248  * against the clock it must unblock when going from PLAYING to the PAUSED state
2249  * and call this method before continuing to render the remaining data.
2250  *
2251  * This function will block until a state change to PLAYING happens (in which
2252  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
2253  * to a state change to READY or a FLUSH event (in which case this function
2254  * returns #GST_FLOW_WRONG_STATE).
2255  *
2256  * This function should only be called with the PREROLL_LOCK held, like in the
2257  * render function.
2258  *
2259  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2260  * continue. Any other return value should be returned from the render vmethod.
2261  *
2262  * Since: 0.10.11
2263  */
2264 GstFlowReturn
2265 gst_base_sink_wait_preroll (GstBaseSink * sink)
2266 {
2267   sink->have_preroll = TRUE;
2268   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
2269   /* block until the state changes, or we get a flush, or something */
2270   GST_PAD_PREROLL_WAIT (sink->sinkpad);
2271   sink->have_preroll = FALSE;
2272   if (G_UNLIKELY (sink->flushing))
2273     goto stopping;
2274   if (G_UNLIKELY (sink->priv->step_unlock))
2275     goto step_unlocked;
2276   GST_DEBUG_OBJECT (sink, "continue after preroll");
2277
2278   return GST_FLOW_OK;
2279
2280   /* ERRORS */
2281 stopping:
2282   {
2283     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
2284     return GST_FLOW_WRONG_STATE;
2285   }
2286 step_unlocked:
2287   {
2288     sink->priv->step_unlock = FALSE;
2289     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
2290     return GST_FLOW_STEP;
2291   }
2292 }
2293
2294 static inline guint8
2295 get_object_type (GstMiniObject * obj)
2296 {
2297   guint8 obj_type;
2298
2299   if (G_LIKELY (GST_IS_BUFFER (obj)))
2300     obj_type = _PR_IS_BUFFER;
2301   else if (GST_IS_EVENT (obj))
2302     obj_type = _PR_IS_EVENT;
2303   else if (GST_IS_BUFFER_LIST (obj))
2304     obj_type = _PR_IS_BUFFERLIST;
2305   else
2306     obj_type = _PR_IS_NOTHING;
2307
2308   return obj_type;
2309 }
2310
2311 /**
2312  * gst_base_sink_do_preroll:
2313  * @sink: the sink
2314  * @obj: (transfer none): the mini object that caused the preroll
2315  *
2316  * If the @sink spawns its own thread for pulling buffers from upstream it
2317  * should call this method after it has pulled a buffer. If the element needed
2318  * to preroll, this function will perform the preroll and will then block
2319  * until the element state is changed.
2320  *
2321  * This function should be called with the PREROLL_LOCK held.
2322  *
2323  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2324  * continue. Any other return value should be returned from the render vmethod.
2325  *
2326  * Since: 0.10.22
2327  */
2328 GstFlowReturn
2329 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2330 {
2331   GstFlowReturn ret;
2332
2333   while (G_UNLIKELY (sink->need_preroll)) {
2334     guint8 obj_type;
2335     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2336
2337     obj_type = get_object_type (obj);
2338
2339     ret = gst_base_sink_preroll_object (sink, obj_type, obj);
2340     if (ret != GST_FLOW_OK)
2341       goto preroll_failed;
2342
2343     /* need to recheck here because the commit state could have
2344      * made us not need the preroll anymore */
2345     if (G_LIKELY (sink->need_preroll)) {
2346       /* block until the state changes, or we get a flush, or something */
2347       ret = gst_base_sink_wait_preroll (sink);
2348       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2349         goto preroll_failed;
2350     }
2351   }
2352   return GST_FLOW_OK;
2353
2354   /* ERRORS */
2355 preroll_failed:
2356   {
2357     GST_DEBUG_OBJECT (sink, "preroll failed: %s", gst_flow_get_name (ret));
2358     return ret;
2359   }
2360 }
2361
2362 /**
2363  * gst_base_sink_wait_eos:
2364  * @sink: the sink
2365  * @time: the running_time to be reached
2366  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2367  *
2368  * This function will block until @time is reached. It is usually called by
2369  * subclasses that use their own internal synchronisation but want to let the
2370  * EOS be handled by the base class.
2371  *
2372  * This function should only be called with the PREROLL_LOCK held, like when
2373  * receiving an EOS event in the ::event vmethod.
2374  *
2375  * The @time argument should be the running_time of when the EOS should happen
2376  * and will be adjusted with any latency and offset configured in the sink.
2377  *
2378  * Returns: #GstFlowReturn
2379  *
2380  * Since: 0.10.15
2381  */
2382 GstFlowReturn
2383 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2384     GstClockTimeDiff * jitter)
2385 {
2386   GstClockReturn status;
2387   GstFlowReturn ret;
2388
2389   do {
2390     GstClockTime stime;
2391
2392     GST_DEBUG_OBJECT (sink, "checking preroll");
2393
2394     /* first wait for the playing state before we can continue */
2395     while (G_UNLIKELY (sink->need_preroll)) {
2396       ret = gst_base_sink_wait_preroll (sink);
2397       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2398         goto flushing;
2399     }
2400
2401     /* preroll done, we can sync since we are in PLAYING now. */
2402     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2403         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2404
2405     /* compensate for latency and ts_offset. We don't adjust for render delay
2406      * because we don't interact with the device on EOS normally. */
2407     stime = gst_base_sink_adjust_time (sink, time);
2408
2409     /* wait for the clock, this can be interrupted because we got shut down or
2410      * we PAUSED. */
2411     status = gst_base_sink_wait_clock (sink, stime, jitter);
2412
2413     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2414
2415     /* invalid time, no clock or sync disabled, just continue then */
2416     if (status == GST_CLOCK_BADTIME)
2417       break;
2418
2419     /* waiting could have been interrupted and we can be flushing now */
2420     if (G_UNLIKELY (sink->flushing))
2421       goto flushing;
2422
2423     /* retry if we got unscheduled, which means we did not reach the timeout
2424      * yet. if some other error occures, we continue. */
2425   } while (status == GST_CLOCK_UNSCHEDULED);
2426
2427   GST_DEBUG_OBJECT (sink, "end of stream");
2428
2429   return GST_FLOW_OK;
2430
2431   /* ERRORS */
2432 flushing:
2433   {
2434     GST_DEBUG_OBJECT (sink, "we are flushing");
2435     return GST_FLOW_WRONG_STATE;
2436   }
2437 }
2438
2439 /* with STREAM_LOCK, PREROLL_LOCK
2440  *
2441  * Make sure we are in PLAYING and synchronize an object to the clock.
2442  *
2443  * If we need preroll, we are not in PLAYING. We try to commit the state
2444  * if needed and then block if we still are not PLAYING.
2445  *
2446  * We start waiting on the clock in PLAYING. If we got interrupted, we
2447  * immediatly try to re-preroll.
2448  *
2449  * Some objects do not need synchronisation (most events) and so this function
2450  * immediatly returns GST_FLOW_OK.
2451  *
2452  * for objects that arrive later than max-lateness to be synchronized to the
2453  * clock have the @late boolean set to TRUE.
2454  *
2455  * This function keeps a running average of the jitter (the diff between the
2456  * clock time and the requested sync time). The jitter is negative for
2457  * objects that arrive in time and positive for late buffers.
2458  *
2459  * does not take ownership of obj.
2460  */
2461 static GstFlowReturn
2462 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2463     GstMiniObject * obj, gboolean * late, gboolean * step_end, guint8 obj_type)
2464 {
2465   GstClockTimeDiff jitter = 0;
2466   gboolean syncable;
2467   GstClockReturn status = GST_CLOCK_OK;
2468   GstClockTime rstart, rstop, sstart, sstop, stime;
2469   gboolean do_sync;
2470   GstBaseSinkPrivate *priv;
2471   GstFlowReturn ret;
2472   GstStepInfo *current, *pending;
2473   gboolean stepped;
2474
2475   priv = basesink->priv;
2476
2477 do_step:
2478   sstart = sstop = rstart = rstop = GST_CLOCK_TIME_NONE;
2479   do_sync = TRUE;
2480   stepped = FALSE;
2481
2482   priv->current_rstart = GST_CLOCK_TIME_NONE;
2483
2484   /* get stepping info */
2485   current = &priv->current_step;
2486   pending = &priv->pending_step;
2487
2488   /* get timing information for this object against the render segment */
2489   syncable = gst_base_sink_get_sync_times (basesink, obj,
2490       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2491       current, step_end, obj_type);
2492
2493   if (G_UNLIKELY (stepped))
2494     goto step_skipped;
2495
2496   /* a syncable object needs to participate in preroll and
2497    * clocking. All buffers and EOS are syncable. */
2498   if (G_UNLIKELY (!syncable))
2499     goto not_syncable;
2500
2501   /* store timing info for current object */
2502   priv->current_rstart = rstart;
2503   priv->current_rstop = (GST_CLOCK_TIME_IS_VALID (rstop) ? rstop : rstart);
2504
2505   /* save sync time for eos when the previous object needed sync */
2506   priv->eos_rtime = (do_sync ? priv->current_rstop : GST_CLOCK_TIME_NONE);
2507
2508   /* calculate inter frame spacing */
2509   if (G_UNLIKELY (priv->prev_rstart != -1 && priv->prev_rstart < rstart)) {
2510     GstClockTime in_diff;
2511
2512     in_diff = rstart - priv->prev_rstart;
2513
2514     if (priv->avg_in_diff == -1)
2515       priv->avg_in_diff = in_diff;
2516     else
2517       priv->avg_in_diff = UPDATE_RUNNING_AVG (priv->avg_in_diff, in_diff);
2518
2519     GST_LOG_OBJECT (basesink, "avg frame diff %" GST_TIME_FORMAT,
2520         GST_TIME_ARGS (priv->avg_in_diff));
2521
2522   }
2523   priv->prev_rstart = rstart;
2524
2525   if (G_UNLIKELY (priv->earliest_in_time != -1
2526           && rstart < priv->earliest_in_time))
2527     goto qos_dropped;
2528
2529 again:
2530   /* first do preroll, this makes sure we commit our state
2531    * to PAUSED and can continue to PLAYING. We cannot perform
2532    * any clock sync in PAUSED because there is no clock. */
2533   ret = gst_base_sink_do_preroll (basesink, obj);
2534   if (G_UNLIKELY (ret != GST_FLOW_OK))
2535     goto preroll_failed;
2536
2537   /* update the segment with a pending step if the current one is invalid and we
2538    * have a new pending one. We only accept new step updates after a preroll */
2539   if (G_UNLIKELY (pending->valid && !current->valid)) {
2540     start_stepping (basesink, &basesink->segment, pending, current);
2541     goto do_step;
2542   }
2543
2544   /* After rendering we store the position of the last buffer so that we can use
2545    * it to report the position. We need to take the lock here. */
2546   GST_OBJECT_LOCK (basesink);
2547   priv->current_sstart = sstart;
2548   priv->current_sstop = (GST_CLOCK_TIME_IS_VALID (sstop) ? sstop : sstart);
2549   GST_OBJECT_UNLOCK (basesink);
2550
2551   if (!do_sync)
2552     goto done;
2553
2554   /* adjust for latency */
2555   stime = gst_base_sink_adjust_time (basesink, rstart);
2556
2557   /* adjust for render-delay, avoid underflows */
2558   if (GST_CLOCK_TIME_IS_VALID (stime)) {
2559     if (stime > priv->render_delay)
2560       stime -= priv->render_delay;
2561     else
2562       stime = 0;
2563   }
2564
2565   /* preroll done, we can sync since we are in PLAYING now. */
2566   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2567       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2568       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2569
2570   /* This function will return immediatly if start == -1, no clock
2571    * or sync is disabled with GST_CLOCK_BADTIME. */
2572   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2573
2574   GST_DEBUG_OBJECT (basesink, "clock returned %d, jitter %c%" GST_TIME_FORMAT,
2575       status, (jitter < 0 ? '-' : ' '), GST_TIME_ARGS (ABS (jitter)));
2576
2577   /* invalid time, no clock or sync disabled, just render */
2578   if (status == GST_CLOCK_BADTIME)
2579     goto done;
2580
2581   /* waiting could have been interrupted and we can be flushing now */
2582   if (G_UNLIKELY (basesink->flushing))
2583     goto flushing;
2584
2585   /* check for unlocked by a state change, we are not flushing so
2586    * we can try to preroll on the current buffer. */
2587   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2588     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2589     priv->call_preroll = TRUE;
2590     goto again;
2591   }
2592
2593   /* successful syncing done, record observation */
2594   priv->current_jitter = jitter;
2595
2596   /* check if the object should be dropped */
2597   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2598       status, jitter);
2599
2600 done:
2601   return GST_FLOW_OK;
2602
2603   /* ERRORS */
2604 step_skipped:
2605   {
2606     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2607     *late = TRUE;
2608     return GST_FLOW_OK;
2609   }
2610 not_syncable:
2611   {
2612     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2613     return GST_FLOW_OK;
2614   }
2615 qos_dropped:
2616   {
2617     GST_DEBUG_OBJECT (basesink, "dropped because of QoS %p", obj);
2618     *late = TRUE;
2619     return GST_FLOW_OK;
2620   }
2621 flushing:
2622   {
2623     GST_DEBUG_OBJECT (basesink, "we are flushing");
2624     return GST_FLOW_WRONG_STATE;
2625   }
2626 preroll_failed:
2627   {
2628     GST_DEBUG_OBJECT (basesink, "preroll failed");
2629     *step_end = FALSE;
2630     return ret;
2631   }
2632 }
2633
2634 static gboolean
2635 gst_base_sink_send_qos (GstBaseSink * basesink, GstQOSType type,
2636     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2637 {
2638   GstEvent *event;
2639   gboolean res;
2640
2641   /* generate Quality-of-Service event */
2642   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2643       "qos: type %d, proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2644       GST_TIME_FORMAT, type, proportion, diff, GST_TIME_ARGS (time));
2645
2646   event = gst_event_new_qos_full (type, proportion, diff, time);
2647
2648   /* send upstream */
2649   res = gst_pad_push_event (basesink->sinkpad, event);
2650
2651   return res;
2652 }
2653
2654 static void
2655 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2656 {
2657   GstBaseSinkPrivate *priv;
2658   GstClockTime start, stop;
2659   GstClockTimeDiff jitter;
2660   GstClockTime pt, entered, left;
2661   GstClockTime duration;
2662   gdouble rate;
2663
2664   priv = sink->priv;
2665
2666   start = priv->current_rstart;
2667
2668   if (priv->current_step.valid)
2669     return;
2670
2671   /* if Quality-of-Service disabled, do nothing */
2672   if (!g_atomic_int_get (&priv->qos_enabled) ||
2673       !GST_CLOCK_TIME_IS_VALID (start))
2674     return;
2675
2676   stop = priv->current_rstop;
2677   jitter = priv->current_jitter;
2678
2679   if (jitter < 0) {
2680     /* this is the time the buffer entered the sink */
2681     if (start < -jitter)
2682       entered = 0;
2683     else
2684       entered = start + jitter;
2685     left = start;
2686   } else {
2687     /* this is the time the buffer entered the sink */
2688     entered = start + jitter;
2689     /* this is the time the buffer left the sink */
2690     left = start + jitter;
2691   }
2692
2693   /* calculate duration of the buffer */
2694   if (GST_CLOCK_TIME_IS_VALID (stop) && stop != start)
2695     duration = stop - start;
2696   else
2697     duration = priv->avg_in_diff;
2698
2699   /* if we have the time when the last buffer left us, calculate
2700    * processing time */
2701   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2702     if (entered > priv->last_left) {
2703       pt = entered - priv->last_left;
2704     } else {
2705       pt = 0;
2706     }
2707   } else {
2708     pt = priv->avg_pt;
2709   }
2710
2711   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2712       ", stop %" GST_TIME_FORMAT ", entered %" GST_TIME_FORMAT ", left %"
2713       GST_TIME_FORMAT ", pt: %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
2714       ",jitter %" G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
2715       GST_TIME_ARGS (entered), GST_TIME_ARGS (left), GST_TIME_ARGS (pt),
2716       GST_TIME_ARGS (duration), jitter);
2717
2718   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2719       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2720       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2721       priv->avg_rate);
2722
2723   /* collect running averages. for first observations, we copy the
2724    * values */
2725   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_duration))
2726     priv->avg_duration = duration;
2727   else
2728     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2729
2730   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_pt))
2731     priv->avg_pt = pt;
2732   else
2733     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2734
2735   if (priv->avg_duration != 0)
2736     rate =
2737         gst_guint64_to_gdouble (priv->avg_pt) /
2738         gst_guint64_to_gdouble (priv->avg_duration);
2739   else
2740     rate = 1.0;
2741
2742   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2743     if (dropped || priv->avg_rate < 0.0) {
2744       priv->avg_rate = rate;
2745     } else {
2746       if (rate > 1.0)
2747         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2748       else
2749         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2750     }
2751   }
2752
2753   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2754       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2755       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2756       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2757
2758
2759   if (priv->avg_rate >= 0.0) {
2760     GstQOSType type;
2761     GstClockTimeDiff diff;
2762
2763     /* if we have a valid rate, start sending QoS messages */
2764     if (priv->current_jitter < 0) {
2765       /* make sure we never go below 0 when adding the jitter to the
2766        * timestamp. */
2767       if (priv->current_rstart < -priv->current_jitter)
2768         priv->current_jitter = -priv->current_rstart;
2769     }
2770
2771     if (priv->throttle_time > 0) {
2772       diff = priv->throttle_time;
2773       type = GST_QOS_TYPE_THROTTLE;
2774     } else {
2775       diff = priv->current_jitter;
2776       if (diff <= 0)
2777         type = GST_QOS_TYPE_OVERFLOW;
2778       else
2779         type = GST_QOS_TYPE_UNDERFLOW;
2780     }
2781
2782     gst_base_sink_send_qos (sink, type, priv->avg_rate, priv->current_rstart,
2783         diff);
2784   }
2785
2786   /* record when this buffer will leave us */
2787   priv->last_left = left;
2788 }
2789
2790 /* reset all qos measuring */
2791 static void
2792 gst_base_sink_reset_qos (GstBaseSink * sink)
2793 {
2794   GstBaseSinkPrivate *priv;
2795
2796   priv = sink->priv;
2797
2798   priv->last_render_time = GST_CLOCK_TIME_NONE;
2799   priv->prev_rstart = GST_CLOCK_TIME_NONE;
2800   priv->earliest_in_time = GST_CLOCK_TIME_NONE;
2801   priv->last_left = GST_CLOCK_TIME_NONE;
2802   priv->avg_duration = GST_CLOCK_TIME_NONE;
2803   priv->avg_pt = GST_CLOCK_TIME_NONE;
2804   priv->avg_rate = -1.0;
2805   priv->avg_render = GST_CLOCK_TIME_NONE;
2806   priv->avg_in_diff = GST_CLOCK_TIME_NONE;
2807   priv->rendered = 0;
2808   priv->dropped = 0;
2809
2810 }
2811
2812 /* Checks if the object was scheduled too late.
2813  *
2814  * rstart/rstop contain the running_time start and stop values
2815  * of the object.
2816  *
2817  * status and jitter contain the return values from the clock wait.
2818  *
2819  * returns TRUE if the buffer was too late.
2820  */
2821 static gboolean
2822 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2823     GstClockTime rstart, GstClockTime rstop,
2824     GstClockReturn status, GstClockTimeDiff jitter)
2825 {
2826   gboolean late;
2827   gint64 max_lateness;
2828   GstBaseSinkPrivate *priv;
2829
2830   priv = basesink->priv;
2831
2832   late = FALSE;
2833
2834   /* only for objects that were too late */
2835   if (G_LIKELY (status != GST_CLOCK_EARLY))
2836     goto in_time;
2837
2838   max_lateness = basesink->abidata.ABI.max_lateness;
2839
2840   /* check if frame dropping is enabled */
2841   if (max_lateness == -1)
2842     goto no_drop;
2843
2844   /* only check for buffers */
2845   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2846     goto not_buffer;
2847
2848   /* can't do check if we don't have a timestamp */
2849   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (rstart)))
2850     goto no_timestamp;
2851
2852   /* we can add a valid stop time */
2853   if (GST_CLOCK_TIME_IS_VALID (rstop))
2854     max_lateness += rstop;
2855   else {
2856     max_lateness += rstart;
2857     /* no stop time, use avg frame diff */
2858     if (priv->avg_in_diff != -1)
2859       max_lateness += priv->avg_in_diff;
2860   }
2861
2862   /* if the jitter bigger than duration and lateness we are too late */
2863   if ((late = rstart + jitter > max_lateness)) {
2864     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2865         "buffer is too late %" GST_TIME_FORMAT
2866         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (rstart + jitter),
2867         GST_TIME_ARGS (max_lateness));
2868     /* !!emergency!!, if we did not receive anything valid for more than a
2869      * second, render it anyway so the user sees something */
2870     if (GST_CLOCK_TIME_IS_VALID (priv->last_render_time) &&
2871         rstart - priv->last_render_time > GST_SECOND) {
2872       late = FALSE;
2873       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2874           (_("A lot of buffers are being dropped.")),
2875           ("There may be a timestamping problem, or this computer is too slow."));
2876       GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2877           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2878           GST_TIME_ARGS (priv->last_render_time));
2879     }
2880   }
2881
2882 done:
2883   if (!late || !GST_CLOCK_TIME_IS_VALID (priv->last_render_time)) {
2884     priv->last_render_time = rstart;
2885     /* the next allowed input timestamp */
2886     if (priv->throttle_time > 0)
2887       priv->earliest_in_time = rstart + priv->throttle_time;
2888   }
2889   return late;
2890
2891   /* all is fine */
2892 in_time:
2893   {
2894     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2895     goto done;
2896   }
2897 no_drop:
2898   {
2899     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2900     goto done;
2901   }
2902 not_buffer:
2903   {
2904     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2905     return FALSE;
2906   }
2907 no_timestamp:
2908   {
2909     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2910     return FALSE;
2911   }
2912 }
2913
2914 /* called before and after calling the render vmethod. It keeps track of how
2915  * much time was spent in the render method and is used to check if we are
2916  * flooded */
2917 static void
2918 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2919 {
2920   GstBaseSinkPrivate *priv;
2921
2922   priv = basesink->priv;
2923
2924   if (start) {
2925     priv->start = gst_util_get_timestamp ();
2926   } else {
2927     GstClockTime elapsed;
2928
2929     priv->stop = gst_util_get_timestamp ();
2930
2931     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2932
2933     if (!GST_CLOCK_TIME_IS_VALID (priv->avg_render))
2934       priv->avg_render = elapsed;
2935     else
2936       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2937
2938     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2939         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2940   }
2941 }
2942
2943 /* with STREAM_LOCK, PREROLL_LOCK,
2944  *
2945  * Synchronize the object on the clock and then render it.
2946  *
2947  * takes ownership of obj.
2948  */
2949 static GstFlowReturn
2950 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2951     guint8 obj_type, gpointer obj)
2952 {
2953   GstFlowReturn ret;
2954   GstBaseSinkClass *bclass;
2955   gboolean late, step_end;
2956   gpointer sync_obj;
2957   GstBaseSinkPrivate *priv;
2958
2959   priv = basesink->priv;
2960
2961   if (OBJ_IS_BUFFERLIST (obj_type)) {
2962     /*
2963      * If buffer list, use the first group buffer within the list
2964      * for syncing
2965      */
2966     sync_obj = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
2967     g_assert (NULL != sync_obj);
2968   } else {
2969     sync_obj = obj;
2970   }
2971
2972 again:
2973   late = FALSE;
2974   step_end = FALSE;
2975
2976   /* synchronize this object, non syncable objects return OK
2977    * immediatly. */
2978   ret =
2979       gst_base_sink_do_sync (basesink, pad, sync_obj, &late, &step_end,
2980       obj_type);
2981   if (G_UNLIKELY (ret != GST_FLOW_OK))
2982     goto sync_failed;
2983
2984   /* and now render, event or buffer/buffer list. */
2985   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type))) {
2986     /* drop late buffers unconditionally, let's hope it's unlikely */
2987     if (G_UNLIKELY (late))
2988       goto dropped;
2989
2990     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2991
2992     if (G_LIKELY ((OBJ_IS_BUFFERLIST (obj_type) && bclass->render_list) ||
2993             (!OBJ_IS_BUFFERLIST (obj_type) && bclass->render))) {
2994       gint do_qos;
2995
2996       /* read once, to get same value before and after */
2997       do_qos = g_atomic_int_get (&priv->qos_enabled);
2998
2999       GST_DEBUG_OBJECT (basesink, "rendering object %p", obj);
3000
3001       /* record rendering time for QoS and stats */
3002       if (do_qos)
3003         gst_base_sink_do_render_stats (basesink, TRUE);
3004
3005       if (!OBJ_IS_BUFFERLIST (obj_type)) {
3006         GstBuffer *buf;
3007
3008         /* For buffer lists do not set last buffer. Creating buffer
3009          * with meaningful data can be done only with memcpy which will
3010          * significantly affect performance */
3011         buf = GST_BUFFER_CAST (obj);
3012         gst_base_sink_set_last_buffer (basesink, buf);
3013
3014         ret = bclass->render (basesink, buf);
3015       } else {
3016         GstBufferList *buflist;
3017
3018         buflist = GST_BUFFER_LIST_CAST (obj);
3019
3020         ret = bclass->render_list (basesink, buflist);
3021       }
3022
3023       if (do_qos)
3024         gst_base_sink_do_render_stats (basesink, FALSE);
3025
3026       if (ret == GST_FLOW_STEP)
3027         goto again;
3028
3029       if (G_UNLIKELY (basesink->flushing))
3030         goto flushing;
3031
3032       priv->rendered++;
3033     }
3034   } else if (G_LIKELY (OBJ_IS_EVENT (obj_type))) {
3035     GstEvent *event = GST_EVENT_CAST (obj);
3036     gboolean event_res = TRUE;
3037     GstEventType type;
3038
3039     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3040
3041     type = GST_EVENT_TYPE (event);
3042
3043     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
3044         gst_event_type_get_name (type));
3045
3046     if (bclass->event)
3047       event_res = bclass->event (basesink, event);
3048
3049     /* when we get here we could be flushing again when the event handler calls
3050      * _wait_eos(). We have to ignore this object in that case. */
3051     if (G_UNLIKELY (basesink->flushing))
3052       goto flushing;
3053
3054     if (G_LIKELY (event_res)) {
3055       guint32 seqnum;
3056
3057       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
3058       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
3059
3060       switch (type) {
3061         case GST_EVENT_EOS:
3062         {
3063           GstMessage *message;
3064
3065           /* the EOS event is completely handled so we mark
3066            * ourselves as being in the EOS state. eos is also
3067            * protected by the object lock so we can read it when
3068            * answering the POSITION query. */
3069           GST_OBJECT_LOCK (basesink);
3070           basesink->eos = TRUE;
3071           GST_OBJECT_UNLOCK (basesink);
3072
3073           /* ok, now we can post the message */
3074           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
3075
3076           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
3077           gst_message_set_seqnum (message, seqnum);
3078           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
3079           break;
3080         }
3081         case GST_EVENT_NEWSEGMENT:
3082           /* configure the segment */
3083           gst_base_sink_configure_segment (basesink, pad, event,
3084               &basesink->segment);
3085           break;
3086         case GST_EVENT_SINK_MESSAGE:{
3087           GstMessage *msg = NULL;
3088
3089           gst_event_parse_sink_message (event, &msg);
3090
3091           if (msg)
3092             gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
3093         }
3094         default:
3095           break;
3096       }
3097     }
3098   } else {
3099     g_return_val_if_reached (GST_FLOW_ERROR);
3100   }
3101
3102 done:
3103   if (step_end) {
3104     /* the step ended, check if we need to activate a new step */
3105     GST_DEBUG_OBJECT (basesink, "step ended");
3106     stop_stepping (basesink, &basesink->segment, &priv->current_step,
3107         priv->current_rstart, priv->current_rstop, basesink->eos);
3108     goto again;
3109   }
3110
3111   gst_base_sink_perform_qos (basesink, late);
3112
3113   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
3114   gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3115   return ret;
3116
3117   /* ERRORS */
3118 sync_failed:
3119   {
3120     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
3121     goto done;
3122   }
3123 dropped:
3124   {
3125     priv->dropped++;
3126     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
3127
3128     if (g_atomic_int_get (&priv->qos_enabled)) {
3129       GstMessage *qos_msg;
3130       GstClockTime timestamp, duration;
3131
3132       timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (sync_obj));
3133       duration = GST_BUFFER_DURATION (GST_BUFFER_CAST (sync_obj));
3134
3135       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3136           "qos: dropped buffer rt %" GST_TIME_FORMAT ", st %" GST_TIME_FORMAT
3137           ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT,
3138           GST_TIME_ARGS (priv->current_rstart),
3139           GST_TIME_ARGS (priv->current_sstart), GST_TIME_ARGS (timestamp),
3140           GST_TIME_ARGS (duration));
3141       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3142           "qos: rendered %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
3143           priv->rendered, priv->dropped);
3144
3145       qos_msg =
3146           gst_message_new_qos (GST_OBJECT_CAST (basesink), basesink->sync,
3147           priv->current_rstart, priv->current_sstart, timestamp, duration);
3148       gst_message_set_qos_values (qos_msg, priv->current_jitter, priv->avg_rate,
3149           1000000);
3150       gst_message_set_qos_stats (qos_msg, GST_FORMAT_BUFFERS, priv->rendered,
3151           priv->dropped);
3152       gst_element_post_message (GST_ELEMENT_CAST (basesink), qos_msg);
3153     }
3154     goto done;
3155   }
3156 flushing:
3157   {
3158     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
3159     gst_mini_object_unref (obj);
3160     return GST_FLOW_WRONG_STATE;
3161   }
3162 }
3163
3164 /* with STREAM_LOCK, PREROLL_LOCK
3165  *
3166  * Perform preroll on the given object. For buffers this means
3167  * calling the preroll subclass method.
3168  * If that succeeds, the state will be commited.
3169  *
3170  * function does not take ownership of obj.
3171  */
3172 static GstFlowReturn
3173 gst_base_sink_preroll_object (GstBaseSink * basesink, guint8 obj_type,
3174     GstMiniObject * obj)
3175 {
3176   GstFlowReturn ret;
3177
3178   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
3179
3180   /* if it's a buffer, we need to call the preroll method */
3181   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type) && basesink->priv->call_preroll)) {
3182     GstBaseSinkClass *bclass;
3183     GstBuffer *buf;
3184     GstClockTime timestamp;
3185
3186     if (OBJ_IS_BUFFERLIST (obj_type)) {
3187       buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3188       g_assert (NULL != buf);
3189     } else {
3190       buf = GST_BUFFER_CAST (obj);
3191     }
3192
3193     timestamp = GST_BUFFER_TIMESTAMP (buf);
3194
3195     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
3196         GST_TIME_ARGS (timestamp));
3197
3198     /*
3199      * For buffer lists do not set last buffer. Creating buffer
3200      * with meaningful data can be done only with memcpy which will
3201      * significantly affect performance
3202      */
3203     if (!OBJ_IS_BUFFERLIST (obj_type)) {
3204       gst_base_sink_set_last_buffer (basesink, buf);
3205     }
3206
3207     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3208     if (bclass->preroll)
3209       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
3210         goto preroll_failed;
3211
3212     basesink->priv->call_preroll = FALSE;
3213   }
3214
3215   /* commit state */
3216   if (G_LIKELY (basesink->playing_async)) {
3217     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
3218       goto stopping;
3219   }
3220
3221   return GST_FLOW_OK;
3222
3223   /* ERRORS */
3224 preroll_failed:
3225   {
3226     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
3227     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
3228     return ret;
3229   }
3230 stopping:
3231   {
3232     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
3233     return GST_FLOW_WRONG_STATE;
3234   }
3235 }
3236
3237 /* with STREAM_LOCK, PREROLL_LOCK
3238  *
3239  * Queue an object for rendering.
3240  * The first prerollable object queued will complete the preroll. If the
3241  * preroll queue is filled, we render all the objects in the queue.
3242  *
3243  * This function takes ownership of the object.
3244  */
3245 static GstFlowReturn
3246 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
3247     guint8 obj_type, gpointer obj, gboolean prerollable)
3248 {
3249   GstFlowReturn ret = GST_FLOW_OK;
3250   gint length;
3251   GQueue *q;
3252
3253   if (G_UNLIKELY (basesink->need_preroll)) {
3254     if (G_LIKELY (prerollable))
3255       basesink->preroll_queued++;
3256
3257     length = basesink->preroll_queued;
3258
3259     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
3260
3261     /* first prerollable item needs to finish the preroll */
3262     if (length == 1) {
3263       ret = gst_base_sink_preroll_object (basesink, obj_type, obj);
3264       if (G_UNLIKELY (ret != GST_FLOW_OK))
3265         goto preroll_failed;
3266     }
3267     /* need to recheck if we need preroll, commmit state during preroll
3268      * could have made us not need more preroll. */
3269     if (G_UNLIKELY (basesink->need_preroll)) {
3270       /* see if we can render now, if we can't add the object to the preroll
3271        * queue. */
3272       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
3273         goto more_preroll;
3274     }
3275   }
3276   /* we can start rendering (or blocking) the queued object
3277    * if any. */
3278   q = basesink->preroll_queue;
3279   while (G_UNLIKELY (!g_queue_is_empty (q))) {
3280     GstMiniObject *o;
3281     guint8 ot;
3282
3283     o = g_queue_pop_head (q);
3284     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
3285
3286     ot = get_object_type (o);
3287
3288     /* do something with the return value */
3289     ret = gst_base_sink_render_object (basesink, pad, ot, o);
3290     if (ret != GST_FLOW_OK)
3291       goto dequeue_failed;
3292   }
3293
3294   /* now render the object */
3295   ret = gst_base_sink_render_object (basesink, pad, obj_type, obj);
3296   basesink->preroll_queued = 0;
3297
3298   return ret;
3299
3300   /* special cases */
3301 preroll_failed:
3302   {
3303     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
3304         gst_flow_get_name (ret));
3305     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3306     return ret;
3307   }
3308 more_preroll:
3309   {
3310     /* add object to the queue and return */
3311     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
3312         length, basesink->preroll_queue_max_len);
3313     g_queue_push_tail (basesink->preroll_queue, obj);
3314     return GST_FLOW_OK;
3315   }
3316 dequeue_failed:
3317   {
3318     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
3319         gst_flow_get_name (ret));
3320     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3321     return ret;
3322   }
3323 }
3324
3325 /* with STREAM_LOCK
3326  *
3327  * This function grabs the PREROLL_LOCK and adds the object to
3328  * the queue.
3329  *
3330  * This function takes ownership of obj.
3331  *
3332  * Note: Only GstEvent seem to be passed to this private method
3333  */
3334 static GstFlowReturn
3335 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
3336     GstMiniObject * obj, gboolean prerollable)
3337 {
3338   GstFlowReturn ret;
3339
3340   GST_PAD_PREROLL_LOCK (pad);
3341   if (G_UNLIKELY (basesink->flushing))
3342     goto flushing;
3343
3344   if (G_UNLIKELY (basesink->priv->received_eos))
3345     goto was_eos;
3346
3347   ret =
3348       gst_base_sink_queue_object_unlocked (basesink, pad, _PR_IS_EVENT, obj,
3349       prerollable);
3350   GST_PAD_PREROLL_UNLOCK (pad);
3351
3352   return ret;
3353
3354   /* ERRORS */
3355 flushing:
3356   {
3357     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3358     GST_PAD_PREROLL_UNLOCK (pad);
3359     gst_mini_object_unref (obj);
3360     return GST_FLOW_WRONG_STATE;
3361   }
3362 was_eos:
3363   {
3364     GST_DEBUG_OBJECT (basesink,
3365         "we are EOS, dropping object, return UNEXPECTED");
3366     GST_PAD_PREROLL_UNLOCK (pad);
3367     gst_mini_object_unref (obj);
3368     return GST_FLOW_UNEXPECTED;
3369   }
3370 }
3371
3372 static void
3373 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
3374 {
3375   /* make sure we are not blocked on the clock also clear any pending
3376    * eos state. */
3377   gst_base_sink_set_flushing (basesink, pad, TRUE);
3378
3379   /* we grab the stream lock but that is not needed since setting the
3380    * sink to flushing would make sure no state commit is being done
3381    * anymore */
3382   GST_PAD_STREAM_LOCK (pad);
3383   gst_base_sink_reset_qos (basesink);
3384   /* and we need to commit our state again on the next
3385    * prerolled buffer */
3386   basesink->playing_async = TRUE;
3387   if (basesink->priv->async_enabled) {
3388     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
3389   } else {
3390     /* start time reset in above case as well;
3391      * arranges for a.o. proper position reporting when flushing in PAUSED */
3392     gst_element_set_start_time (GST_ELEMENT_CAST (basesink), 0);
3393     basesink->priv->have_latency = TRUE;
3394   }
3395   gst_base_sink_set_last_buffer (basesink, NULL);
3396   GST_PAD_STREAM_UNLOCK (pad);
3397 }
3398
3399 static void
3400 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
3401 {
3402   /* unset flushing so we can accept new data, this also flushes out any EOS
3403    * event. */
3404   gst_base_sink_set_flushing (basesink, pad, FALSE);
3405
3406   /* for position reporting */
3407   GST_OBJECT_LOCK (basesink);
3408   basesink->priv->current_sstart = GST_CLOCK_TIME_NONE;
3409   basesink->priv->current_sstop = GST_CLOCK_TIME_NONE;
3410   basesink->priv->eos_rtime = GST_CLOCK_TIME_NONE;
3411   basesink->priv->call_preroll = TRUE;
3412   basesink->priv->current_step.valid = FALSE;
3413   basesink->priv->pending_step.valid = FALSE;
3414   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
3415     /* we need new segment info after the flush. */
3416     basesink->have_newsegment = FALSE;
3417     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3418     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
3419   }
3420   GST_OBJECT_UNLOCK (basesink);
3421 }
3422
3423 static gboolean
3424 gst_base_sink_event (GstPad * pad, GstEvent * event)
3425 {
3426   GstBaseSink *basesink;
3427   gboolean result = TRUE;
3428   GstBaseSinkClass *bclass;
3429
3430   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3431   if (G_UNLIKELY (basesink == NULL)) {
3432     gst_event_unref (event);
3433     return FALSE;
3434   }
3435
3436   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3437
3438   GST_DEBUG_OBJECT (basesink, "received event %p %" GST_PTR_FORMAT, event,
3439       event);
3440
3441   switch (GST_EVENT_TYPE (event)) {
3442     case GST_EVENT_EOS:
3443     {
3444       GstFlowReturn ret;
3445
3446       GST_PAD_PREROLL_LOCK (pad);
3447       if (G_UNLIKELY (basesink->flushing))
3448         goto flushing;
3449
3450       if (G_UNLIKELY (basesink->priv->received_eos)) {
3451         /* we can't accept anything when we are EOS */
3452         result = FALSE;
3453         gst_event_unref (event);
3454       } else {
3455         /* we set the received EOS flag here so that we can use it when testing if
3456          * we are prerolled and to refuse more buffers. */
3457         basesink->priv->received_eos = TRUE;
3458
3459         /* EOS is a prerollable object, we call the unlocked version because it
3460          * does not check the received_eos flag. */
3461         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
3462             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), TRUE);
3463         if (G_UNLIKELY (ret != GST_FLOW_OK))
3464           result = FALSE;
3465       }
3466       GST_PAD_PREROLL_UNLOCK (pad);
3467       break;
3468     }
3469     case GST_EVENT_NEWSEGMENT:
3470     {
3471       GstFlowReturn ret;
3472       gboolean update;
3473
3474       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
3475
3476       GST_PAD_PREROLL_LOCK (pad);
3477       if (G_UNLIKELY (basesink->flushing))
3478         goto flushing;
3479
3480       gst_event_parse_new_segment_full (event, &update, NULL, NULL, NULL, NULL,
3481           NULL, NULL);
3482
3483       if (G_UNLIKELY (basesink->priv->received_eos && !update)) {
3484         /* we can't accept anything when we are EOS */
3485         result = FALSE;
3486         gst_event_unref (event);
3487       } else {
3488         /* the new segment is a non prerollable item and does not block anything,
3489          * we need to configure the current clipping segment and insert the event
3490          * in the queue to serialize it with the buffers for rendering. */
3491         gst_base_sink_configure_segment (basesink, pad, event,
3492             basesink->abidata.ABI.clip_segment);
3493
3494         ret =
3495             gst_base_sink_queue_object_unlocked (basesink, pad,
3496             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), FALSE);
3497         if (G_UNLIKELY (ret != GST_FLOW_OK))
3498           result = FALSE;
3499         else {
3500           GST_OBJECT_LOCK (basesink);
3501           basesink->have_newsegment = TRUE;
3502           GST_OBJECT_UNLOCK (basesink);
3503         }
3504       }
3505       GST_PAD_PREROLL_UNLOCK (pad);
3506       break;
3507     }
3508     case GST_EVENT_FLUSH_START:
3509       if (bclass->event)
3510         bclass->event (basesink, event);
3511
3512       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3513
3514       gst_base_sink_flush_start (basesink, pad);
3515
3516       gst_event_unref (event);
3517       break;
3518     case GST_EVENT_FLUSH_STOP:
3519       if (bclass->event)
3520         bclass->event (basesink, event);
3521
3522       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3523
3524       gst_base_sink_flush_stop (basesink, pad);
3525
3526       gst_event_unref (event);
3527       break;
3528     default:
3529       /* other events are sent to queue or subclass depending on if they
3530        * are serialized. */
3531       if (GST_EVENT_IS_SERIALIZED (event)) {
3532         gst_base_sink_queue_object (basesink, pad,
3533             GST_MINI_OBJECT_CAST (event), FALSE);
3534       } else {
3535         if (bclass->event)
3536           bclass->event (basesink, event);
3537         gst_event_unref (event);
3538       }
3539       break;
3540   }
3541 done:
3542   gst_object_unref (basesink);
3543
3544   return result;
3545
3546   /* ERRORS */
3547 flushing:
3548   {
3549     GST_DEBUG_OBJECT (basesink, "we are flushing");
3550     GST_PAD_PREROLL_UNLOCK (pad);
3551     result = FALSE;
3552     gst_event_unref (event);
3553     goto done;
3554   }
3555 }
3556
3557 /* default implementation to calculate the start and end
3558  * timestamps on a buffer, subclasses can override
3559  */
3560 static void
3561 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3562     GstClockTime * start, GstClockTime * end)
3563 {
3564   GstClockTime timestamp, duration;
3565
3566   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3567   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3568
3569     /* get duration to calculate end time */
3570     duration = GST_BUFFER_DURATION (buffer);
3571     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3572       *end = timestamp + duration;
3573     }
3574     *start = timestamp;
3575   }
3576 }
3577
3578 /* must be called with PREROLL_LOCK */
3579 static gboolean
3580 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3581 {
3582   gboolean is_prerolled, res;
3583
3584   /* we have 2 cases where the PREROLL_LOCK is released:
3585    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3586    *  2) we are syncing on the clock
3587    */
3588   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3589   res = !is_prerolled;
3590
3591   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3592       basesink->have_preroll, basesink->priv->received_eos, res);
3593
3594   return res;
3595 }
3596
3597 /* with STREAM_LOCK, PREROLL_LOCK
3598  *
3599  * Takes a buffer and compare the timestamps with the last segment.
3600  * If the buffer falls outside of the segment boundaries, drop it.
3601  * Else queue the buffer for preroll and rendering.
3602  *
3603  * This function takes ownership of the buffer.
3604  */
3605 static GstFlowReturn
3606 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3607     guint8 obj_type, gpointer obj)
3608 {
3609   GstBaseSinkClass *bclass;
3610   GstFlowReturn result;
3611   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3612   GstSegment *clip_segment;
3613   GstBuffer *time_buf;
3614
3615   if (G_UNLIKELY (basesink->flushing))
3616     goto flushing;
3617
3618   if (G_UNLIKELY (basesink->priv->received_eos))
3619     goto was_eos;
3620
3621   if (OBJ_IS_BUFFERLIST (obj_type)) {
3622     time_buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3623     g_assert (NULL != time_buf);
3624   } else {
3625     time_buf = GST_BUFFER_CAST (obj);
3626   }
3627
3628   /* for code clarity */
3629   clip_segment = basesink->abidata.ABI.clip_segment;
3630
3631   if (G_UNLIKELY (!basesink->have_newsegment)) {
3632     gboolean sync;
3633
3634     sync = gst_base_sink_get_sync (basesink);
3635     if (sync) {
3636       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3637           (_("Internal data flow problem.")),
3638           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3639     }
3640
3641     /* this means this sink will assume timestamps start from 0 */
3642     GST_OBJECT_LOCK (basesink);
3643     clip_segment->start = 0;
3644     clip_segment->stop = -1;
3645     basesink->segment.start = 0;
3646     basesink->segment.stop = -1;
3647     basesink->have_newsegment = TRUE;
3648     GST_OBJECT_UNLOCK (basesink);
3649   }
3650
3651   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3652
3653   /* check if the buffer needs to be dropped, we first ask the subclass for the
3654    * start and end */
3655   if (bclass->get_times)
3656     bclass->get_times (basesink, time_buf, &start, &end);
3657
3658   if (!GST_CLOCK_TIME_IS_VALID (start)) {
3659     /* if the subclass does not want sync, we use our own values so that we at
3660      * least clip the buffer to the segment */
3661     gst_base_sink_get_times (basesink, time_buf, &start, &end);
3662   }
3663
3664   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3665       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3666
3667   /* a dropped buffer does not participate in anything */
3668   if (GST_CLOCK_TIME_IS_VALID (start) &&
3669       (clip_segment->format == GST_FORMAT_TIME)) {
3670     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3671                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3672       goto out_of_segment;
3673   }
3674
3675   /* now we can process the buffer in the queue, this function takes ownership
3676    * of the buffer */
3677   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3678       obj_type, obj, TRUE);
3679   return result;
3680
3681   /* ERRORS */
3682 flushing:
3683   {
3684     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3685     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3686     return GST_FLOW_WRONG_STATE;
3687   }
3688 was_eos:
3689   {
3690     GST_DEBUG_OBJECT (basesink,
3691         "we are EOS, dropping object, return UNEXPECTED");
3692     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3693     return GST_FLOW_UNEXPECTED;
3694   }
3695 out_of_segment:
3696   {
3697     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3698     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3699     return GST_FLOW_OK;
3700   }
3701 }
3702
3703 /* with STREAM_LOCK
3704  */
3705 static GstFlowReturn
3706 gst_base_sink_chain_main (GstBaseSink * basesink, GstPad * pad,
3707     guint8 obj_type, gpointer obj)
3708 {
3709   GstFlowReturn result;
3710
3711   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3712     goto wrong_mode;
3713
3714   GST_PAD_PREROLL_LOCK (pad);
3715   result = gst_base_sink_chain_unlocked (basesink, pad, obj_type, obj);
3716   GST_PAD_PREROLL_UNLOCK (pad);
3717
3718 done:
3719   return result;
3720
3721   /* ERRORS */
3722 wrong_mode:
3723   {
3724     GST_OBJECT_LOCK (pad);
3725     GST_WARNING_OBJECT (basesink,
3726         "Push on pad %s:%s, but it was not activated in push mode",
3727         GST_DEBUG_PAD_NAME (pad));
3728     GST_OBJECT_UNLOCK (pad);
3729     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3730     /* we don't post an error message this will signal to the peer
3731      * pushing that EOS is reached. */
3732     result = GST_FLOW_UNEXPECTED;
3733     goto done;
3734   }
3735 }
3736
3737 static GstFlowReturn
3738 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3739 {
3740   GstBaseSink *basesink;
3741
3742   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3743
3744   return gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, buf);
3745 }
3746
3747 static GstFlowReturn
3748 gst_base_sink_chain_list (GstPad * pad, GstBufferList * list)
3749 {
3750   GstBaseSink *basesink;
3751   GstBaseSinkClass *bclass;
3752   GstFlowReturn result;
3753
3754   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3755   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3756
3757   if (G_LIKELY (bclass->render_list)) {
3758     result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFERLIST, list);
3759   } else {
3760     GstBufferListIterator *it;
3761     GstBuffer *group;
3762
3763     GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
3764
3765     it = gst_buffer_list_iterate (list);
3766
3767     if (gst_buffer_list_iterator_next_group (it)) {
3768       do {
3769         group = gst_buffer_list_iterator_merge_group (it);
3770         if (group == NULL) {
3771           group = gst_buffer_new ();
3772           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3773         } else {
3774           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining group");
3775         }
3776         result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, group);
3777       } while (result == GST_FLOW_OK
3778           && gst_buffer_list_iterator_next_group (it));
3779     } else {
3780       GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3781       result =
3782           gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER,
3783           gst_buffer_new ());
3784     }
3785     gst_buffer_list_iterator_free (it);
3786     gst_buffer_list_unref (list);
3787   }
3788   return result;
3789 }
3790
3791
3792 static gboolean
3793 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3794 {
3795   gboolean res = TRUE;
3796
3797   /* update our offset if the start/stop position was updated */
3798   if (segment->format == GST_FORMAT_BYTES) {
3799     segment->time = segment->start;
3800   } else if (segment->start == 0) {
3801     /* seek to start, we can implement a default for this. */
3802     segment->time = 0;
3803   } else {
3804     res = FALSE;
3805     GST_INFO_OBJECT (sink, "Can't do a default seek");
3806   }
3807
3808   return res;
3809 }
3810
3811 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3812
3813 static gboolean
3814 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3815     GstEvent * event, GstSegment * segment)
3816 {
3817   /* By default, we try one of 2 things:
3818    *   - For absolute seek positions, convert the requested position to our
3819    *     configured processing format and place it in the output segment \
3820    *   - For relative seek positions, convert our current (input) values to the
3821    *     seek format, adjust by the relative seek offset and then convert back to
3822    *     the processing format
3823    */
3824   GstSeekType cur_type, stop_type;
3825   gint64 cur, stop;
3826   GstSeekFlags flags;
3827   GstFormat seek_format, dest_format;
3828   gdouble rate;
3829   gboolean update;
3830   gboolean res = TRUE;
3831
3832   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3833       &cur_type, &cur, &stop_type, &stop);
3834   dest_format = segment->format;
3835
3836   if (seek_format == dest_format) {
3837     gst_segment_set_seek (segment, rate, seek_format, flags,
3838         cur_type, cur, stop_type, stop, &update);
3839     return TRUE;
3840   }
3841
3842   if (cur_type != GST_SEEK_TYPE_NONE) {
3843     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3844     res =
3845         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3846         &cur);
3847     cur_type = GST_SEEK_TYPE_SET;
3848   }
3849
3850   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3851     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3852     res =
3853         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3854         &stop);
3855     stop_type = GST_SEEK_TYPE_SET;
3856   }
3857
3858   /* And finally, configure our output segment in the desired format */
3859   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3860       stop_type, stop, &update);
3861
3862   if (!res)
3863     goto no_format;
3864
3865   return res;
3866
3867 no_format:
3868   {
3869     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3870     return FALSE;
3871   }
3872 }
3873
3874 /* perform a seek, only executed in pull mode */
3875 static gboolean
3876 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3877 {
3878   gboolean flush;
3879   gdouble rate;
3880   GstFormat seek_format, dest_format;
3881   GstSeekFlags flags;
3882   GstSeekType cur_type, stop_type;
3883   gboolean seekseg_configured = FALSE;
3884   gint64 cur, stop;
3885   gboolean update, res = TRUE;
3886   GstSegment seeksegment;
3887
3888   dest_format = sink->segment.format;
3889
3890   if (event) {
3891     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3892     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3893         &cur_type, &cur, &stop_type, &stop);
3894
3895     flush = flags & GST_SEEK_FLAG_FLUSH;
3896   } else {
3897     GST_DEBUG_OBJECT (sink, "performing seek without event");
3898     flush = FALSE;
3899   }
3900
3901   if (flush) {
3902     GST_DEBUG_OBJECT (sink, "flushing upstream");
3903     gst_pad_push_event (pad, gst_event_new_flush_start ());
3904     gst_base_sink_flush_start (sink, pad);
3905   } else {
3906     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3907   }
3908
3909   GST_PAD_STREAM_LOCK (pad);
3910
3911   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3912    * copy the current segment info into the temp segment that we can actually
3913    * attempt the seek with. We only update the real segment if the seek suceeds. */
3914   if (!seekseg_configured) {
3915     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3916
3917     /* now configure the final seek segment */
3918     if (event) {
3919       if (sink->segment.format != seek_format) {
3920         /* OK, here's where we give the subclass a chance to convert the relative
3921          * seek into an absolute one in the processing format. We set up any
3922          * absolute seek above, before taking the stream lock. */
3923         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3924                 &seeksegment)) {
3925           GST_DEBUG_OBJECT (sink,
3926               "Preparing the seek failed after flushing. " "Aborting seek");
3927           res = FALSE;
3928         }
3929       } else {
3930         /* The seek format matches our processing format, no need to ask the
3931          * the subclass to configure the segment. */
3932         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3933             cur_type, cur, stop_type, stop, &update);
3934       }
3935     }
3936     /* Else, no seek event passed, so we're just (re)starting the
3937        current segment. */
3938   }
3939
3940   if (res) {
3941     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3942         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3943         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3944
3945     /* do the seek, segment.last_stop contains the new position. */
3946     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3947   }
3948
3949
3950   if (flush) {
3951     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3952     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3953     gst_base_sink_flush_stop (sink, pad);
3954   } else if (res && sink->abidata.ABI.running) {
3955     /* we are running the current segment and doing a non-flushing seek,
3956      * close the segment first based on the last_stop. */
3957     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3958         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3959   }
3960
3961   /* The subclass must have converted the segment to the processing format
3962    * by now */
3963   if (res && seeksegment.format != dest_format) {
3964     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3965         "in the correct format. Aborting seek.");
3966     res = FALSE;
3967   }
3968
3969   /* if successfull seek, we update our real segment and push
3970    * out the new segment. */
3971   if (res) {
3972     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3973
3974     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3975       gst_element_post_message (GST_ELEMENT (sink),
3976           gst_message_new_segment_start (GST_OBJECT (sink),
3977               sink->segment.format, sink->segment.last_stop));
3978     }
3979   }
3980
3981   sink->priv->discont = TRUE;
3982   sink->abidata.ABI.running = TRUE;
3983
3984   GST_PAD_STREAM_UNLOCK (pad);
3985
3986   return res;
3987 }
3988
3989 static void
3990 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
3991     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
3992     gboolean flush, gboolean intermediate)
3993 {
3994   GST_OBJECT_LOCK (sink);
3995   pending->seqnum = seqnum;
3996   pending->format = format;
3997   pending->amount = amount;
3998   pending->position = 0;
3999   pending->rate = rate;
4000   pending->flush = flush;
4001   pending->intermediate = intermediate;
4002   pending->valid = TRUE;
4003   /* flush invalidates the current stepping segment */
4004   if (flush)
4005     current->valid = FALSE;
4006   GST_OBJECT_UNLOCK (sink);
4007 }
4008
4009 static gboolean
4010 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
4011 {
4012   GstBaseSinkPrivate *priv;
4013   GstBaseSinkClass *bclass;
4014   gboolean flush, intermediate;
4015   gdouble rate;
4016   GstFormat format;
4017   guint64 amount;
4018   guint seqnum;
4019   GstStepInfo *pending, *current;
4020   GstMessage *message;
4021
4022   bclass = GST_BASE_SINK_GET_CLASS (sink);
4023   priv = sink->priv;
4024
4025   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
4026
4027   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
4028   seqnum = gst_event_get_seqnum (event);
4029
4030   pending = &priv->pending_step;
4031   current = &priv->current_step;
4032
4033   /* post message first */
4034   message = gst_message_new_step_start (GST_OBJECT (sink), FALSE, format,
4035       amount, rate, flush, intermediate);
4036   gst_message_set_seqnum (message, seqnum);
4037   gst_element_post_message (GST_ELEMENT (sink), message);
4038
4039   if (flush) {
4040     /* we need to call ::unlock before locking PREROLL_LOCK
4041      * since we lock it before going into ::render */
4042     if (bclass->unlock)
4043       bclass->unlock (sink);
4044
4045     GST_PAD_PREROLL_LOCK (sink->sinkpad);
4046     /* now that we have the PREROLL lock, clear our unlock request */
4047     if (bclass->unlock_stop)
4048       bclass->unlock_stop (sink);
4049
4050     /* update the stepinfo and make it valid */
4051     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4052         intermediate);
4053
4054     if (sink->priv->async_enabled) {
4055       /* and we need to commit our state again on the next
4056        * prerolled buffer */
4057       sink->playing_async = TRUE;
4058       priv->pending_step.need_preroll = TRUE;
4059       sink->need_preroll = FALSE;
4060       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
4061     } else {
4062       sink->priv->have_latency = TRUE;
4063       sink->need_preroll = FALSE;
4064     }
4065     priv->current_sstart = GST_CLOCK_TIME_NONE;
4066     priv->current_sstop = GST_CLOCK_TIME_NONE;
4067     priv->eos_rtime = GST_CLOCK_TIME_NONE;
4068     priv->call_preroll = TRUE;
4069     gst_base_sink_set_last_buffer (sink, NULL);
4070     gst_base_sink_reset_qos (sink);
4071
4072     if (sink->clock_id) {
4073       gst_clock_id_unschedule (sink->clock_id);
4074     }
4075
4076     if (sink->have_preroll) {
4077       GST_DEBUG_OBJECT (sink, "signal waiter");
4078       priv->step_unlock = TRUE;
4079       GST_PAD_PREROLL_SIGNAL (sink->sinkpad);
4080     }
4081     GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
4082   } else {
4083     /* update the stepinfo and make it valid */
4084     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4085         intermediate);
4086   }
4087
4088   return TRUE;
4089 }
4090
4091 /* with STREAM_LOCK
4092  */
4093 static void
4094 gst_base_sink_loop (GstPad * pad)
4095 {
4096   GstBaseSink *basesink;
4097   GstBuffer *buf = NULL;
4098   GstFlowReturn result;
4099   guint blocksize;
4100   guint64 offset;
4101
4102   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
4103
4104   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
4105
4106   if ((blocksize = basesink->priv->blocksize) == 0)
4107     blocksize = -1;
4108
4109   offset = basesink->segment.last_stop;
4110
4111   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
4112       offset, blocksize);
4113
4114   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
4115   if (G_UNLIKELY (result != GST_FLOW_OK))
4116     goto paused;
4117
4118   if (G_UNLIKELY (buf == NULL))
4119     goto no_buffer;
4120
4121   offset += GST_BUFFER_SIZE (buf);
4122
4123   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
4124
4125   GST_PAD_PREROLL_LOCK (pad);
4126   result = gst_base_sink_chain_unlocked (basesink, pad, _PR_IS_BUFFER, buf);
4127   GST_PAD_PREROLL_UNLOCK (pad);
4128   if (G_UNLIKELY (result != GST_FLOW_OK))
4129     goto paused;
4130
4131   return;
4132
4133   /* ERRORS */
4134 paused:
4135   {
4136     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
4137         gst_flow_get_name (result));
4138     gst_pad_pause_task (pad);
4139     if (result == GST_FLOW_UNEXPECTED) {
4140       /* perform EOS logic */
4141       if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
4142         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4143             gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
4144                 basesink->segment.format, basesink->segment.last_stop));
4145       } else {
4146         gst_base_sink_event (pad, gst_event_new_eos ());
4147       }
4148     } else if (result == GST_FLOW_NOT_LINKED || result <= GST_FLOW_UNEXPECTED) {
4149       /* for fatal errors we post an error message, post the error
4150        * first so the app knows about the error first. 
4151        * wrong-state is not a fatal error because it happens due to
4152        * flushing and posting an error message in that case is the
4153        * wrong thing to do, e.g. when basesrc is doing a flushing
4154        * seek. */
4155       GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4156           (_("Internal data stream error.")),
4157           ("stream stopped, reason %s", gst_flow_get_name (result)));
4158       gst_base_sink_event (pad, gst_event_new_eos ());
4159     }
4160     return;
4161   }
4162 no_buffer:
4163   {
4164     GST_LOG_OBJECT (basesink, "no buffer, pausing");
4165     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4166         (_("Internal data flow error.")), ("element returned NULL buffer"));
4167     result = GST_FLOW_ERROR;
4168     goto paused;
4169   }
4170 }
4171
4172 static gboolean
4173 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
4174     gboolean flushing)
4175 {
4176   GstBaseSinkClass *bclass;
4177
4178   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4179
4180   if (flushing) {
4181     /* unlock any subclasses, we need to do this before grabbing the
4182      * PREROLL_LOCK since we hold this lock before going into ::render. */
4183     if (bclass->unlock)
4184       bclass->unlock (basesink);
4185   }
4186
4187   GST_PAD_PREROLL_LOCK (pad);
4188   basesink->flushing = flushing;
4189   if (flushing) {
4190     /* step 1, now that we have the PREROLL lock, clear our unlock request */
4191     if (bclass->unlock_stop)
4192       bclass->unlock_stop (basesink);
4193
4194     /* set need_preroll before we unblock the clock. If the clock is unblocked
4195      * before timing out, we can reuse the buffer for preroll. */
4196     basesink->need_preroll = TRUE;
4197
4198     /* step 2, unblock clock sync (if any) or any other blocking thing */
4199     if (basesink->clock_id) {
4200       gst_clock_id_unschedule (basesink->clock_id);
4201     }
4202
4203     /* flush out the data thread if it's locked in finish_preroll, this will
4204      * also flush out the EOS state */
4205     GST_DEBUG_OBJECT (basesink,
4206         "flushing out data thread, need preroll to TRUE");
4207     gst_base_sink_preroll_queue_flush (basesink, pad);
4208   }
4209   GST_PAD_PREROLL_UNLOCK (pad);
4210
4211   return TRUE;
4212 }
4213
4214 static gboolean
4215 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
4216 {
4217   gboolean result;
4218
4219   if (active) {
4220     /* start task */
4221     result = gst_pad_start_task (basesink->sinkpad,
4222         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
4223   } else {
4224     /* step 2, make sure streaming finishes */
4225     result = gst_pad_stop_task (basesink->sinkpad);
4226   }
4227
4228   return result;
4229 }
4230
4231 static gboolean
4232 gst_base_sink_pad_activate (GstPad * pad)
4233 {
4234   gboolean result = FALSE;
4235   GstBaseSink *basesink;
4236
4237   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4238
4239   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
4240
4241   gst_base_sink_set_flushing (basesink, pad, FALSE);
4242
4243   /* we need to have the pull mode enabled */
4244   if (!basesink->can_activate_pull) {
4245     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
4246     goto fallback;
4247   }
4248
4249   /* check if downstreams supports pull mode at all */
4250   if (!gst_pad_check_pull_range (pad)) {
4251     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
4252     goto fallback;
4253   }
4254
4255   /* set the pad mode before starting the task so that it's in the
4256    * correct state for the new thread. also the sink set_caps and get_caps
4257    * function checks this */
4258   basesink->pad_mode = GST_ACTIVATE_PULL;
4259
4260   /* we first try to negotiate a format so that when we try to activate
4261    * downstream, it knows about our format */
4262   if (!gst_base_sink_negotiate_pull (basesink)) {
4263     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
4264     goto fallback;
4265   }
4266
4267   /* ok activate now */
4268   if (!gst_pad_activate_pull (pad, TRUE)) {
4269     /* clear any pending caps */
4270     GST_OBJECT_LOCK (basesink);
4271     gst_caps_replace (&basesink->priv->pull_caps, NULL);
4272     GST_OBJECT_UNLOCK (basesink);
4273     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
4274     goto fallback;
4275   }
4276
4277   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
4278   result = TRUE;
4279   goto done;
4280
4281   /* push mode fallback */
4282 fallback:
4283   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
4284   if ((result = gst_pad_activate_push (pad, TRUE))) {
4285     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
4286   }
4287
4288 done:
4289   if (!result) {
4290     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
4291     gst_base_sink_set_flushing (basesink, pad, TRUE);
4292   }
4293
4294   gst_object_unref (basesink);
4295
4296   return result;
4297 }
4298
4299 static gboolean
4300 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
4301 {
4302   gboolean result;
4303   GstBaseSink *basesink;
4304
4305   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4306
4307   if (active) {
4308     if (!basesink->can_activate_push) {
4309       result = FALSE;
4310       basesink->pad_mode = GST_ACTIVATE_NONE;
4311     } else {
4312       result = TRUE;
4313       basesink->pad_mode = GST_ACTIVATE_PUSH;
4314     }
4315   } else {
4316     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
4317       g_warning ("Internal GStreamer activation error!!!");
4318       result = FALSE;
4319     } else {
4320       gst_base_sink_set_flushing (basesink, pad, TRUE);
4321       result = TRUE;
4322       basesink->pad_mode = GST_ACTIVATE_NONE;
4323     }
4324   }
4325
4326   gst_object_unref (basesink);
4327
4328   return result;
4329 }
4330
4331 static gboolean
4332 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
4333 {
4334   GstCaps *caps;
4335   gboolean result;
4336
4337   result = FALSE;
4338
4339   /* this returns the intersection between our caps and the peer caps. If there
4340    * is no peer, it returns NULL and we can't operate in pull mode so we can
4341    * fail the negotiation. */
4342   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
4343   if (caps == NULL || gst_caps_is_empty (caps))
4344     goto no_caps_possible;
4345
4346   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
4347
4348   caps = gst_caps_make_writable (caps);
4349   /* get the first (prefered) format */
4350   gst_caps_truncate (caps);
4351   /* try to fixate */
4352   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
4353
4354   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
4355
4356   if (gst_caps_is_any (caps)) {
4357     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
4358         "allowing pull()");
4359     /* neither side has template caps in this case, so they are prepared for
4360        pull() without setcaps() */
4361     result = TRUE;
4362   } else if (gst_caps_is_fixed (caps)) {
4363     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
4364       goto could_not_set_caps;
4365
4366     GST_OBJECT_LOCK (basesink);
4367     gst_caps_replace (&basesink->priv->pull_caps, caps);
4368     GST_OBJECT_UNLOCK (basesink);
4369
4370     result = TRUE;
4371   }
4372
4373   gst_caps_unref (caps);
4374
4375   return result;
4376
4377 no_caps_possible:
4378   {
4379     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
4380     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
4381     if (caps)
4382       gst_caps_unref (caps);
4383     return FALSE;
4384   }
4385 could_not_set_caps:
4386   {
4387     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
4388     gst_caps_unref (caps);
4389     return FALSE;
4390   }
4391 }
4392
4393 /* this won't get called until we implement an activate function */
4394 static gboolean
4395 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
4396 {
4397   gboolean result = FALSE;
4398   GstBaseSink *basesink;
4399   GstBaseSinkClass *bclass;
4400
4401   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4402   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4403
4404   if (active) {
4405     GstFormat format;
4406     gint64 duration;
4407
4408     /* we mark we have a newsegment here because pull based
4409      * mode works just fine without having a newsegment before the
4410      * first buffer */
4411     format = GST_FORMAT_BYTES;
4412
4413     gst_segment_init (&basesink->segment, format);
4414     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
4415     GST_OBJECT_LOCK (basesink);
4416     basesink->have_newsegment = TRUE;
4417     GST_OBJECT_UNLOCK (basesink);
4418
4419     /* get the peer duration in bytes */
4420     result = gst_pad_query_peer_duration (pad, &format, &duration);
4421     if (result) {
4422       GST_DEBUG_OBJECT (basesink,
4423           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
4424       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
4425           duration);
4426       gst_segment_set_duration (&basesink->segment, format, duration);
4427     } else {
4428       GST_DEBUG_OBJECT (basesink, "unknown duration");
4429     }
4430
4431     if (bclass->activate_pull)
4432       result = bclass->activate_pull (basesink, TRUE);
4433     else
4434       result = FALSE;
4435
4436     if (!result)
4437       goto activate_failed;
4438
4439   } else {
4440     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
4441       g_warning ("Internal GStreamer activation error!!!");
4442       result = FALSE;
4443     } else {
4444       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
4445       if (bclass->activate_pull)
4446         result &= bclass->activate_pull (basesink, FALSE);
4447       basesink->pad_mode = GST_ACTIVATE_NONE;
4448       /* clear any pending caps */
4449       GST_OBJECT_LOCK (basesink);
4450       gst_caps_replace (&basesink->priv->pull_caps, NULL);
4451       GST_OBJECT_UNLOCK (basesink);
4452     }
4453   }
4454   gst_object_unref (basesink);
4455
4456   return result;
4457
4458   /* ERRORS */
4459 activate_failed:
4460   {
4461     /* reset, as starting the thread failed */
4462     basesink->pad_mode = GST_ACTIVATE_NONE;
4463
4464     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
4465     return FALSE;
4466   }
4467 }
4468
4469 /* send an event to our sinkpad peer. */
4470 static gboolean
4471 gst_base_sink_send_event (GstElement * element, GstEvent * event)
4472 {
4473   GstPad *pad;
4474   GstBaseSink *basesink = GST_BASE_SINK (element);
4475   gboolean forward, result = TRUE;
4476   GstActivateMode mode;
4477
4478   GST_OBJECT_LOCK (element);
4479   /* get the pad and the scheduling mode */
4480   pad = gst_object_ref (basesink->sinkpad);
4481   mode = basesink->pad_mode;
4482   GST_OBJECT_UNLOCK (element);
4483
4484   /* only push UPSTREAM events upstream */
4485   forward = GST_EVENT_IS_UPSTREAM (event);
4486
4487   GST_DEBUG_OBJECT (basesink, "handling event %p %" GST_PTR_FORMAT, event,
4488       event);
4489
4490   switch (GST_EVENT_TYPE (event)) {
4491     case GST_EVENT_LATENCY:
4492     {
4493       GstClockTime latency;
4494
4495       gst_event_parse_latency (event, &latency);
4496
4497       /* store the latency. We use this to adjust the running_time before syncing
4498        * it to the clock. */
4499       GST_OBJECT_LOCK (element);
4500       basesink->priv->latency = latency;
4501       if (!basesink->priv->have_latency)
4502         forward = FALSE;
4503       GST_OBJECT_UNLOCK (element);
4504       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
4505           GST_TIME_ARGS (latency));
4506
4507       /* We forward this event so that all elements know about the global pipeline
4508        * latency. This is interesting for an element when it wants to figure out
4509        * when a particular piece of data will be rendered. */
4510       break;
4511     }
4512     case GST_EVENT_SEEK:
4513       /* in pull mode we will execute the seek */
4514       if (mode == GST_ACTIVATE_PULL)
4515         result = gst_base_sink_perform_seek (basesink, pad, event);
4516       break;
4517     case GST_EVENT_STEP:
4518       result = gst_base_sink_perform_step (basesink, pad, event);
4519       forward = FALSE;
4520       break;
4521     default:
4522       break;
4523   }
4524
4525   if (forward) {
4526     result = gst_pad_push_event (pad, event);
4527   } else {
4528     /* not forwarded, unref the event */
4529     gst_event_unref (event);
4530   }
4531
4532   gst_object_unref (pad);
4533   return result;
4534 }
4535
4536 static gboolean
4537 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4538     gint64 * cur, gboolean * upstream)
4539 {
4540   GstClock *clock = NULL;
4541   gboolean res = FALSE;
4542   GstFormat oformat, tformat;
4543   GstSegment *segment;
4544   GstClockTime now, latency;
4545   GstClockTimeDiff base;
4546   gint64 time, accum, duration;
4547   gdouble rate;
4548   gint64 last;
4549   gboolean last_seen, with_clock, in_paused;
4550
4551   GST_OBJECT_LOCK (basesink);
4552   /* we can only get the segment when we are not NULL or READY */
4553   if (!basesink->have_newsegment)
4554     goto wrong_state;
4555
4556   in_paused = FALSE;
4557   /* when not in PLAYING or when we're busy with a state change, we
4558    * cannot read from the clock so we report time based on the
4559    * last seen timestamp. */
4560   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4561       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING) {
4562     in_paused = TRUE;
4563   }
4564
4565   /* we don't use the clip segment in pull mode, when seeking we update the
4566    * main segment directly with the new segment values without it having to be
4567    * activated by the rendering after preroll */
4568   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4569     segment = basesink->abidata.ABI.clip_segment;
4570   else
4571     segment = &basesink->segment;
4572
4573   /* our intermediate time format */
4574   tformat = GST_FORMAT_TIME;
4575   /* get the format in the segment */
4576   oformat = segment->format;
4577
4578   /* report with last seen position when EOS */
4579   last_seen = basesink->eos;
4580
4581   /* assume we will use the clock for getting the current position */
4582   with_clock = TRUE;
4583   if (basesink->sync == FALSE)
4584     with_clock = FALSE;
4585
4586   /* and we need a clock */
4587   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4588     with_clock = FALSE;
4589   else
4590     gst_object_ref (clock);
4591
4592   /* mainloop might be querying position when going to playing async,
4593    * while (audio) rendering might be quickly advancing stream position,
4594    * so use clock asap rather than last reported position */
4595   if (in_paused && with_clock && g_atomic_int_get (&basesink->priv->to_playing)) {
4596     GST_DEBUG_OBJECT (basesink, "going to PLAYING, so not PAUSED");
4597     in_paused = FALSE;
4598   }
4599
4600   /* collect all data we need holding the lock */
4601   if (GST_CLOCK_TIME_IS_VALID (segment->time))
4602     time = segment->time;
4603   else
4604     time = 0;
4605
4606   if (GST_CLOCK_TIME_IS_VALID (segment->stop))
4607     duration = segment->stop - segment->start;
4608   else
4609     duration = 0;
4610
4611   accum = segment->accum;
4612   rate = segment->rate * segment->applied_rate;
4613   latency = basesink->priv->latency;
4614
4615   if (oformat == GST_FORMAT_TIME) {
4616     gint64 start, stop;
4617
4618     start = basesink->priv->current_sstart;
4619     stop = basesink->priv->current_sstop;
4620
4621     if (in_paused) {
4622       /* in paused we use the last position as a lower bound */
4623       if (stop == -1 || segment->rate > 0.0)
4624         last = start;
4625       else
4626         last = stop;
4627     } else {
4628       /* in playing, use last stop time as upper bound */
4629       if (start == -1 || segment->rate > 0.0)
4630         last = stop;
4631       else
4632         last = start;
4633     }
4634   } else {
4635     /* convert last stop to stream time */
4636     last = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4637   }
4638
4639   if (in_paused) {
4640     /* in paused, use start_time */
4641     base = GST_ELEMENT_START_TIME (basesink);
4642     GST_DEBUG_OBJECT (basesink, "in paused, using start time %" GST_TIME_FORMAT,
4643         GST_TIME_ARGS (base));
4644   } else if (with_clock) {
4645     /* else use clock when needed */
4646     base = GST_ELEMENT_CAST (basesink)->base_time;
4647     GST_DEBUG_OBJECT (basesink, "using clock and base time %" GST_TIME_FORMAT,
4648         GST_TIME_ARGS (base));
4649   } else {
4650     /* else, no sync or clock -> no base time */
4651     GST_DEBUG_OBJECT (basesink, "no sync or no clock");
4652     base = -1;
4653   }
4654
4655   /* no base, we can't calculate running_time, use last seem timestamp to report
4656    * time */
4657   if (base == -1)
4658     last_seen = TRUE;
4659
4660   /* need to release the object lock before we can get the time,
4661    * a clock might take the LOCK of the provider, which could be
4662    * a basesink subclass. */
4663   GST_OBJECT_UNLOCK (basesink);
4664
4665   if (last_seen) {
4666     /* in EOS or when no valid stream_time, report the value of last seen
4667      * timestamp */
4668     if (last == -1) {
4669       /* no timestamp, we need to ask upstream */
4670       GST_DEBUG_OBJECT (basesink, "no last seen timestamp, asking upstream");
4671       res = FALSE;
4672       *upstream = TRUE;
4673       goto done;
4674     }
4675     GST_DEBUG_OBJECT (basesink, "using last seen timestamp %" GST_TIME_FORMAT,
4676         GST_TIME_ARGS (last));
4677     *cur = last;
4678   } else {
4679     if (oformat != tformat) {
4680       /* convert accum, time and duration to time */
4681       if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4682               &accum))
4683         goto convert_failed;
4684       if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration,
4685               &tformat, &duration))
4686         goto convert_failed;
4687       if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4688               &time))
4689         goto convert_failed;
4690       if (!gst_pad_query_convert (basesink->sinkpad, oformat, last, &tformat,
4691               &last))
4692         goto convert_failed;
4693
4694       /* assume time format from now on */
4695       oformat = tformat;
4696     }
4697
4698     if (!in_paused && with_clock) {
4699       now = gst_clock_get_time (clock);
4700     } else {
4701       now = base;
4702       base = 0;
4703     }
4704
4705     /* subtract base time and accumulated time from the clock time.
4706      * Make sure we don't go negative. This is the current time in
4707      * the segment which we need to scale with the combined
4708      * rate and applied rate. */
4709     base += accum;
4710     base += latency;
4711     if (GST_CLOCK_DIFF (base, now) < 0)
4712       base = now;
4713
4714     /* for negative rates we need to count back from the segment
4715      * duration. */
4716     if (rate < 0.0)
4717       time += duration;
4718
4719     *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4720
4721     if (in_paused) {
4722       /* never report less than segment values in paused */
4723       if (last != -1)
4724         *cur = MAX (last, *cur);
4725     } else {
4726       /* never report more than last seen position in playing */
4727       if (last != -1)
4728         *cur = MIN (last, *cur);
4729     }
4730
4731     GST_DEBUG_OBJECT (basesink,
4732         "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4733         GST_TIME_FORMAT " + time %" GST_TIME_FORMAT "  last %" GST_TIME_FORMAT,
4734         GST_TIME_ARGS (now), GST_TIME_ARGS (base), GST_TIME_ARGS (accum),
4735         GST_TIME_ARGS (time), GST_TIME_ARGS (last));
4736   }
4737
4738   if (oformat != format) {
4739     /* convert to final format */
4740     if (!gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur))
4741       goto convert_failed;
4742   }
4743
4744   res = TRUE;
4745
4746 done:
4747   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4748       res, GST_TIME_ARGS (*cur));
4749
4750   if (clock)
4751     gst_object_unref (clock);
4752
4753   return res;
4754
4755   /* special cases */
4756 wrong_state:
4757   {
4758     /* in NULL or READY we always return FALSE and -1 */
4759     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4760     res = FALSE;
4761     *cur = -1;
4762     GST_OBJECT_UNLOCK (basesink);
4763     goto done;
4764   }
4765 convert_failed:
4766   {
4767     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4768     *upstream = TRUE;
4769     res = FALSE;
4770     goto done;
4771   }
4772 }
4773
4774 static gboolean
4775 gst_base_sink_get_duration (GstBaseSink * basesink, GstFormat format,
4776     gint64 * dur, gboolean * upstream)
4777 {
4778   gboolean res = FALSE;
4779
4780   if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4781     GstFormat uformat = GST_FORMAT_BYTES;
4782     gint64 uduration;
4783
4784     /* get the duration in bytes, in pull mode that's all we are sure to
4785      * know. We have to explicitly get this value from upstream instead of
4786      * using our cached value because it might change. Duration caching
4787      * should be done at a higher level. */
4788     res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat, &uduration);
4789     if (res) {
4790       gst_segment_set_duration (&basesink->segment, uformat, uduration);
4791       if (format != uformat) {
4792         /* convert to the requested format */
4793         res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4794             &format, dur);
4795       } else {
4796         *dur = uduration;
4797       }
4798     }
4799     *upstream = FALSE;
4800   } else {
4801     *upstream = TRUE;
4802   }
4803
4804   return res;
4805 }
4806
4807 static const GstQueryType *
4808 gst_base_sink_get_query_types (GstElement * element)
4809 {
4810   static const GstQueryType query_types[] = {
4811     GST_QUERY_DURATION,
4812     GST_QUERY_POSITION,
4813     GST_QUERY_SEGMENT,
4814     GST_QUERY_LATENCY,
4815     0
4816   };
4817
4818   return query_types;
4819 }
4820
4821 static gboolean
4822 gst_base_sink_query (GstElement * element, GstQuery * query)
4823 {
4824   gboolean res = FALSE;
4825
4826   GstBaseSink *basesink = GST_BASE_SINK (element);
4827
4828   switch (GST_QUERY_TYPE (query)) {
4829     case GST_QUERY_POSITION:
4830     {
4831       gint64 cur = 0;
4832       GstFormat format;
4833       gboolean upstream = FALSE;
4834
4835       gst_query_parse_position (query, &format, NULL);
4836
4837       GST_DEBUG_OBJECT (basesink, "position query in format %s",
4838           gst_format_get_name (format));
4839
4840       /* first try to get the position based on the clock */
4841       if ((res =
4842               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4843         gst_query_set_position (query, format, cur);
4844       } else if (upstream) {
4845         /* fallback to peer query */
4846         res = gst_pad_peer_query (basesink->sinkpad, query);
4847       }
4848       if (!res) {
4849         /* we can handle a few things if upstream failed */
4850         if (format == GST_FORMAT_PERCENT) {
4851           gint64 dur = 0;
4852           GstFormat uformat = GST_FORMAT_TIME;
4853
4854           res = gst_base_sink_get_position (basesink, GST_FORMAT_TIME, &cur,
4855               &upstream);
4856           if (!res && upstream) {
4857             res = gst_pad_query_peer_position (basesink->sinkpad, &uformat,
4858                 &cur);
4859           }
4860           if (res) {
4861             res = gst_base_sink_get_duration (basesink, GST_FORMAT_TIME, &dur,
4862                 &upstream);
4863             if (!res && upstream) {
4864               res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4865                   &dur);
4866             }
4867           }
4868           if (res) {
4869             gint64 pos;
4870
4871             pos = gst_util_uint64_scale (100 * GST_FORMAT_PERCENT_SCALE, cur,
4872                 dur);
4873             gst_query_set_position (query, GST_FORMAT_PERCENT, pos);
4874           }
4875         }
4876       }
4877       break;
4878     }
4879     case GST_QUERY_DURATION:
4880     {
4881       gint64 dur = 0;
4882       GstFormat format;
4883       gboolean upstream = FALSE;
4884
4885       gst_query_parse_duration (query, &format, NULL);
4886
4887       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4888           gst_format_get_name (format));
4889
4890       if ((res =
4891               gst_base_sink_get_duration (basesink, format, &dur, &upstream))) {
4892         gst_query_set_duration (query, format, dur);
4893       } else if (upstream) {
4894         /* fallback to peer query */
4895         res = gst_pad_peer_query (basesink->sinkpad, query);
4896       }
4897       if (!res) {
4898         /* we can handle a few things if upstream failed */
4899         if (format == GST_FORMAT_PERCENT) {
4900           gst_query_set_duration (query, GST_FORMAT_PERCENT,
4901               GST_FORMAT_PERCENT_MAX);
4902           res = TRUE;
4903         }
4904       }
4905       break;
4906     }
4907     case GST_QUERY_LATENCY:
4908     {
4909       gboolean live, us_live;
4910       GstClockTime min, max;
4911
4912       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4913                   &max))) {
4914         gst_query_set_latency (query, live, min, max);
4915       }
4916       break;
4917     }
4918     case GST_QUERY_JITTER:
4919       break;
4920     case GST_QUERY_RATE:
4921       /* gst_query_set_rate (query, basesink->segment_rate); */
4922       res = TRUE;
4923       break;
4924     case GST_QUERY_SEGMENT:
4925     {
4926       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4927         gst_query_set_segment (query, basesink->segment.rate,
4928             GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4929         res = TRUE;
4930       } else {
4931         res = gst_pad_peer_query (basesink->sinkpad, query);
4932       }
4933       break;
4934     }
4935     case GST_QUERY_SEEKING:
4936     case GST_QUERY_CONVERT:
4937     case GST_QUERY_FORMATS:
4938     default:
4939       res = gst_pad_peer_query (basesink->sinkpad, query);
4940       break;
4941   }
4942   GST_DEBUG_OBJECT (basesink, "query %s returns %d",
4943       GST_QUERY_TYPE_NAME (query), res);
4944   return res;
4945 }
4946
4947 static GstStateChangeReturn
4948 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4949 {
4950   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4951   GstBaseSink *basesink = GST_BASE_SINK (element);
4952   GstBaseSinkClass *bclass;
4953   GstBaseSinkPrivate *priv;
4954
4955   priv = basesink->priv;
4956
4957   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4958
4959   switch (transition) {
4960     case GST_STATE_CHANGE_NULL_TO_READY:
4961       if (bclass->start)
4962         if (!bclass->start (basesink))
4963           goto start_failed;
4964       break;
4965     case GST_STATE_CHANGE_READY_TO_PAUSED:
4966       /* need to complete preroll before this state change completes, there
4967        * is no data flow in READY so we can safely assume we need to preroll. */
4968       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4969       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4970       basesink->have_newsegment = FALSE;
4971       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4972       gst_segment_init (basesink->abidata.ABI.clip_segment,
4973           GST_FORMAT_UNDEFINED);
4974       basesink->offset = 0;
4975       basesink->have_preroll = FALSE;
4976       priv->step_unlock = FALSE;
4977       basesink->need_preroll = TRUE;
4978       basesink->playing_async = TRUE;
4979       priv->current_sstart = GST_CLOCK_TIME_NONE;
4980       priv->current_sstop = GST_CLOCK_TIME_NONE;
4981       priv->eos_rtime = GST_CLOCK_TIME_NONE;
4982       priv->latency = 0;
4983       basesink->eos = FALSE;
4984       priv->received_eos = FALSE;
4985       gst_base_sink_reset_qos (basesink);
4986       priv->commited = FALSE;
4987       priv->call_preroll = TRUE;
4988       priv->current_step.valid = FALSE;
4989       priv->pending_step.valid = FALSE;
4990       if (priv->async_enabled) {
4991         GST_DEBUG_OBJECT (basesink, "doing async state change");
4992         /* when async enabled, post async-start message and return ASYNC from
4993          * the state change function */
4994         ret = GST_STATE_CHANGE_ASYNC;
4995         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4996             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4997       } else {
4998         priv->have_latency = TRUE;
4999       }
5000       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5001       break;
5002     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
5003       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5004       g_atomic_int_set (&basesink->priv->to_playing, TRUE);
5005       if (!gst_base_sink_needs_preroll (basesink)) {
5006         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
5007         /* no preroll needed anymore now. */
5008         basesink->playing_async = FALSE;
5009         basesink->need_preroll = FALSE;
5010         if (basesink->eos) {
5011           GstMessage *message;
5012
5013           /* need to post EOS message here */
5014           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
5015           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
5016           gst_message_set_seqnum (message, basesink->priv->seqnum);
5017           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
5018         } else {
5019           GST_DEBUG_OBJECT (basesink, "signal preroll");
5020           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
5021         }
5022       } else {
5023         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
5024         basesink->need_preroll = TRUE;
5025         basesink->playing_async = TRUE;
5026         priv->call_preroll = TRUE;
5027         priv->commited = FALSE;
5028         if (priv->async_enabled) {
5029           GST_DEBUG_OBJECT (basesink, "doing async state change");
5030           ret = GST_STATE_CHANGE_ASYNC;
5031           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5032               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
5033         }
5034       }
5035       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5036       break;
5037     default:
5038       break;
5039   }
5040
5041   {
5042     GstStateChangeReturn bret;
5043
5044     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
5045     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
5046       goto activate_failed;
5047   }
5048
5049   switch (transition) {
5050     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
5051       /* completed transition, so need not be marked any longer
5052        * And it should be unmarked, since e.g. losing our position upon flush
5053        * does not really change state to PAUSED ... */
5054       g_atomic_int_set (&basesink->priv->to_playing, FALSE);
5055       break;
5056     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
5057       g_atomic_int_set (&basesink->priv->to_playing, FALSE);
5058       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
5059       /* FIXME, make sure we cannot enter _render first */
5060
5061       /* we need to call ::unlock before locking PREROLL_LOCK
5062        * since we lock it before going into ::render */
5063       if (bclass->unlock)
5064         bclass->unlock (basesink);
5065
5066       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5067       GST_DEBUG_OBJECT (basesink, "got preroll lock");
5068       /* now that we have the PREROLL lock, clear our unlock request */
5069       if (bclass->unlock_stop)
5070         bclass->unlock_stop (basesink);
5071
5072       /* we need preroll again and we set the flag before unlocking the clockid
5073        * because if the clockid is unlocked before a current buffer expired, we
5074        * can use that buffer to preroll with */
5075       basesink->need_preroll = TRUE;
5076
5077       if (basesink->clock_id) {
5078         GST_DEBUG_OBJECT (basesink, "unschedule clock");
5079         gst_clock_id_unschedule (basesink->clock_id);
5080       }
5081
5082       /* if we don't have a preroll buffer we need to wait for a preroll and
5083        * return ASYNC. */
5084       if (!gst_base_sink_needs_preroll (basesink)) {
5085         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
5086         basesink->playing_async = FALSE;
5087       } else {
5088         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
5089           GST_DEBUG_OBJECT (basesink, "element is <= READY");
5090           ret = GST_STATE_CHANGE_SUCCESS;
5091         } else {
5092           GST_DEBUG_OBJECT (basesink,
5093               "PLAYING to PAUSED, we are not prerolled");
5094           basesink->playing_async = TRUE;
5095           priv->commited = FALSE;
5096           priv->call_preroll = TRUE;
5097           if (priv->async_enabled) {
5098             GST_DEBUG_OBJECT (basesink, "doing async state change");
5099             ret = GST_STATE_CHANGE_ASYNC;
5100             gst_element_post_message (GST_ELEMENT_CAST (basesink),
5101                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
5102                     FALSE));
5103           }
5104         }
5105       }
5106       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
5107           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
5108
5109       gst_base_sink_reset_qos (basesink);
5110       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5111       break;
5112     case GST_STATE_CHANGE_PAUSED_TO_READY:
5113       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5114       /* start by reseting our position state with the object lock so that the
5115        * position query gets the right idea. We do this before we post the
5116        * messages so that the message handlers pick this up. */
5117       GST_OBJECT_LOCK (basesink);
5118       basesink->have_newsegment = FALSE;
5119       priv->current_sstart = GST_CLOCK_TIME_NONE;
5120       priv->current_sstop = GST_CLOCK_TIME_NONE;
5121       priv->have_latency = FALSE;
5122       if (priv->cached_clock_id) {
5123         gst_clock_id_unref (priv->cached_clock_id);
5124         priv->cached_clock_id = NULL;
5125       }
5126       GST_OBJECT_UNLOCK (basesink);
5127
5128       gst_base_sink_set_last_buffer (basesink, NULL);
5129       priv->call_preroll = FALSE;
5130
5131       if (!priv->commited) {
5132         if (priv->async_enabled) {
5133           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
5134
5135           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5136               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
5137                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
5138
5139           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5140               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
5141         }
5142         priv->commited = TRUE;
5143       } else {
5144         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
5145       }
5146       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5147       break;
5148     case GST_STATE_CHANGE_READY_TO_NULL:
5149       if (bclass->stop) {
5150         if (!bclass->stop (basesink)) {
5151           GST_WARNING_OBJECT (basesink, "failed to stop");
5152         }
5153       }
5154       gst_base_sink_set_last_buffer (basesink, NULL);
5155       priv->call_preroll = FALSE;
5156       break;
5157     default:
5158       break;
5159   }
5160
5161   return ret;
5162
5163   /* ERRORS */
5164 start_failed:
5165   {
5166     GST_DEBUG_OBJECT (basesink, "failed to start");
5167     return GST_STATE_CHANGE_FAILURE;
5168   }
5169 activate_failed:
5170   {
5171     GST_DEBUG_OBJECT (basesink,
5172         "element failed to change states -- activation problem?");
5173     return GST_STATE_CHANGE_FAILURE;
5174   }
5175 }