basesink: fix emergency rendering timestamp tracking
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSrc
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * |[
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * ]|
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSinkClass.preroll() vmethod with this preroll buffer and will then
59  * commit the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from #GstBaseSinkClass.get_times(). If this
63  * function returns #GST_CLOCK_TIME_NONE for the start time, no synchronisation
64  * will be done. Synchronisation can be disabled entirely by setting the object
65  * #GstBaseSink:sync property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSinkClass.render() will be
68  * called. Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the
71  * #GstBaseSinkClass.render() method are supported as well. These classes
72  * typically receive a buffer in the render method and can then potentially
73  * block on the clock while rendering. A typical example is an audiosink.
74  * Since 0.10.11 these subclasses can use gst_base_sink_wait_preroll() to
75  * perform the blocking wait.
76  *
77  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
78  * for the clock to reach the time indicated by the stop time of the last
79  * #GstBaseSinkClass.get_times() call before posting an EOS message. When the
80  * element receives EOS in PAUSED, preroll completes, the event is queued and an
81  * EOS message is posted when going to PLAYING.
82  *
83  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
84  * synchronisation and clipping of buffers. Buffers that fall completely outside
85  * of the current segment are dropped. Buffers that fall partially in the
86  * segment are rendered (and prerolled). Subclasses should do any subbuffer
87  * clipping themselves when needed.
88  *
89  * #GstBaseSink will by default report the current playback position in
90  * #GST_FORMAT_TIME based on the current clock time and segment information.
91  * If no clock has been set on the element, the query will be forwarded
92  * upstream.
93  *
94  * The #GstBaseSinkClass.set_caps() function will be called when the subclass
95  * should configure itself to process a specific media type.
96  *
97  * The #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() virtual methods
98  * will be called when resources should be allocated. Any 
99  * #GstBaseSinkClass.preroll(), #GstBaseSinkClass.render() and
100  * #GstBaseSinkClass.set_caps() function will be called between the
101  * #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() calls.
102  *
103  * The #GstBaseSinkClass.event() virtual method will be called when an event is
104  * received by #GstBaseSink. Normally this method should only be overriden by
105  * very specific elements (such as file sinks) which need to handle the
106  * newsegment event specially.
107  *
108  * #GstBaseSink provides an overridable #GstBaseSinkClass.buffer_alloc()
109  * function that can be used by sinks that want to do reverse negotiation or to
110  * provide custom buffers (hardware buffers for example) to upstream elements.
111  *
112  * The #GstBaseSinkClass.unlock() method is called when the elements should
113  * unblock any blocking operations they perform in the
114  * #GstBaseSinkClass.render() method. This is mostly useful when the
115  * #GstBaseSinkClass.render() method performs a blocking write on a file
116  * descriptor, for example.
117  *
118  * The #GstBaseSink:max-lateness property affects how the sink deals with
119  * buffers that arrive too late in the sink. A buffer arrives too late in the
120  * sink when the presentation time (as a combination of the last segment, buffer
121  * timestamp and element base_time) plus the duration is before the current
122  * time of the clock.
123  * If the frame is later than max-lateness, the sink will drop the buffer
124  * without calling the render method.
125  * This feature is disabled if sync is disabled, the
126  * #GstBaseSinkClass.get_times() method does not return a valid start time or
127  * max-lateness is set to -1 (the default).
128  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
129  * max-lateness value.
130  *
131  * The #GstBaseSink:qos property will enable the quality-of-service features of
132  * the basesink which gather statistics about the real-time performance of the
133  * clock synchronisation. For each buffer received in the sink, statistics are
134  * gathered and a QOS event is sent upstream with these numbers. This
135  * information can then be used by upstream elements to reduce their processing
136  * rate, for example.
137  *
138  * Since 0.10.15 the #GstBaseSink:async property can be used to instruct the
139  * sink to never perform an ASYNC state change. This feature is mostly usable
140  * when dealing with non-synchronized streams or sparse streams.
141  *
142  * Last reviewed on 2007-08-29 (0.10.15)
143  */
144
145 #ifdef HAVE_CONFIG_H
146 #  include "config.h"
147 #endif
148
149 #include <gst/gst_private.h>
150
151 #include "gstbasesink.h"
152 #include <gst/gstmarshal.h>
153 #include <gst/gst-i18n-lib.h>
154
155 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
156 #define GST_CAT_DEFAULT gst_base_sink_debug
157
158 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
159    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
160
161 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
162
163 typedef struct
164 {
165   gboolean valid;               /* if this info is valid */
166   guint32 seqnum;               /* the seqnum of the STEP event */
167   GstFormat format;             /* the format of the amount */
168   guint64 amount;               /* the total amount of data to skip */
169   guint64 position;             /* the position in the stepped data */
170   guint64 duration;             /* the duration in time of the skipped data */
171   guint64 start;                /* running_time of the start */
172   gdouble rate;                 /* rate of skipping */
173   gdouble start_rate;           /* rate before skipping */
174   guint64 start_start;          /* start position skipping */
175   guint64 start_stop;           /* stop position skipping */
176   gboolean flush;               /* if this was a flushing step */
177   gboolean intermediate;        /* if this is an intermediate step */
178   gboolean need_preroll;        /* if we need preroll after this step */
179 } GstStepInfo;
180
181 /* FIXME, some stuff in ABI.data and other in Private...
182  * Make up your mind please.
183  */
184 struct _GstBaseSinkPrivate
185 {
186   gint qos_enabled;             /* ATOMIC */
187   gboolean async_enabled;
188   GstClockTimeDiff ts_offset;
189   GstClockTime render_delay;
190
191   /* start, stop of current buffer, stream time, used to report position */
192   GstClockTime current_sstart;
193   GstClockTime current_sstop;
194
195   /* start, stop and jitter of current buffer, running time */
196   GstClockTime current_rstart;
197   GstClockTime current_rstop;
198   GstClockTimeDiff current_jitter;
199
200   /* EOS sync time in running time */
201   GstClockTime eos_rtime;
202
203   /* last buffer that arrived in time, running time */
204   GstClockTime last_in_time;
205   /* when the last buffer left the sink, running time */
206   GstClockTime last_left;
207
208   /* running averages go here these are done on running time */
209   GstClockTime avg_pt;
210   GstClockTime avg_duration;
211   gdouble avg_rate;
212
213   /* these are done on system time. avg_jitter and avg_render are
214    * compared to eachother to see if the rendering time takes a
215    * huge amount of the processing, If so we are flooded with
216    * buffers. */
217   GstClockTime last_left_systime;
218   GstClockTime avg_jitter;
219   GstClockTime start, stop;
220   GstClockTime avg_render;
221
222   /* number of rendered and dropped frames */
223   guint64 rendered;
224   guint64 dropped;
225
226   /* latency stuff */
227   GstClockTime latency;
228
229   /* if we already commited the state */
230   gboolean commited;
231
232   /* when we received EOS */
233   gboolean received_eos;
234
235   /* when we are prerolled and able to report latency */
236   gboolean have_latency;
237
238   /* the last buffer we prerolled or rendered. Useful for making snapshots */
239   GstBuffer *last_buffer;
240
241   /* caps for pull based scheduling */
242   GstCaps *pull_caps;
243
244   /* blocksize for pulling */
245   guint blocksize;
246
247   gboolean discont;
248
249   /* seqnum of the stream */
250   guint32 seqnum;
251
252   gboolean call_preroll;
253   gboolean step_unlock;
254
255   /* we have a pending and a current step operation */
256   GstStepInfo current_step;
257   GstStepInfo pending_step;
258 };
259
260 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
261
262 /* generic running average, this has a neutral window size */
263 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
264
265 /* the windows for these running averages are experimentally obtained.
266  * possitive values get averaged more while negative values use a small
267  * window so we can react faster to badness. */
268 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
269 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
270
271 /* BaseSink properties */
272
273 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
274 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
275
276 #define DEFAULT_PREROLL_QUEUE_LEN   0
277 #define DEFAULT_SYNC                TRUE
278 #define DEFAULT_MAX_LATENESS        -1
279 #define DEFAULT_QOS                 FALSE
280 #define DEFAULT_ASYNC               TRUE
281 #define DEFAULT_TS_OFFSET           0
282 #define DEFAULT_BLOCKSIZE           4096
283 #define DEFAULT_RENDER_DELAY        0
284
285 enum
286 {
287   PROP_0,
288   PROP_PREROLL_QUEUE_LEN,
289   PROP_SYNC,
290   PROP_MAX_LATENESS,
291   PROP_QOS,
292   PROP_ASYNC,
293   PROP_TS_OFFSET,
294   PROP_LAST_BUFFER,
295   PROP_BLOCKSIZE,
296   PROP_RENDER_DELAY,
297   PROP_LAST
298 };
299
300 static GstElementClass *parent_class = NULL;
301
302 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
303 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
304 static void gst_base_sink_finalize (GObject * object);
305
306 GType
307 gst_base_sink_get_type (void)
308 {
309   static volatile gsize base_sink_type = 0;
310
311   if (g_once_init_enter (&base_sink_type)) {
312     GType _type;
313     static const GTypeInfo base_sink_info = {
314       sizeof (GstBaseSinkClass),
315       NULL,
316       NULL,
317       (GClassInitFunc) gst_base_sink_class_init,
318       NULL,
319       NULL,
320       sizeof (GstBaseSink),
321       0,
322       (GInstanceInitFunc) gst_base_sink_init,
323     };
324
325     _type = g_type_register_static (GST_TYPE_ELEMENT,
326         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
327     g_once_init_leave (&base_sink_type, _type);
328   }
329   return base_sink_type;
330 }
331
332 static void gst_base_sink_set_property (GObject * object, guint prop_id,
333     const GValue * value, GParamSpec * pspec);
334 static void gst_base_sink_get_property (GObject * object, guint prop_id,
335     GValue * value, GParamSpec * pspec);
336
337 static gboolean gst_base_sink_send_event (GstElement * element,
338     GstEvent * event);
339 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
340
341 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
342 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
343 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
344     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
345 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
346     GstClockTime * start, GstClockTime * end);
347 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
348     GstPad * pad, gboolean flushing);
349 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
350     gboolean active);
351 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
352     GstSegment * segment);
353 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
354     GstEvent * event, GstSegment * segment);
355
356 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
357     GstStateChange transition);
358
359 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
360 static GstFlowReturn gst_base_sink_chain_list (GstPad * pad,
361     GstBufferList * list);
362
363 static void gst_base_sink_loop (GstPad * pad);
364 static gboolean gst_base_sink_pad_activate (GstPad * pad);
365 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
366 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
367 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
368 static gboolean gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query);
369
370 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
371 static GstCaps *gst_base_sink_pad_getcaps (GstPad * pad);
372 static gboolean gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps);
373 static void gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps);
374 static GstFlowReturn gst_base_sink_pad_buffer_alloc (GstPad * pad,
375     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
376
377
378 /* check if an object was too late */
379 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
380     GstMiniObject * obj, GstClockTime start, GstClockTime stop,
381     GstClockReturn status, GstClockTimeDiff jitter);
382 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
383     gboolean is_list, GstMiniObject * obj);
384
385 static void
386 gst_base_sink_class_init (GstBaseSinkClass * klass)
387 {
388   GObjectClass *gobject_class;
389   GstElementClass *gstelement_class;
390
391   gobject_class = G_OBJECT_CLASS (klass);
392   gstelement_class = GST_ELEMENT_CLASS (klass);
393
394   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
395       "basesink element");
396
397   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
398
399   parent_class = g_type_class_peek_parent (klass);
400
401   gobject_class->finalize = gst_base_sink_finalize;
402   gobject_class->set_property = gst_base_sink_set_property;
403   gobject_class->get_property = gst_base_sink_get_property;
404
405   /* FIXME, this next value should be configured using an event from the
406    * upstream element, ie, the BUFFER_SIZE event. */
407   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
408       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
409           "Number of buffers to queue during preroll", 0, G_MAXUINT,
410           DEFAULT_PREROLL_QUEUE_LEN,
411           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
412
413   g_object_class_install_property (gobject_class, PROP_SYNC,
414       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
415           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
416
417   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
418       g_param_spec_int64 ("max-lateness", "Max Lateness",
419           "Maximum number of nanoseconds that a buffer can be late before it "
420           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
421           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422
423   g_object_class_install_property (gobject_class, PROP_QOS,
424       g_param_spec_boolean ("qos", "Qos",
425           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
426           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
427   /**
428    * GstBaseSink:async
429    *
430    * If set to #TRUE, the basesink will perform asynchronous state changes.
431    * When set to #FALSE, the sink will not signal the parent when it prerolls.
432    * Use this option when dealing with sparse streams or when synchronisation is
433    * not required.
434    *
435    * Since: 0.10.15
436    */
437   g_object_class_install_property (gobject_class, PROP_ASYNC,
438       g_param_spec_boolean ("async", "Async",
439           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
440           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
441   /**
442    * GstBaseSink:ts-offset
443    *
444    * Controls the final synchronisation, a negative value will render the buffer
445    * earlier while a positive value delays playback. This property can be
446    * used to fix synchronisation in bad files.
447    *
448    * Since: 0.10.15
449    */
450   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
451       g_param_spec_int64 ("ts-offset", "TS Offset",
452           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
453           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
454   /**
455    * GstBaseSink:last-buffer
456    *
457    * The last buffer that arrived in the sink and was used for preroll or for
458    * rendering. This property can be used to generate thumbnails. This property
459    * can be NULL when the sink has not yet received a bufer.
460    *
461    * Since: 0.10.15
462    */
463   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
464       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
465           "The last buffer received in the sink", GST_TYPE_BUFFER,
466           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
467   /**
468    * GstBaseSink:blocksize
469    *
470    * The amount of bytes to pull when operating in pull mode.
471    *
472    * Since: 0.10.22
473    */
474   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
475       g_param_spec_uint ("blocksize", "Block size",
476           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
477           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
478   /**
479    * GstBaseSink:render-delay
480    *
481    * The additional delay between synchronisation and actual rendering of the
482    * media. This property will add additional latency to the device in order to
483    * make other sinks compensate for the delay.
484    *
485    * Since: 0.10.22
486    */
487   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
488       g_param_spec_uint64 ("render-delay", "Render Delay",
489           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
490           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
491
492   gstelement_class->change_state =
493       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
494   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
495   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
496
497   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
498   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
499   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
500   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
501   klass->activate_pull =
502       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
503
504   /* Registering debug symbols for function pointers */
505   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_getcaps);
506   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_setcaps);
507   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_fixate);
508   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_buffer_alloc);
509   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate);
510   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_push);
511   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_pull);
512   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_event);
513   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain);
514   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain_list);
515 }
516
517 static GstCaps *
518 gst_base_sink_pad_getcaps (GstPad * pad)
519 {
520   GstBaseSinkClass *bclass;
521   GstBaseSink *bsink;
522   GstCaps *caps = NULL;
523
524   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
525   bclass = GST_BASE_SINK_GET_CLASS (bsink);
526
527   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
528     /* if we are operating in pull mode we only accept the negotiated caps */
529     GST_OBJECT_LOCK (pad);
530     if ((caps = GST_PAD_CAPS (pad)))
531       gst_caps_ref (caps);
532     GST_OBJECT_UNLOCK (pad);
533   }
534   if (caps == NULL) {
535     if (bclass->get_caps)
536       caps = bclass->get_caps (bsink);
537
538     if (caps == NULL) {
539       GstPadTemplate *pad_template;
540
541       pad_template =
542           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
543           "sink");
544       if (pad_template != NULL) {
545         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
546       }
547     }
548   }
549   gst_object_unref (bsink);
550
551   return caps;
552 }
553
554 static gboolean
555 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
556 {
557   GstBaseSinkClass *bclass;
558   GstBaseSink *bsink;
559   gboolean res = TRUE;
560
561   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
562   bclass = GST_BASE_SINK_GET_CLASS (bsink);
563
564   if (res && bclass->set_caps)
565     res = bclass->set_caps (bsink, caps);
566
567   gst_object_unref (bsink);
568
569   return res;
570 }
571
572 static void
573 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
574 {
575   GstBaseSinkClass *bclass;
576   GstBaseSink *bsink;
577
578   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
579   bclass = GST_BASE_SINK_GET_CLASS (bsink);
580
581   if (bclass->fixate)
582     bclass->fixate (bsink, caps);
583
584   gst_object_unref (bsink);
585 }
586
587 static GstFlowReturn
588 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
589     GstCaps * caps, GstBuffer ** buf)
590 {
591   GstBaseSinkClass *bclass;
592   GstBaseSink *bsink;
593   GstFlowReturn result = GST_FLOW_OK;
594
595   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
596   bclass = GST_BASE_SINK_GET_CLASS (bsink);
597
598   if (bclass->buffer_alloc)
599     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
600   else
601     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
602
603   gst_object_unref (bsink);
604
605   return result;
606 }
607
608 static void
609 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
610 {
611   GstPadTemplate *pad_template;
612   GstBaseSinkPrivate *priv;
613
614   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
615
616   pad_template =
617       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
618   g_return_if_fail (pad_template != NULL);
619
620   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
621
622   gst_pad_set_getcaps_function (basesink->sinkpad, gst_base_sink_pad_getcaps);
623   gst_pad_set_setcaps_function (basesink->sinkpad, gst_base_sink_pad_setcaps);
624   gst_pad_set_fixatecaps_function (basesink->sinkpad, gst_base_sink_pad_fixate);
625   gst_pad_set_bufferalloc_function (basesink->sinkpad,
626       gst_base_sink_pad_buffer_alloc);
627   gst_pad_set_activate_function (basesink->sinkpad, gst_base_sink_pad_activate);
628   gst_pad_set_activatepush_function (basesink->sinkpad,
629       gst_base_sink_pad_activate_push);
630   gst_pad_set_activatepull_function (basesink->sinkpad,
631       gst_base_sink_pad_activate_pull);
632   gst_pad_set_event_function (basesink->sinkpad, gst_base_sink_event);
633   gst_pad_set_chain_function (basesink->sinkpad, gst_base_sink_chain);
634   gst_pad_set_chain_list_function (basesink->sinkpad, gst_base_sink_chain_list);
635   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
636
637   basesink->pad_mode = GST_ACTIVATE_NONE;
638   basesink->preroll_queue = g_queue_new ();
639   basesink->abidata.ABI.clip_segment = gst_segment_new ();
640   priv->have_latency = FALSE;
641
642   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
643   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
644
645   basesink->sync = DEFAULT_SYNC;
646   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
647   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
648   priv->async_enabled = DEFAULT_ASYNC;
649   priv->ts_offset = DEFAULT_TS_OFFSET;
650   priv->render_delay = DEFAULT_RENDER_DELAY;
651   priv->blocksize = DEFAULT_BLOCKSIZE;
652
653   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
654 }
655
656 static void
657 gst_base_sink_finalize (GObject * object)
658 {
659   GstBaseSink *basesink;
660
661   basesink = GST_BASE_SINK (object);
662
663   g_queue_free (basesink->preroll_queue);
664   gst_segment_free (basesink->abidata.ABI.clip_segment);
665
666   G_OBJECT_CLASS (parent_class)->finalize (object);
667 }
668
669 /**
670  * gst_base_sink_set_sync:
671  * @sink: the sink
672  * @sync: the new sync value.
673  *
674  * Configures @sink to synchronize on the clock or not. When
675  * @sync is FALSE, incomming samples will be played as fast as
676  * possible. If @sync is TRUE, the timestamps of the incomming
677  * buffers will be used to schedule the exact render time of its
678  * contents.
679  *
680  * Since: 0.10.4
681  */
682 void
683 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
684 {
685   g_return_if_fail (GST_IS_BASE_SINK (sink));
686
687   GST_OBJECT_LOCK (sink);
688   sink->sync = sync;
689   GST_OBJECT_UNLOCK (sink);
690 }
691
692 /**
693  * gst_base_sink_get_sync:
694  * @sink: the sink
695  *
696  * Checks if @sink is currently configured to synchronize against the
697  * clock.
698  *
699  * Returns: TRUE if the sink is configured to synchronize against the clock.
700  *
701  * Since: 0.10.4
702  */
703 gboolean
704 gst_base_sink_get_sync (GstBaseSink * sink)
705 {
706   gboolean res;
707
708   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
709
710   GST_OBJECT_LOCK (sink);
711   res = sink->sync;
712   GST_OBJECT_UNLOCK (sink);
713
714   return res;
715 }
716
717 /**
718  * gst_base_sink_set_max_lateness:
719  * @sink: the sink
720  * @max_lateness: the new max lateness value.
721  *
722  * Sets the new max lateness value to @max_lateness. This value is
723  * used to decide if a buffer should be dropped or not based on the
724  * buffer timestamp and the current clock time. A value of -1 means
725  * an unlimited time.
726  *
727  * Since: 0.10.4
728  */
729 void
730 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
731 {
732   g_return_if_fail (GST_IS_BASE_SINK (sink));
733
734   GST_OBJECT_LOCK (sink);
735   sink->abidata.ABI.max_lateness = max_lateness;
736   GST_OBJECT_UNLOCK (sink);
737 }
738
739 /**
740  * gst_base_sink_get_max_lateness:
741  * @sink: the sink
742  *
743  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
744  * more details.
745  *
746  * Returns: The maximum time in nanoseconds that a buffer can be late
747  * before it is dropped and not rendered. A value of -1 means an
748  * unlimited time.
749  *
750  * Since: 0.10.4
751  */
752 gint64
753 gst_base_sink_get_max_lateness (GstBaseSink * sink)
754 {
755   gint64 res;
756
757   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
758
759   GST_OBJECT_LOCK (sink);
760   res = sink->abidata.ABI.max_lateness;
761   GST_OBJECT_UNLOCK (sink);
762
763   return res;
764 }
765
766 /**
767  * gst_base_sink_set_qos_enabled:
768  * @sink: the sink
769  * @enabled: the new qos value.
770  *
771  * Configures @sink to send Quality-of-Service events upstream.
772  *
773  * Since: 0.10.5
774  */
775 void
776 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
777 {
778   g_return_if_fail (GST_IS_BASE_SINK (sink));
779
780   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
781 }
782
783 /**
784  * gst_base_sink_is_qos_enabled:
785  * @sink: the sink
786  *
787  * Checks if @sink is currently configured to send Quality-of-Service events
788  * upstream.
789  *
790  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
791  *
792  * Since: 0.10.5
793  */
794 gboolean
795 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
796 {
797   gboolean res;
798
799   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
800
801   res = g_atomic_int_get (&sink->priv->qos_enabled);
802
803   return res;
804 }
805
806 /**
807  * gst_base_sink_set_async_enabled:
808  * @sink: the sink
809  * @enabled: the new async value.
810  *
811  * Configures @sink to perform all state changes asynchronusly. When async is
812  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
813  * preroll buffer. This feature is usefull if the sink does not synchronize
814  * against the clock or when it is dealing with sparse streams.
815  *
816  * Since: 0.10.15
817  */
818 void
819 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
820 {
821   g_return_if_fail (GST_IS_BASE_SINK (sink));
822
823   GST_PAD_PREROLL_LOCK (sink->sinkpad);
824   sink->priv->async_enabled = enabled;
825   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
826   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
827 }
828
829 /**
830  * gst_base_sink_is_async_enabled:
831  * @sink: the sink
832  *
833  * Checks if @sink is currently configured to perform asynchronous state
834  * changes to PAUSED.
835  *
836  * Returns: TRUE if the sink is configured to perform asynchronous state
837  * changes.
838  *
839  * Since: 0.10.15
840  */
841 gboolean
842 gst_base_sink_is_async_enabled (GstBaseSink * sink)
843 {
844   gboolean res;
845
846   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
847
848   GST_PAD_PREROLL_LOCK (sink->sinkpad);
849   res = sink->priv->async_enabled;
850   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
851
852   return res;
853 }
854
855 /**
856  * gst_base_sink_set_ts_offset:
857  * @sink: the sink
858  * @offset: the new offset
859  *
860  * Adjust the synchronisation of @sink with @offset. A negative value will
861  * render buffers earlier than their timestamp. A positive value will delay
862  * rendering. This function can be used to fix playback of badly timestamped
863  * buffers.
864  *
865  * Since: 0.10.15
866  */
867 void
868 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
869 {
870   g_return_if_fail (GST_IS_BASE_SINK (sink));
871
872   GST_OBJECT_LOCK (sink);
873   sink->priv->ts_offset = offset;
874   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
875   GST_OBJECT_UNLOCK (sink);
876 }
877
878 /**
879  * gst_base_sink_get_ts_offset:
880  * @sink: the sink
881  *
882  * Get the synchronisation offset of @sink.
883  *
884  * Returns: The synchronisation offset.
885  *
886  * Since: 0.10.15
887  */
888 GstClockTimeDiff
889 gst_base_sink_get_ts_offset (GstBaseSink * sink)
890 {
891   GstClockTimeDiff res;
892
893   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
894
895   GST_OBJECT_LOCK (sink);
896   res = sink->priv->ts_offset;
897   GST_OBJECT_UNLOCK (sink);
898
899   return res;
900 }
901
902 /**
903  * gst_base_sink_get_last_buffer:
904  * @sink: the sink
905  *
906  * Get the last buffer that arrived in the sink and was used for preroll or for
907  * rendering. This property can be used to generate thumbnails.
908  *
909  * The #GstCaps on the buffer can be used to determine the type of the buffer.
910  *
911  * Returns: a #GstBuffer. gst_buffer_unref() after usage. This function returns
912  * NULL when no buffer has arrived in the sink yet or when the sink is not in
913  * PAUSED or PLAYING.
914  *
915  * Since: 0.10.15
916  */
917 GstBuffer *
918 gst_base_sink_get_last_buffer (GstBaseSink * sink)
919 {
920   GstBuffer *res;
921
922   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
923
924   GST_OBJECT_LOCK (sink);
925   if ((res = sink->priv->last_buffer))
926     gst_buffer_ref (res);
927   GST_OBJECT_UNLOCK (sink);
928
929   return res;
930 }
931
932 static void
933 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
934 {
935   GstBuffer *old;
936
937   GST_OBJECT_LOCK (sink);
938   old = sink->priv->last_buffer;
939   if (G_LIKELY (old != buffer)) {
940     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
941     if (G_LIKELY (buffer))
942       gst_buffer_ref (buffer);
943     sink->priv->last_buffer = buffer;
944   } else {
945     old = NULL;
946   }
947   GST_OBJECT_UNLOCK (sink);
948
949   /* avoid unreffing with the lock because cleanup code might want to take the
950    * lock too */
951   if (G_LIKELY (old))
952     gst_buffer_unref (old);
953 }
954
955 /**
956  * gst_base_sink_get_latency:
957  * @sink: the sink
958  *
959  * Get the currently configured latency.
960  *
961  * Returns: The configured latency.
962  *
963  * Since: 0.10.12
964  */
965 GstClockTime
966 gst_base_sink_get_latency (GstBaseSink * sink)
967 {
968   GstClockTime res;
969
970   GST_OBJECT_LOCK (sink);
971   res = sink->priv->latency;
972   GST_OBJECT_UNLOCK (sink);
973
974   return res;
975 }
976
977 /**
978  * gst_base_sink_query_latency:
979  * @sink: the sink
980  * @live: if the sink is live
981  * @upstream_live: if an upstream element is live
982  * @min_latency: the min latency of the upstream elements
983  * @max_latency: the max latency of the upstream elements
984  *
985  * Query the sink for the latency parameters. The latency will be queried from
986  * the upstream elements. @live will be TRUE if @sink is configured to
987  * synchronize against the clock. @upstream_live will be TRUE if an upstream
988  * element is live.
989  *
990  * If both @live and @upstream_live are TRUE, the sink will want to compensate
991  * for the latency introduced by the upstream elements by setting the
992  * @min_latency to a strictly possitive value.
993  *
994  * This function is mostly used by subclasses.
995  *
996  * Returns: TRUE if the query succeeded.
997  *
998  * Since: 0.10.12
999  */
1000 gboolean
1001 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
1002     gboolean * upstream_live, GstClockTime * min_latency,
1003     GstClockTime * max_latency)
1004 {
1005   gboolean l, us_live, res, have_latency;
1006   GstClockTime min, max, render_delay;
1007   GstQuery *query;
1008   GstClockTime us_min, us_max;
1009
1010   /* we are live when we sync to the clock */
1011   GST_OBJECT_LOCK (sink);
1012   l = sink->sync;
1013   have_latency = sink->priv->have_latency;
1014   render_delay = sink->priv->render_delay;
1015   GST_OBJECT_UNLOCK (sink);
1016
1017   /* assume no latency */
1018   min = 0;
1019   max = -1;
1020   us_live = FALSE;
1021
1022   if (have_latency) {
1023     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1024     /* we are ready for a latency query this is when we preroll or when we are
1025      * not async. */
1026     query = gst_query_new_latency ();
1027
1028     /* ask the peer for the latency */
1029     if ((res = gst_base_sink_peer_query (sink, query))) {
1030       /* get upstream min and max latency */
1031       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1032
1033       if (us_live) {
1034         /* upstream live, use its latency, subclasses should use these
1035          * values to create the complete latency. */
1036         min = us_min;
1037         max = us_max;
1038       }
1039       if (l) {
1040         /* we need to add the render delay if we are live */
1041         if (min != -1)
1042           min += render_delay;
1043         if (max != -1)
1044           max += render_delay;
1045       }
1046     }
1047     gst_query_unref (query);
1048   } else {
1049     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1050     res = FALSE;
1051   }
1052
1053   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1054   if (!res) {
1055     if (!l) {
1056       res = TRUE;
1057       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1058     } else {
1059       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1060     }
1061   }
1062
1063   if (res) {
1064     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1065         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1066         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1067
1068     if (live)
1069       *live = l;
1070     if (upstream_live)
1071       *upstream_live = us_live;
1072     if (min_latency)
1073       *min_latency = min;
1074     if (max_latency)
1075       *max_latency = max;
1076   }
1077   return res;
1078 }
1079
1080 /**
1081  * gst_base_sink_set_render_delay:
1082  * @sink: a #GstBaseSink
1083  * @delay: the new delay
1084  *
1085  * Set the render delay in @sink to @delay. The render delay is the time
1086  * between actual rendering of a buffer and its synchronisation time. Some
1087  * devices might delay media rendering which can be compensated for with this
1088  * function.
1089  *
1090  * After calling this function, this sink will report additional latency and
1091  * other sinks will adjust their latency to delay the rendering of their media.
1092  *
1093  * This function is usually called by subclasses.
1094  *
1095  * Since: 0.10.21
1096  */
1097 void
1098 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1099 {
1100   GstClockTime old_render_delay;
1101
1102   g_return_if_fail (GST_IS_BASE_SINK (sink));
1103
1104   GST_OBJECT_LOCK (sink);
1105   old_render_delay = sink->priv->render_delay;
1106   sink->priv->render_delay = delay;
1107   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1108       GST_TIME_ARGS (delay));
1109   GST_OBJECT_UNLOCK (sink);
1110
1111   if (delay != old_render_delay) {
1112     GST_DEBUG_OBJECT (sink, "posting latency changed");
1113     gst_element_post_message (GST_ELEMENT_CAST (sink),
1114         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1115   }
1116 }
1117
1118 /**
1119  * gst_base_sink_get_render_delay:
1120  * @sink: a #GstBaseSink
1121  *
1122  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1123  * information about the render delay.
1124  *
1125  * Returns: the render delay of @sink.
1126  *
1127  * Since: 0.10.21
1128  */
1129 GstClockTime
1130 gst_base_sink_get_render_delay (GstBaseSink * sink)
1131 {
1132   GstClockTimeDiff res;
1133
1134   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1135
1136   GST_OBJECT_LOCK (sink);
1137   res = sink->priv->render_delay;
1138   GST_OBJECT_UNLOCK (sink);
1139
1140   return res;
1141 }
1142
1143 /**
1144  * gst_base_sink_set_blocksize:
1145  * @sink: a #GstBaseSink
1146  * @blocksize: the blocksize in bytes
1147  *
1148  * Set the number of bytes that the sink will pull when it is operating in pull
1149  * mode.
1150  *
1151  * Since: 0.10.22
1152  */
1153 void
1154 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1155 {
1156   g_return_if_fail (GST_IS_BASE_SINK (sink));
1157
1158   GST_OBJECT_LOCK (sink);
1159   sink->priv->blocksize = blocksize;
1160   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1161   GST_OBJECT_UNLOCK (sink);
1162 }
1163
1164 /**
1165  * gst_base_sink_get_blocksize:
1166  * @sink: a #GstBaseSink
1167  *
1168  * Get the number of bytes that the sink will pull when it is operating in pull
1169  * mode.
1170  *
1171  * Returns: the number of bytes @sink will pull in pull mode.
1172  *
1173  * Since: 0.10.22
1174  */
1175 guint
1176 gst_base_sink_get_blocksize (GstBaseSink * sink)
1177 {
1178   guint res;
1179
1180   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1181
1182   GST_OBJECT_LOCK (sink);
1183   res = sink->priv->blocksize;
1184   GST_OBJECT_UNLOCK (sink);
1185
1186   return res;
1187 }
1188
1189 static void
1190 gst_base_sink_set_property (GObject * object, guint prop_id,
1191     const GValue * value, GParamSpec * pspec)
1192 {
1193   GstBaseSink *sink = GST_BASE_SINK (object);
1194
1195   switch (prop_id) {
1196     case PROP_PREROLL_QUEUE_LEN:
1197       /* preroll lock necessary to serialize with finish_preroll */
1198       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1199       sink->preroll_queue_max_len = g_value_get_uint (value);
1200       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1201       break;
1202     case PROP_SYNC:
1203       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1204       break;
1205     case PROP_MAX_LATENESS:
1206       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1207       break;
1208     case PROP_QOS:
1209       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1210       break;
1211     case PROP_ASYNC:
1212       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1213       break;
1214     case PROP_TS_OFFSET:
1215       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1216       break;
1217     case PROP_BLOCKSIZE:
1218       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1219       break;
1220     case PROP_RENDER_DELAY:
1221       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1222       break;
1223     default:
1224       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1225       break;
1226   }
1227 }
1228
1229 static void
1230 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1231     GParamSpec * pspec)
1232 {
1233   GstBaseSink *sink = GST_BASE_SINK (object);
1234
1235   switch (prop_id) {
1236     case PROP_PREROLL_QUEUE_LEN:
1237       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1238       g_value_set_uint (value, sink->preroll_queue_max_len);
1239       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1240       break;
1241     case PROP_SYNC:
1242       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1243       break;
1244     case PROP_MAX_LATENESS:
1245       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1246       break;
1247     case PROP_QOS:
1248       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1249       break;
1250     case PROP_ASYNC:
1251       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1252       break;
1253     case PROP_TS_OFFSET:
1254       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1255       break;
1256     case PROP_LAST_BUFFER:
1257       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1258       break;
1259     case PROP_BLOCKSIZE:
1260       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1261       break;
1262     case PROP_RENDER_DELAY:
1263       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1264       break;
1265     default:
1266       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1267       break;
1268   }
1269 }
1270
1271
1272 static GstCaps *
1273 gst_base_sink_get_caps (GstBaseSink * sink)
1274 {
1275   return NULL;
1276 }
1277
1278 static gboolean
1279 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1280 {
1281   return TRUE;
1282 }
1283
1284 static GstFlowReturn
1285 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1286     GstCaps * caps, GstBuffer ** buf)
1287 {
1288   *buf = NULL;
1289   return GST_FLOW_OK;
1290 }
1291
1292 /* with PREROLL_LOCK, STREAM_LOCK */
1293 static void
1294 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1295 {
1296   GstMiniObject *obj;
1297
1298   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1299   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1300     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1301     gst_mini_object_unref (obj);
1302   }
1303   /* we can't have EOS anymore now */
1304   basesink->eos = FALSE;
1305   basesink->priv->received_eos = FALSE;
1306   basesink->have_preroll = FALSE;
1307   basesink->priv->step_unlock = FALSE;
1308   basesink->eos_queued = FALSE;
1309   basesink->preroll_queued = 0;
1310   basesink->buffers_queued = 0;
1311   basesink->events_queued = 0;
1312   /* can't report latency anymore until we preroll again */
1313   if (basesink->priv->async_enabled) {
1314     GST_OBJECT_LOCK (basesink);
1315     basesink->priv->have_latency = FALSE;
1316     GST_OBJECT_UNLOCK (basesink);
1317   }
1318   /* and signal any waiters now */
1319   GST_PAD_PREROLL_SIGNAL (pad);
1320 }
1321
1322 /* with STREAM_LOCK, configures given segment with the event information. */
1323 static void
1324 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1325     GstEvent * event, GstSegment * segment)
1326 {
1327   gboolean update;
1328   gdouble rate, arate;
1329   GstFormat format;
1330   gint64 start;
1331   gint64 stop;
1332   gint64 time;
1333
1334   /* the newsegment event is needed to bring the buffer timestamps to the
1335    * stream time and to drop samples outside of the playback segment. */
1336   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1337       &start, &stop, &time);
1338
1339   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1340    * We protect with the OBJECT_LOCK so that we can use the values to
1341    * safely answer a POSITION query. */
1342   GST_OBJECT_LOCK (basesink);
1343   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1344       stop, time);
1345
1346   if (format == GST_FORMAT_TIME) {
1347     GST_DEBUG_OBJECT (basesink,
1348         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1349         "format GST_FORMAT_TIME, "
1350         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1351         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1352         update, rate, arate, GST_TIME_ARGS (segment->start),
1353         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1354         GST_TIME_ARGS (segment->accum));
1355   } else {
1356     GST_DEBUG_OBJECT (basesink,
1357         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1358         "format %d, "
1359         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1360         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1361         segment->format, segment->start, segment->stop, segment->time,
1362         segment->accum);
1363   }
1364   GST_OBJECT_UNLOCK (basesink);
1365 }
1366
1367 /* with PREROLL_LOCK, STREAM_LOCK */
1368 static gboolean
1369 gst_base_sink_commit_state (GstBaseSink * basesink)
1370 {
1371   /* commit state and proceed to next pending state */
1372   GstState current, next, pending, post_pending;
1373   gboolean post_paused = FALSE;
1374   gboolean post_async_done = FALSE;
1375   gboolean post_playing = FALSE;
1376
1377   /* we are certainly not playing async anymore now */
1378   basesink->playing_async = FALSE;
1379
1380   GST_OBJECT_LOCK (basesink);
1381   current = GST_STATE (basesink);
1382   next = GST_STATE_NEXT (basesink);
1383   pending = GST_STATE_PENDING (basesink);
1384   post_pending = pending;
1385
1386   switch (pending) {
1387     case GST_STATE_PLAYING:
1388     {
1389       GstBaseSinkClass *bclass;
1390       GstStateChangeReturn ret;
1391
1392       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1393
1394       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1395
1396       basesink->need_preroll = FALSE;
1397       post_async_done = TRUE;
1398       basesink->priv->commited = TRUE;
1399       post_playing = TRUE;
1400       /* post PAUSED too when we were READY */
1401       if (current == GST_STATE_READY) {
1402         post_paused = TRUE;
1403       }
1404
1405       /* make sure we notify the subclass of async playing */
1406       if (bclass->async_play) {
1407         GST_WARNING_OBJECT (basesink, "deprecated async_play");
1408         ret = bclass->async_play (basesink);
1409         if (ret == GST_STATE_CHANGE_FAILURE)
1410           goto async_failed;
1411       }
1412       break;
1413     }
1414     case GST_STATE_PAUSED:
1415       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1416       post_paused = TRUE;
1417       post_async_done = TRUE;
1418       basesink->priv->commited = TRUE;
1419       post_pending = GST_STATE_VOID_PENDING;
1420       break;
1421     case GST_STATE_READY:
1422     case GST_STATE_NULL:
1423       goto stopping;
1424     case GST_STATE_VOID_PENDING:
1425       goto nothing_pending;
1426     default:
1427       break;
1428   }
1429
1430   /* we can report latency queries now */
1431   basesink->priv->have_latency = TRUE;
1432
1433   GST_STATE (basesink) = pending;
1434   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1435   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1436   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1437   GST_OBJECT_UNLOCK (basesink);
1438
1439   if (post_paused) {
1440     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1441     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1442         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1443             current, next, post_pending));
1444   }
1445   if (post_async_done) {
1446     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1447     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1448         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1449   }
1450   if (post_playing) {
1451     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1452     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1453         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1454             next, pending, GST_STATE_VOID_PENDING));
1455   }
1456
1457   GST_STATE_BROADCAST (basesink);
1458
1459   return TRUE;
1460
1461 nothing_pending:
1462   {
1463     /* Depending on the state, set our vars. We get in this situation when the
1464      * state change function got a change to update the state vars before the
1465      * streaming thread did. This is fine but we need to make sure that we
1466      * update the need_preroll var since it was TRUE when we got here and might
1467      * become FALSE if we got to PLAYING. */
1468     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1469         gst_element_state_get_name (current));
1470     switch (current) {
1471       case GST_STATE_PLAYING:
1472         basesink->need_preroll = FALSE;
1473         break;
1474       case GST_STATE_PAUSED:
1475         basesink->need_preroll = TRUE;
1476         break;
1477       default:
1478         basesink->need_preroll = FALSE;
1479         basesink->flushing = TRUE;
1480         break;
1481     }
1482     /* we can report latency queries now */
1483     basesink->priv->have_latency = TRUE;
1484     GST_OBJECT_UNLOCK (basesink);
1485     return TRUE;
1486   }
1487 stopping:
1488   {
1489     /* app is going to READY */
1490     GST_DEBUG_OBJECT (basesink, "stopping");
1491     basesink->need_preroll = FALSE;
1492     basesink->flushing = TRUE;
1493     GST_OBJECT_UNLOCK (basesink);
1494     return FALSE;
1495   }
1496 async_failed:
1497   {
1498     GST_DEBUG_OBJECT (basesink, "async commit failed");
1499     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1500     GST_OBJECT_UNLOCK (basesink);
1501     return FALSE;
1502   }
1503 }
1504
1505 static void
1506 start_stepping (GstBaseSink * sink, GstSegment * segment,
1507     GstStepInfo * pending, GstStepInfo * current)
1508 {
1509   gint64 end;
1510   GstMessage *message;
1511
1512   GST_DEBUG_OBJECT (sink, "update pending step");
1513
1514   GST_OBJECT_LOCK (sink);
1515   memcpy (current, pending, sizeof (GstStepInfo));
1516   pending->valid = FALSE;
1517   GST_OBJECT_UNLOCK (sink);
1518
1519   /* post message first */
1520   message =
1521       gst_message_new_step_start (GST_OBJECT (sink), TRUE, current->format,
1522       current->amount, current->rate, current->flush, current->intermediate);
1523   gst_message_set_seqnum (message, current->seqnum);
1524   gst_element_post_message (GST_ELEMENT (sink), message);
1525
1526   /* get the running time of where we paused and remember it */
1527   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1528   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1529
1530   /* set the new rate for the remainder of the segment */
1531   current->start_rate = segment->rate;
1532   segment->rate *= current->rate;
1533   segment->abs_rate = ABS (segment->rate);
1534
1535   /* save values */
1536   if (segment->rate > 0.0)
1537     current->start_stop = segment->stop;
1538   else
1539     current->start_start = segment->start;
1540
1541   if (current->format == GST_FORMAT_TIME) {
1542     end = current->start + current->amount;
1543     if (!current->flush) {
1544       /* update the segment clipping regions for non-flushing seeks */
1545       if (segment->rate > 0.0) {
1546         segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1547         segment->last_stop = segment->stop;
1548       } else {
1549         gint64 position;
1550
1551         position = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1552         segment->time = position;
1553         segment->start = position;
1554         segment->last_stop = position;
1555       }
1556     }
1557   }
1558
1559   GST_DEBUG_OBJECT (sink,
1560       "segment now rate %lf, applied rate %lf, "
1561       "format GST_FORMAT_TIME, "
1562       "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1563       ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1564       segment->rate, segment->applied_rate, GST_TIME_ARGS (segment->start),
1565       GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1566       GST_TIME_ARGS (segment->accum));
1567
1568   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1569       GST_TIME_ARGS (current->start));
1570
1571   if (current->amount == -1) {
1572     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1573     current->valid = FALSE;
1574   } else {
1575     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1576         "rate: %f", current->amount, gst_format_get_name (current->format),
1577         current->rate);
1578   }
1579 }
1580
1581 static void
1582 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1583     GstStepInfo * current, gint64 rstart, gint64 rstop, gboolean eos)
1584 {
1585   gint64 stop, position;
1586   GstMessage *message;
1587
1588   GST_DEBUG_OBJECT (sink, "step complete");
1589
1590   if (segment->rate > 0.0)
1591     stop = rstart;
1592   else
1593     stop = rstop;
1594
1595   GST_DEBUG_OBJECT (sink,
1596       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1597
1598   if (stop == -1)
1599     current->duration = current->position;
1600   else
1601     current->duration = stop - current->start;
1602
1603   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1604       GST_TIME_ARGS (current->duration));
1605
1606   position = current->start + current->duration;
1607
1608   /* now move the segment to the new running time */
1609   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1610
1611   if (current->flush) {
1612     /* and remove the accumulated time we flushed, start time did not change */
1613     segment->accum = current->start;
1614   } else {
1615     /* start time is now the stepped position */
1616     gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1617   }
1618
1619   /* restore the previous rate */
1620   segment->rate = current->start_rate;
1621   segment->abs_rate = ABS (segment->rate);
1622
1623   if (segment->rate > 0.0)
1624     segment->stop = current->start_stop;
1625   else
1626     segment->start = current->start_start;
1627
1628   /* the clip segment is used for position report in paused... */
1629   memcpy (sink->abidata.ABI.clip_segment, segment, sizeof (GstSegment));
1630
1631   /* post the step done when we know the stepped duration in TIME */
1632   message =
1633       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1634       current->amount, current->rate, current->flush, current->intermediate,
1635       current->duration, eos);
1636   gst_message_set_seqnum (message, current->seqnum);
1637   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1638
1639   if (!current->intermediate)
1640     sink->need_preroll = current->need_preroll;
1641
1642   /* and the current step info finished and becomes invalid */
1643   current->valid = FALSE;
1644 }
1645
1646 static gboolean
1647 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1648     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1649     gint64 * rstop)
1650 {
1651   gboolean step_end = FALSE;
1652
1653   /* see if we need to skip this buffer because of stepping */
1654   switch (current->format) {
1655     case GST_FORMAT_TIME:
1656     {
1657       guint64 end;
1658       gint64 first, last;
1659
1660       if (segment->rate > 0.0) {
1661         if (segment->stop == *cstop)
1662           *rstop = *rstart + current->amount;
1663
1664         first = *rstart;
1665         last = *rstop;
1666       } else {
1667         if (segment->start == *cstart)
1668           *rstart = *rstop + current->amount;
1669
1670         first = *rstop;
1671         last = *rstart;
1672       }
1673
1674       end = current->start + current->amount;
1675       current->position = first - current->start;
1676
1677       if (G_UNLIKELY (segment->abs_rate != 1.0))
1678         current->position /= segment->abs_rate;
1679
1680       GST_DEBUG_OBJECT (sink,
1681           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1682           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1683       GST_DEBUG_OBJECT (sink,
1684           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1685           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1686           GST_TIME_ARGS (last - current->start),
1687           GST_TIME_ARGS (current->amount));
1688
1689       if ((current->flush && current->position >= current->amount)
1690           || last >= end) {
1691         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1692         step_end = TRUE;
1693         if (segment->rate > 0.0) {
1694           *rstart = end;
1695           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1696         } else {
1697           *rstop = end;
1698           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1699         }
1700       }
1701       GST_DEBUG_OBJECT (sink,
1702           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1703           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1704       GST_DEBUG_OBJECT (sink,
1705           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1706           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1707       break;
1708     }
1709     case GST_FORMAT_BUFFERS:
1710       GST_DEBUG_OBJECT (sink,
1711           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1712           current->position, current->amount);
1713
1714       if (current->position < current->amount) {
1715         current->position++;
1716       } else {
1717         step_end = TRUE;
1718       }
1719       break;
1720     case GST_FORMAT_DEFAULT:
1721     default:
1722       GST_DEBUG_OBJECT (sink,
1723           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1724           current->position, current->amount);
1725       break;
1726   }
1727   return step_end;
1728 }
1729
1730 /* with STREAM_LOCK, PREROLL_LOCK
1731  *
1732  * Returns TRUE if the object needs synchronisation and takes therefore
1733  * part in prerolling.
1734  *
1735  * rsstart/rsstop contain the start/stop in stream time.
1736  * rrstart/rrstop contain the start/stop in running time.
1737  */
1738 static gboolean
1739 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1740     GstClockTime * rsstart, GstClockTime * rsstop,
1741     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1742     gboolean * stepped, GstSegment * segment, GstStepInfo * step,
1743     gboolean * step_end)
1744 {
1745   GstBaseSinkClass *bclass;
1746   GstBuffer *buffer;
1747   GstClockTime start, stop;     /* raw start/stop timestamps */
1748   gint64 cstart, cstop;         /* clipped raw timestamps */
1749   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1750   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1751   GstFormat format;
1752   GstBaseSinkPrivate *priv;
1753   gboolean eos;
1754
1755   priv = basesink->priv;
1756
1757   /* start with nothing */
1758   start = stop = GST_CLOCK_TIME_NONE;
1759
1760   if (G_UNLIKELY (GST_IS_EVENT (obj))) {
1761     GstEvent *event = GST_EVENT_CAST (obj);
1762
1763     switch (GST_EVENT_TYPE (event)) {
1764         /* EOS event needs syncing */
1765       case GST_EVENT_EOS:
1766       {
1767         if (basesink->segment.rate >= 0.0) {
1768           sstart = sstop = priv->current_sstop;
1769           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1770             /* we have not seen a buffer yet, use the segment values */
1771             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1772                 basesink->segment.format, basesink->segment.stop);
1773           }
1774         } else {
1775           sstart = sstop = priv->current_sstart;
1776           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1777             /* we have not seen a buffer yet, use the segment values */
1778             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1779                 basesink->segment.format, basesink->segment.start);
1780           }
1781         }
1782
1783         rstart = rstop = priv->eos_rtime;
1784         *do_sync = rstart != -1;
1785         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1786             GST_TIME_ARGS (rstart));
1787         /* if we are stepping, we end now */
1788         *step_end = step->valid;
1789         eos = TRUE;
1790         goto eos_done;
1791       }
1792       default:
1793         /* other events do not need syncing */
1794         /* FIXME, maybe NEWSEGMENT might need synchronisation
1795          * since the POSITION query depends on accumulated times and
1796          * we cannot accumulate the current segment before the previous
1797          * one completed.
1798          */
1799         return FALSE;
1800     }
1801   }
1802
1803   eos = FALSE;
1804
1805   /* else do buffer sync code */
1806   buffer = GST_BUFFER_CAST (obj);
1807
1808   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1809
1810   /* just get the times to see if we need syncing, if the start returns -1 we
1811    * don't sync. */
1812   if (bclass->get_times)
1813     bclass->get_times (basesink, buffer, &start, &stop);
1814
1815   if (!GST_CLOCK_TIME_IS_VALID (start)) {
1816     /* we don't need to sync but we still want to get the timestamps for
1817      * tracking the position */
1818     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1819     *do_sync = FALSE;
1820   } else {
1821     *do_sync = TRUE;
1822   }
1823
1824   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1825       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1826       GST_TIME_ARGS (stop), *do_sync);
1827
1828   /* collect segment and format for code clarity */
1829   format = segment->format;
1830
1831   /* no timestamp clipping if we did not get a TIME segment format */
1832   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
1833     cstart = start;
1834     cstop = stop;
1835     /* do running and stream time in TIME format */
1836     format = GST_FORMAT_TIME;
1837     GST_LOG_OBJECT (basesink, "not time format, don't clip");
1838     goto do_times;
1839   }
1840
1841   /* clip, only when we know about time */
1842   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
1843               (gint64) start, (gint64) stop, &cstart, &cstop))) {
1844     if (step->valid) {
1845       GST_DEBUG_OBJECT (basesink, "step out of segment");
1846       /* when we are stepping, pretend we're at the end of the segment */
1847       if (segment->rate > 0.0) {
1848         cstart = segment->stop;
1849         cstop = segment->stop;
1850       } else {
1851         cstart = segment->start;
1852         cstop = segment->start;
1853       }
1854       goto do_times;
1855     }
1856     goto out_of_segment;
1857   }
1858
1859   if (G_UNLIKELY (start != cstart || stop != cstop)) {
1860     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
1861         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
1862         GST_TIME_ARGS (cstop));
1863   }
1864
1865   /* set last stop position */
1866   if (G_LIKELY (cstop != GST_CLOCK_TIME_NONE))
1867     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
1868   else
1869     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
1870
1871 do_times:
1872   rstart = gst_segment_to_running_time (segment, format, cstart);
1873   rstop = gst_segment_to_running_time (segment, format, cstop);
1874
1875   if (G_UNLIKELY (step->valid)) {
1876     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
1877                 &rstart, &rstop))) {
1878       /* step is still busy, we discard data when we are flushing */
1879       *stepped = step->flush;
1880       GST_DEBUG_OBJECT (basesink, "stepping busy");
1881     }
1882   }
1883   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
1884    * upstream is behaving very badly */
1885   sstart = gst_segment_to_stream_time (segment, format, cstart);
1886   sstop = gst_segment_to_stream_time (segment, format, cstop);
1887
1888 eos_done:
1889   /* eos_done label only called when doing EOS, we also stop stepping then */
1890   if (*step_end && step->flush) {
1891     GST_DEBUG_OBJECT (basesink, "flushing step ended");
1892     stop_stepping (basesink, segment, step, rstart, rstop, eos);
1893     *step_end = FALSE;
1894   }
1895
1896   /* save times */
1897   *rsstart = sstart;
1898   *rsstop = sstop;
1899   *rrstart = rstart;
1900   *rrstop = rstop;
1901
1902   /* buffers and EOS always need syncing and preroll */
1903   return TRUE;
1904
1905   /* special cases */
1906 out_of_segment:
1907   {
1908     /* we usually clip in the chain function already but stepping could cause
1909      * the segment to be updated later. we return FALSE so that we don't try
1910      * to sync on it. */
1911     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
1912     return FALSE;
1913   }
1914 }
1915
1916 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
1917  * adjust a timestamp with the latency and timestamp offset */
1918 static GstClockTime
1919 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
1920 {
1921   GstClockTimeDiff ts_offset;
1922
1923   /* don't do anything funny with invalid timestamps */
1924   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1925     return time;
1926
1927   time += basesink->priv->latency;
1928
1929   /* apply offset, be carefull for underflows */
1930   ts_offset = basesink->priv->ts_offset;
1931   if (ts_offset < 0) {
1932     ts_offset = -ts_offset;
1933     if (ts_offset < time)
1934       time -= ts_offset;
1935     else
1936       time = 0;
1937   } else
1938     time += ts_offset;
1939
1940   return time;
1941 }
1942
1943 /**
1944  * gst_base_sink_wait_clock:
1945  * @sink: the sink
1946  * @time: the running_time to be reached
1947  * @jitter: the jitter to be filled with time diff (can be NULL)
1948  *
1949  * This function will block until @time is reached. It is usually called by
1950  * subclasses that use their own internal synchronisation.
1951  *
1952  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
1953  * returned. Likewise, if synchronisation is disabled in the element or there
1954  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
1955  *
1956  * This function should only be called with the PREROLL_LOCK held, like when
1957  * receiving an EOS event in the #GstBaseSinkClass.event() vmethod or when
1958  * receiving a buffer in
1959  * the #GstBaseSinkClass.render() vmethod.
1960  *
1961  * The @time argument should be the running_time of when this method should
1962  * return and is not adjusted with any latency or offset configured in the
1963  * sink.
1964  *
1965  * Since 0.10.20
1966  *
1967  * Returns: #GstClockReturn
1968  */
1969 GstClockReturn
1970 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
1971     GstClockTimeDiff * jitter)
1972 {
1973   GstClockID id;
1974   GstClockReturn ret;
1975   GstClock *clock;
1976   GstClockTime base_time;
1977
1978   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1979     goto invalid_time;
1980
1981   GST_OBJECT_LOCK (sink);
1982   if (G_UNLIKELY (!sink->sync))
1983     goto no_sync;
1984
1985   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
1986     goto no_clock;
1987
1988   base_time = GST_ELEMENT_CAST (sink)->base_time;
1989   GST_LOG_OBJECT (sink,
1990       "time %" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT,
1991       GST_TIME_ARGS (time), GST_TIME_ARGS (base_time));
1992
1993   /* add base_time to running_time to get the time against the clock */
1994   time += base_time;
1995
1996   id = gst_clock_new_single_shot_id (clock, time);
1997   GST_OBJECT_UNLOCK (sink);
1998
1999   /* A blocking wait is performed on the clock. We save the ClockID
2000    * so we can unlock the entry at any time. While we are blocking, we
2001    * release the PREROLL_LOCK so that other threads can interrupt the
2002    * entry. */
2003   sink->clock_id = id;
2004   /* release the preroll lock while waiting */
2005   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
2006
2007   ret = gst_clock_id_wait (id, jitter);
2008
2009   GST_PAD_PREROLL_LOCK (sink->sinkpad);
2010   gst_clock_id_unref (id);
2011   sink->clock_id = NULL;
2012
2013   return ret;
2014
2015   /* no syncing needed */
2016 invalid_time:
2017   {
2018     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
2019     return GST_CLOCK_BADTIME;
2020   }
2021 no_sync:
2022   {
2023     GST_DEBUG_OBJECT (sink, "sync disabled");
2024     GST_OBJECT_UNLOCK (sink);
2025     return GST_CLOCK_BADTIME;
2026   }
2027 no_clock:
2028   {
2029     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
2030     GST_OBJECT_UNLOCK (sink);
2031     return GST_CLOCK_BADTIME;
2032   }
2033 }
2034
2035 /**
2036  * gst_base_sink_wait_preroll:
2037  * @sink: the sink
2038  *
2039  * If the #GstBaseSinkClass.render() method performs its own synchronisation
2040  * against the clock it must unblock when going from PLAYING to the PAUSED state
2041  * and call this method before continuing to render the remaining data.
2042  *
2043  * This function will block until a state change to PLAYING happens (in which
2044  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
2045  * to a state change to READY or a FLUSH event (in which case this function
2046  * returns #GST_FLOW_WRONG_STATE).
2047  *
2048  * This function should only be called with the PREROLL_LOCK held, like in the
2049  * render function.
2050  *
2051  * Since: 0.10.11
2052  *
2053  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2054  * continue. Any other return value should be returned from the render vmethod.
2055  */
2056 GstFlowReturn
2057 gst_base_sink_wait_preroll (GstBaseSink * sink)
2058 {
2059   sink->have_preroll = TRUE;
2060   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
2061   /* block until the state changes, or we get a flush, or something */
2062   GST_PAD_PREROLL_WAIT (sink->sinkpad);
2063   sink->have_preroll = FALSE;
2064   if (G_UNLIKELY (sink->flushing))
2065     goto stopping;
2066   if (G_UNLIKELY (sink->priv->step_unlock))
2067     goto step_unlocked;
2068   GST_DEBUG_OBJECT (sink, "continue after preroll");
2069
2070   return GST_FLOW_OK;
2071
2072   /* ERRORS */
2073 stopping:
2074   {
2075     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
2076     return GST_FLOW_WRONG_STATE;
2077   }
2078 step_unlocked:
2079   {
2080     sink->priv->step_unlock = FALSE;
2081     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
2082     return GST_FLOW_STEP;
2083   }
2084 }
2085
2086 /**
2087  * gst_base_sink_do_preroll:
2088  * @sink: the sink
2089  * @obj: the object that caused the preroll
2090  *
2091  * If the @sink spawns its own thread for pulling buffers from upstream it
2092  * should call this method after it has pulled a buffer. If the element needed
2093  * to preroll, this function will perform the preroll and will then block
2094  * until the element state is changed.
2095  *
2096  * This function should be called with the PREROLL_LOCK held.
2097  *
2098  * Since 0.10.22
2099  *
2100  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2101  * continue. Any other return value should be returned from the render vmethod.
2102  */
2103 GstFlowReturn
2104 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2105 {
2106   GstFlowReturn ret;
2107
2108   while (G_UNLIKELY (sink->need_preroll)) {
2109     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2110
2111     ret = gst_base_sink_preroll_object (sink, FALSE, obj);
2112     if (ret != GST_FLOW_OK)
2113       goto preroll_failed;
2114
2115     /* need to recheck here because the commit state could have
2116      * made us not need the preroll anymore */
2117     if (G_LIKELY (sink->need_preroll)) {
2118       /* block until the state changes, or we get a flush, or something */
2119       ret = gst_base_sink_wait_preroll (sink);
2120       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2121         goto preroll_failed;
2122     }
2123   }
2124   return GST_FLOW_OK;
2125
2126   /* ERRORS */
2127 preroll_failed:
2128   {
2129     GST_DEBUG_OBJECT (sink, "preroll failed %d", ret);
2130     return ret;
2131   }
2132 }
2133
2134 /**
2135  * gst_base_sink_wait_eos:
2136  * @sink: the sink
2137  * @time: the running_time to be reached
2138  * @jitter: the jitter to be filled with time diff (can be NULL)
2139  *
2140  * This function will block until @time is reached. It is usually called by
2141  * subclasses that use their own internal synchronisation but want to let the
2142  * EOS be handled by the base class.
2143  *
2144  * This function should only be called with the PREROLL_LOCK held, like when
2145  * receiving an EOS event in the ::event vmethod.
2146  *
2147  * The @time argument should be the running_time of when the EOS should happen
2148  * and will be adjusted with any latency and offset configured in the sink.
2149  *
2150  * Since 0.10.15
2151  *
2152  * Returns: #GstFlowReturn
2153  */
2154 GstFlowReturn
2155 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2156     GstClockTimeDiff * jitter)
2157 {
2158   GstClockReturn status;
2159   GstFlowReturn ret;
2160
2161   do {
2162     GstClockTime stime;
2163
2164     GST_DEBUG_OBJECT (sink, "checking preroll");
2165
2166     /* first wait for the playing state before we can continue */
2167     if (G_UNLIKELY (sink->need_preroll)) {
2168       ret = gst_base_sink_wait_preroll (sink);
2169       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2170         goto flushing;
2171     }
2172
2173     /* preroll done, we can sync since we are in PLAYING now. */
2174     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2175         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2176
2177     /* compensate for latency and ts_offset. We don't adjust for render delay
2178      * because we don't interact with the device on EOS normally. */
2179     stime = gst_base_sink_adjust_time (sink, time);
2180
2181     /* wait for the clock, this can be interrupted because we got shut down or
2182      * we PAUSED. */
2183     status = gst_base_sink_wait_clock (sink, stime, jitter);
2184
2185     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2186
2187     /* invalid time, no clock or sync disabled, just continue then */
2188     if (status == GST_CLOCK_BADTIME)
2189       break;
2190
2191     /* waiting could have been interrupted and we can be flushing now */
2192     if (G_UNLIKELY (sink->flushing))
2193       goto flushing;
2194
2195     /* retry if we got unscheduled, which means we did not reach the timeout
2196      * yet. if some other error occures, we continue. */
2197   } while (status == GST_CLOCK_UNSCHEDULED);
2198
2199   GST_DEBUG_OBJECT (sink, "end of stream");
2200
2201   return GST_FLOW_OK;
2202
2203   /* ERRORS */
2204 flushing:
2205   {
2206     GST_DEBUG_OBJECT (sink, "we are flushing");
2207     return GST_FLOW_WRONG_STATE;
2208   }
2209 }
2210
2211 /* with STREAM_LOCK, PREROLL_LOCK
2212  *
2213  * Make sure we are in PLAYING and synchronize an object to the clock.
2214  *
2215  * If we need preroll, we are not in PLAYING. We try to commit the state
2216  * if needed and then block if we still are not PLAYING.
2217  *
2218  * We start waiting on the clock in PLAYING. If we got interrupted, we
2219  * immediatly try to re-preroll.
2220  *
2221  * Some objects do not need synchronisation (most events) and so this function
2222  * immediatly returns GST_FLOW_OK.
2223  *
2224  * for objects that arrive later than max-lateness to be synchronized to the
2225  * clock have the @late boolean set to TRUE.
2226  *
2227  * This function keeps a running average of the jitter (the diff between the
2228  * clock time and the requested sync time). The jitter is negative for
2229  * objects that arrive in time and positive for late buffers.
2230  *
2231  * does not take ownership of obj.
2232  */
2233 static GstFlowReturn
2234 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2235     GstMiniObject * obj, gboolean * late, gboolean * step_end)
2236 {
2237   GstClockTimeDiff jitter;
2238   gboolean syncable;
2239   GstClockReturn status = GST_CLOCK_OK;
2240   GstClockTime rstart, rstop, sstart, sstop, stime;
2241   gboolean do_sync;
2242   GstBaseSinkPrivate *priv;
2243   GstFlowReturn ret;
2244   GstStepInfo *current, *pending;
2245   gboolean stepped;
2246
2247   priv = basesink->priv;
2248
2249 do_step:
2250   sstart = sstop = rstart = rstop = GST_CLOCK_TIME_NONE;
2251   do_sync = TRUE;
2252   stepped = FALSE;
2253
2254   priv->current_rstart = GST_CLOCK_TIME_NONE;
2255
2256   /* get stepping info */
2257   current = &priv->current_step;
2258   pending = &priv->pending_step;
2259
2260   /* get timing information for this object against the render segment */
2261   syncable = gst_base_sink_get_sync_times (basesink, obj,
2262       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2263       current, step_end);
2264
2265   if (G_UNLIKELY (stepped))
2266     goto step_skipped;
2267
2268   /* a syncable object needs to participate in preroll and
2269    * clocking. All buffers and EOS are syncable. */
2270   if (G_UNLIKELY (!syncable))
2271     goto not_syncable;
2272
2273   /* store timing info for current object */
2274   priv->current_rstart = rstart;
2275   priv->current_rstop = (GST_CLOCK_TIME_IS_VALID (rstop) ? rstop : rstart);
2276
2277   /* save sync time for eos when the previous object needed sync */
2278   priv->eos_rtime = (do_sync ? priv->current_rstop : GST_CLOCK_TIME_NONE);
2279
2280 again:
2281   /* first do preroll, this makes sure we commit our state
2282    * to PAUSED and can continue to PLAYING. We cannot perform
2283    * any clock sync in PAUSED because there is no clock. */
2284   ret = gst_base_sink_do_preroll (basesink, obj);
2285   if (G_UNLIKELY (ret != GST_FLOW_OK))
2286     goto preroll_failed;
2287
2288   /* update the segment with a pending step if the current one is invalid and we
2289    * have a new pending one. We only accept new step updates after a preroll */
2290   if (G_UNLIKELY (pending->valid && !current->valid)) {
2291     start_stepping (basesink, &basesink->segment, pending, current);
2292     goto do_step;
2293   }
2294
2295   /* After rendering we store the position of the last buffer so that we can use
2296    * it to report the position. We need to take the lock here. */
2297   GST_OBJECT_LOCK (basesink);
2298   priv->current_sstart = sstart;
2299   priv->current_sstop = (GST_CLOCK_TIME_IS_VALID (sstop) ? sstop : sstart);
2300   GST_OBJECT_UNLOCK (basesink);
2301
2302   if (!do_sync)
2303     goto done;
2304
2305   /* adjust for latency */
2306   stime = gst_base_sink_adjust_time (basesink, rstart);
2307
2308   /* adjust for render-delay, avoid underflows */
2309   if (GST_CLOCK_TIME_IS_VALID (stime)) {
2310     if (stime > priv->render_delay)
2311       stime -= priv->render_delay;
2312     else
2313       stime = 0;
2314   }
2315
2316   /* preroll done, we can sync since we are in PLAYING now. */
2317   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2318       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2319       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2320
2321   /* This function will return immediatly if start == -1, no clock
2322    * or sync is disabled with GST_CLOCK_BADTIME. */
2323   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2324
2325   GST_DEBUG_OBJECT (basesink, "clock returned %d", status);
2326
2327   /* invalid time, no clock or sync disabled, just render */
2328   if (status == GST_CLOCK_BADTIME)
2329     goto done;
2330
2331   /* waiting could have been interrupted and we can be flushing now */
2332   if (G_UNLIKELY (basesink->flushing))
2333     goto flushing;
2334
2335   /* check for unlocked by a state change, we are not flushing so
2336    * we can try to preroll on the current buffer. */
2337   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2338     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2339     priv->call_preroll = TRUE;
2340     goto again;
2341   }
2342
2343   /* successful syncing done, record observation */
2344   priv->current_jitter = jitter;
2345
2346   /* check if the object should be dropped */
2347   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2348       status, jitter);
2349
2350 done:
2351   return GST_FLOW_OK;
2352
2353   /* ERRORS */
2354 step_skipped:
2355   {
2356     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2357     *late = TRUE;
2358     return GST_FLOW_OK;
2359   }
2360 not_syncable:
2361   {
2362     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2363     return GST_FLOW_OK;
2364   }
2365 flushing:
2366   {
2367     GST_DEBUG_OBJECT (basesink, "we are flushing");
2368     return GST_FLOW_WRONG_STATE;
2369   }
2370 preroll_failed:
2371   {
2372     GST_DEBUG_OBJECT (basesink, "preroll failed");
2373     *step_end = FALSE;
2374     return ret;
2375   }
2376 }
2377
2378 static gboolean
2379 gst_base_sink_send_qos (GstBaseSink * basesink,
2380     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2381 {
2382   GstEvent *event;
2383   gboolean res;
2384
2385   /* generate Quality-of-Service event */
2386   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2387       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2388       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (time));
2389
2390   event = gst_event_new_qos (proportion, diff, time);
2391
2392   /* send upstream */
2393   res = gst_pad_push_event (basesink->sinkpad, event);
2394
2395   return res;
2396 }
2397
2398 static void
2399 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2400 {
2401   GstBaseSinkPrivate *priv;
2402   GstClockTime start, stop;
2403   GstClockTimeDiff jitter;
2404   GstClockTime pt, entered, left;
2405   GstClockTime duration;
2406   gdouble rate;
2407
2408   priv = sink->priv;
2409
2410   start = priv->current_rstart;
2411
2412   if (priv->current_step.valid)
2413     return;
2414
2415   /* if Quality-of-Service disabled, do nothing */
2416   if (!g_atomic_int_get (&priv->qos_enabled) ||
2417       !GST_CLOCK_TIME_IS_VALID (start))
2418     return;
2419
2420   stop = priv->current_rstop;
2421   jitter = priv->current_jitter;
2422
2423   if (jitter < 0) {
2424     /* this is the time the buffer entered the sink */
2425     if (start < -jitter)
2426       entered = 0;
2427     else
2428       entered = start + jitter;
2429     left = start;
2430   } else {
2431     /* this is the time the buffer entered the sink */
2432     entered = start + jitter;
2433     /* this is the time the buffer left the sink */
2434     left = start + jitter;
2435   }
2436
2437   /* calculate duration of the buffer */
2438   if (GST_CLOCK_TIME_IS_VALID (stop))
2439     duration = stop - start;
2440   else
2441     duration = GST_CLOCK_TIME_NONE;
2442
2443   /* if we have the time when the last buffer left us, calculate
2444    * processing time */
2445   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2446     if (entered > priv->last_left) {
2447       pt = entered - priv->last_left;
2448     } else {
2449       pt = 0;
2450     }
2451   } else {
2452     pt = priv->avg_pt;
2453   }
2454
2455   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2456       ", entered %" GST_TIME_FORMAT ", left %" GST_TIME_FORMAT ", pt: %"
2457       GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT ",jitter %"
2458       G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (entered),
2459       GST_TIME_ARGS (left), GST_TIME_ARGS (pt), GST_TIME_ARGS (duration),
2460       jitter);
2461
2462   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2463       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2464       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2465       priv->avg_rate);
2466
2467   /* collect running averages. for first observations, we copy the
2468    * values */
2469   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_duration))
2470     priv->avg_duration = duration;
2471   else
2472     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2473
2474   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_pt))
2475     priv->avg_pt = pt;
2476   else
2477     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2478
2479   if (priv->avg_duration != 0)
2480     rate =
2481         gst_guint64_to_gdouble (priv->avg_pt) /
2482         gst_guint64_to_gdouble (priv->avg_duration);
2483   else
2484     rate = 0.0;
2485
2486   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2487     if (dropped || priv->avg_rate < 0.0) {
2488       priv->avg_rate = rate;
2489     } else {
2490       if (rate > 1.0)
2491         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2492       else
2493         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2494     }
2495   }
2496
2497   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2498       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2499       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2500       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2501
2502
2503   if (priv->avg_rate >= 0.0) {
2504     /* if we have a valid rate, start sending QoS messages */
2505     if (priv->current_jitter < 0) {
2506       /* make sure we never go below 0 when adding the jitter to the
2507        * timestamp. */
2508       if (priv->current_rstart < -priv->current_jitter)
2509         priv->current_jitter = -priv->current_rstart;
2510     }
2511     gst_base_sink_send_qos (sink, priv->avg_rate, priv->current_rstart,
2512         priv->current_jitter);
2513   }
2514
2515   /* record when this buffer will leave us */
2516   priv->last_left = left;
2517 }
2518
2519 /* reset all qos measuring */
2520 static void
2521 gst_base_sink_reset_qos (GstBaseSink * sink)
2522 {
2523   GstBaseSinkPrivate *priv;
2524
2525   priv = sink->priv;
2526
2527   priv->last_in_time = GST_CLOCK_TIME_NONE;
2528   priv->last_left = GST_CLOCK_TIME_NONE;
2529   priv->avg_duration = GST_CLOCK_TIME_NONE;
2530   priv->avg_pt = GST_CLOCK_TIME_NONE;
2531   priv->avg_rate = -1.0;
2532   priv->avg_render = GST_CLOCK_TIME_NONE;
2533   priv->rendered = 0;
2534   priv->dropped = 0;
2535
2536 }
2537
2538 /* Checks if the object was scheduled too late.
2539  *
2540  * start/stop contain the raw timestamp start and stop values
2541  * of the object.
2542  *
2543  * status and jitter contain the return values from the clock wait.
2544  *
2545  * returns TRUE if the buffer was too late.
2546  */
2547 static gboolean
2548 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2549     GstClockTime start, GstClockTime stop,
2550     GstClockReturn status, GstClockTimeDiff jitter)
2551 {
2552   gboolean late;
2553   gint64 max_lateness;
2554   GstBaseSinkPrivate *priv;
2555
2556   priv = basesink->priv;
2557
2558   late = FALSE;
2559
2560   /* only for objects that were too late */
2561   if (G_LIKELY (status != GST_CLOCK_EARLY))
2562     goto in_time;
2563
2564   max_lateness = basesink->abidata.ABI.max_lateness;
2565
2566   /* check if frame dropping is enabled */
2567   if (max_lateness == -1)
2568     goto no_drop;
2569
2570   /* only check for buffers */
2571   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2572     goto not_buffer;
2573
2574   /* can't do check if we don't have a timestamp */
2575   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (start)))
2576     goto no_timestamp;
2577
2578   /* we can add a valid stop time */
2579   if (GST_CLOCK_TIME_IS_VALID (stop))
2580     max_lateness += stop;
2581   else
2582     max_lateness += start;
2583
2584   /* if the jitter bigger than duration and lateness we are too late */
2585   if ((late = start + jitter > max_lateness)) {
2586     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2587         "buffer is too late %" GST_TIME_FORMAT
2588         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (start + jitter),
2589         GST_TIME_ARGS (max_lateness));
2590     /* !!emergency!!, if we did not receive anything valid for more than a
2591      * second, render it anyway so the user sees something */
2592     if (GST_CLOCK_TIME_IS_VALID (priv->last_in_time) &&
2593         start - priv->last_in_time > GST_SECOND) {
2594       late = FALSE;
2595       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2596           (_("A lot of buffers are being dropped.")),
2597           ("There may be a timestamping problem, or this computer is too slow."));
2598       GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2599           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2600           GST_TIME_ARGS (priv->last_in_time));
2601     }
2602   }
2603
2604 done:
2605   if (!late || !GST_CLOCK_TIME_IS_VALID (priv->last_in_time)) {
2606     priv->last_in_time = start;
2607   }
2608   return late;
2609
2610   /* all is fine */
2611 in_time:
2612   {
2613     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2614     goto done;
2615   }
2616 no_drop:
2617   {
2618     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2619     goto done;
2620   }
2621 not_buffer:
2622   {
2623     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2624     return FALSE;
2625   }
2626 no_timestamp:
2627   {
2628     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2629     return FALSE;
2630   }
2631 }
2632
2633 /* called before and after calling the render vmethod. It keeps track of how
2634  * much time was spent in the render method and is used to check if we are
2635  * flooded */
2636 static void
2637 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2638 {
2639   GstBaseSinkPrivate *priv;
2640
2641   priv = basesink->priv;
2642
2643   if (start) {
2644     priv->start = gst_util_get_timestamp ();
2645   } else {
2646     GstClockTime elapsed;
2647
2648     priv->stop = gst_util_get_timestamp ();
2649
2650     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2651
2652     if (!GST_CLOCK_TIME_IS_VALID (priv->avg_render))
2653       priv->avg_render = elapsed;
2654     else
2655       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2656
2657     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2658         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2659   }
2660 }
2661
2662 /* with STREAM_LOCK, PREROLL_LOCK,
2663  *
2664  * Synchronize the object on the clock and then render it.
2665  *
2666  * takes ownership of obj.
2667  */
2668 static GstFlowReturn
2669 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2670     gboolean is_list, gpointer obj)
2671 {
2672   GstFlowReturn ret;
2673   GstBaseSinkClass *bclass;
2674   gboolean late, step_end;
2675   gpointer sync_obj;
2676
2677   GstBaseSinkPrivate *priv;
2678
2679   priv = basesink->priv;
2680
2681   if (is_list) {
2682     /*
2683      * If buffer list, use the first group buffer within the list
2684      * for syncing
2685      */
2686     sync_obj = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
2687     g_assert (NULL != sync_obj);
2688   } else {
2689     sync_obj = obj;
2690   }
2691
2692 again:
2693   late = FALSE;
2694   step_end = FALSE;
2695
2696   /* synchronize this object, non syncable objects return OK
2697    * immediatly. */
2698   ret = gst_base_sink_do_sync (basesink, pad, sync_obj, &late, &step_end);
2699   if (G_UNLIKELY (ret != GST_FLOW_OK))
2700     goto sync_failed;
2701
2702   /* and now render, event or buffer/buffer list. */
2703   if (G_LIKELY (is_list || GST_IS_BUFFER (obj))) {
2704     /* drop late buffers unconditionally, let's hope it's unlikely */
2705     if (G_UNLIKELY (late))
2706       goto dropped;
2707
2708     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2709
2710     if (G_LIKELY ((is_list && bclass->render_list) ||
2711             (!is_list && bclass->render))) {
2712       gint do_qos;
2713
2714       /* read once, to get same value before and after */
2715       do_qos = g_atomic_int_get (&priv->qos_enabled);
2716
2717       GST_DEBUG_OBJECT (basesink, "rendering object %p", obj);
2718
2719       /* record rendering time for QoS and stats */
2720       if (do_qos)
2721         gst_base_sink_do_render_stats (basesink, TRUE);
2722
2723       if (!is_list) {
2724         GstBuffer *buf;
2725
2726         /* For buffer lists do not set last buffer. Creating buffer
2727          * with meaningful data can be done only with memcpy which will
2728          * significantly affect performance */
2729         buf = GST_BUFFER_CAST (obj);
2730         gst_base_sink_set_last_buffer (basesink, buf);
2731
2732         ret = bclass->render (basesink, buf);
2733       } else {
2734         GstBufferList *buflist;
2735
2736         buflist = GST_BUFFER_LIST_CAST (obj);
2737
2738         ret = bclass->render_list (basesink, buflist);
2739       }
2740
2741       if (do_qos)
2742         gst_base_sink_do_render_stats (basesink, FALSE);
2743
2744       if (ret == GST_FLOW_STEP)
2745         goto again;
2746
2747       if (G_UNLIKELY (basesink->flushing))
2748         goto flushing;
2749
2750       priv->rendered++;
2751     }
2752   } else {
2753     GstEvent *event = GST_EVENT_CAST (obj);
2754     gboolean event_res = TRUE;
2755     GstEventType type;
2756
2757     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2758
2759     type = GST_EVENT_TYPE (event);
2760
2761     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
2762         gst_event_type_get_name (type));
2763
2764     if (bclass->event)
2765       event_res = bclass->event (basesink, event);
2766
2767     /* when we get here we could be flushing again when the event handler calls
2768      * _wait_eos(). We have to ignore this object in that case. */
2769     if (G_UNLIKELY (basesink->flushing))
2770       goto flushing;
2771
2772     if (G_LIKELY (event_res)) {
2773       guint32 seqnum;
2774
2775       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
2776       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
2777
2778       switch (type) {
2779         case GST_EVENT_EOS:
2780         {
2781           GstMessage *message;
2782
2783           /* the EOS event is completely handled so we mark
2784            * ourselves as being in the EOS state. eos is also
2785            * protected by the object lock so we can read it when
2786            * answering the POSITION query. */
2787           GST_OBJECT_LOCK (basesink);
2788           basesink->eos = TRUE;
2789           GST_OBJECT_UNLOCK (basesink);
2790
2791           /* ok, now we can post the message */
2792           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
2793
2794           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
2795           gst_message_set_seqnum (message, seqnum);
2796           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
2797           break;
2798         }
2799         case GST_EVENT_NEWSEGMENT:
2800           /* configure the segment */
2801           gst_base_sink_configure_segment (basesink, pad, event,
2802               &basesink->segment);
2803           break;
2804         case GST_EVENT_SINK_MESSAGE:{
2805           GstMessage *msg = NULL;
2806
2807           gst_event_parse_sink_message (event, &msg);
2808
2809           if (msg)
2810             gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
2811         }
2812         default:
2813           break;
2814       }
2815     }
2816   }
2817
2818 done:
2819   if (step_end) {
2820     /* the step ended, check if we need to activate a new step */
2821     GST_DEBUG_OBJECT (basesink, "step ended");
2822     stop_stepping (basesink, &basesink->segment, &priv->current_step,
2823         priv->current_rstart, priv->current_rstop, basesink->eos);
2824     goto again;
2825   }
2826
2827   gst_base_sink_perform_qos (basesink, late);
2828
2829   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
2830   gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
2831   return ret;
2832
2833   /* ERRORS */
2834 sync_failed:
2835   {
2836     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
2837     goto done;
2838   }
2839 dropped:
2840   {
2841     priv->dropped++;
2842     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
2843     goto done;
2844   }
2845 flushing:
2846   {
2847     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
2848     gst_mini_object_unref (obj);
2849     return GST_FLOW_WRONG_STATE;
2850   }
2851 }
2852
2853 /* with STREAM_LOCK, PREROLL_LOCK
2854  *
2855  * Perform preroll on the given object. For buffers this means
2856  * calling the preroll subclass method.
2857  * If that succeeds, the state will be commited.
2858  *
2859  * function does not take ownership of obj.
2860  */
2861 static GstFlowReturn
2862 gst_base_sink_preroll_object (GstBaseSink * basesink, gboolean is_list,
2863     GstMiniObject * obj)
2864 {
2865   GstFlowReturn ret;
2866
2867   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
2868
2869   /* if it's a buffer, we need to call the preroll method */
2870   if (G_LIKELY (is_list || GST_IS_BUFFER (obj)) && basesink->priv->call_preroll) {
2871     GstBaseSinkClass *bclass;
2872     GstBuffer *buf;
2873     GstClockTime timestamp;
2874
2875     if (is_list) {
2876       buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
2877       g_assert (NULL != buf);
2878     } else {
2879       buf = GST_BUFFER_CAST (obj);
2880     }
2881
2882     timestamp = GST_BUFFER_TIMESTAMP (buf);
2883
2884     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
2885         GST_TIME_ARGS (timestamp));
2886
2887     /*
2888      * For buffer lists do not set last buffer. Creating buffer
2889      * with meaningful data can be done only with memcpy which will
2890      * significantly affect performance
2891      */
2892     if (!is_list) {
2893       gst_base_sink_set_last_buffer (basesink, buf);
2894     }
2895
2896     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2897     if (bclass->preroll)
2898       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
2899         goto preroll_failed;
2900
2901     basesink->priv->call_preroll = FALSE;
2902   }
2903
2904   /* commit state */
2905   if (G_LIKELY (basesink->playing_async)) {
2906     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
2907       goto stopping;
2908   }
2909
2910   return GST_FLOW_OK;
2911
2912   /* ERRORS */
2913 preroll_failed:
2914   {
2915     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
2916     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
2917     return ret;
2918   }
2919 stopping:
2920   {
2921     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
2922     return GST_FLOW_WRONG_STATE;
2923   }
2924 }
2925
2926 /* with STREAM_LOCK, PREROLL_LOCK
2927  *
2928  * Queue an object for rendering.
2929  * The first prerollable object queued will complete the preroll. If the
2930  * preroll queue if filled, we render all the objects in the queue.
2931  *
2932  * This function takes ownership of the object.
2933  */
2934 static GstFlowReturn
2935 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
2936     gboolean is_list, gpointer obj, gboolean prerollable)
2937 {
2938   GstFlowReturn ret = GST_FLOW_OK;
2939   gint length;
2940   GQueue *q;
2941
2942   if (G_UNLIKELY (basesink->need_preroll)) {
2943     if (G_LIKELY (prerollable))
2944       basesink->preroll_queued++;
2945
2946     length = basesink->preroll_queued;
2947
2948     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
2949
2950     /* first prerollable item needs to finish the preroll */
2951     if (length == 1) {
2952       ret = gst_base_sink_preroll_object (basesink, is_list, obj);
2953       if (G_UNLIKELY (ret != GST_FLOW_OK))
2954         goto preroll_failed;
2955     }
2956     /* need to recheck if we need preroll, commmit state during preroll
2957      * could have made us not need more preroll. */
2958     if (G_UNLIKELY (basesink->need_preroll)) {
2959       /* see if we can render now, if we can't add the object to the preroll
2960        * queue. */
2961       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
2962         goto more_preroll;
2963     }
2964   }
2965   /* we can start rendering (or blocking) the queued object
2966    * if any. */
2967   q = basesink->preroll_queue;
2968   while (G_UNLIKELY (!g_queue_is_empty (q))) {
2969     GstMiniObject *o;
2970
2971     o = g_queue_pop_head (q);
2972     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
2973
2974     /* do something with the return value */
2975     ret = gst_base_sink_render_object (basesink, pad, FALSE, o);
2976     if (ret != GST_FLOW_OK)
2977       goto dequeue_failed;
2978   }
2979
2980   /* now render the object */
2981   ret = gst_base_sink_render_object (basesink, pad, is_list, obj);
2982   basesink->preroll_queued = 0;
2983
2984   return ret;
2985
2986   /* special cases */
2987 preroll_failed:
2988   {
2989     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
2990         gst_flow_get_name (ret));
2991     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
2992     return ret;
2993   }
2994 more_preroll:
2995   {
2996     /* add object to the queue and return */
2997     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
2998         length, basesink->preroll_queue_max_len);
2999     g_queue_push_tail (basesink->preroll_queue, obj);
3000     return GST_FLOW_OK;
3001   }
3002 dequeue_failed:
3003   {
3004     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
3005         gst_flow_get_name (ret));
3006     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3007     return ret;
3008   }
3009 }
3010
3011 /* with STREAM_LOCK
3012  *
3013  * This function grabs the PREROLL_LOCK and adds the object to
3014  * the queue.
3015  *
3016  * This function takes ownership of obj.
3017  */
3018 static GstFlowReturn
3019 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
3020     GstMiniObject * obj, gboolean prerollable)
3021 {
3022   GstFlowReturn ret;
3023
3024   GST_PAD_PREROLL_LOCK (pad);
3025   if (G_UNLIKELY (basesink->flushing))
3026     goto flushing;
3027
3028   if (G_UNLIKELY (basesink->priv->received_eos))
3029     goto was_eos;
3030
3031   ret =
3032       gst_base_sink_queue_object_unlocked (basesink, pad, FALSE, obj,
3033       prerollable);
3034   GST_PAD_PREROLL_UNLOCK (pad);
3035
3036   return ret;
3037
3038   /* ERRORS */
3039 flushing:
3040   {
3041     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3042     GST_PAD_PREROLL_UNLOCK (pad);
3043     gst_mini_object_unref (obj);
3044     return GST_FLOW_WRONG_STATE;
3045   }
3046 was_eos:
3047   {
3048     GST_DEBUG_OBJECT (basesink,
3049         "we are EOS, dropping object, return UNEXPECTED");
3050     GST_PAD_PREROLL_UNLOCK (pad);
3051     gst_mini_object_unref (obj);
3052     return GST_FLOW_UNEXPECTED;
3053   }
3054 }
3055
3056 static void
3057 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
3058 {
3059   /* make sure we are not blocked on the clock also clear any pending
3060    * eos state. */
3061   gst_base_sink_set_flushing (basesink, pad, TRUE);
3062
3063   /* we grab the stream lock but that is not needed since setting the
3064    * sink to flushing would make sure no state commit is being done
3065    * anymore */
3066   GST_PAD_STREAM_LOCK (pad);
3067   gst_base_sink_reset_qos (basesink);
3068   if (basesink->priv->async_enabled) {
3069     /* and we need to commit our state again on the next
3070      * prerolled buffer */
3071     basesink->playing_async = TRUE;
3072     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
3073   } else {
3074     basesink->priv->have_latency = TRUE;
3075     basesink->need_preroll = FALSE;
3076   }
3077   gst_base_sink_set_last_buffer (basesink, NULL);
3078   GST_PAD_STREAM_UNLOCK (pad);
3079 }
3080
3081 static void
3082 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
3083 {
3084   /* unset flushing so we can accept new data, this also flushes out any EOS
3085    * event. */
3086   gst_base_sink_set_flushing (basesink, pad, FALSE);
3087
3088   /* for position reporting */
3089   GST_OBJECT_LOCK (basesink);
3090   basesink->priv->current_sstart = GST_CLOCK_TIME_NONE;
3091   basesink->priv->current_sstop = GST_CLOCK_TIME_NONE;
3092   basesink->priv->eos_rtime = GST_CLOCK_TIME_NONE;
3093   basesink->priv->call_preroll = TRUE;
3094   basesink->priv->current_step.valid = FALSE;
3095   basesink->priv->pending_step.valid = FALSE;
3096   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
3097     /* we need new segment info after the flush. */
3098     basesink->have_newsegment = FALSE;
3099     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3100     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
3101   }
3102   GST_OBJECT_UNLOCK (basesink);
3103 }
3104
3105 static gboolean
3106 gst_base_sink_event (GstPad * pad, GstEvent * event)
3107 {
3108   GstBaseSink *basesink;
3109   gboolean result = TRUE;
3110   GstBaseSinkClass *bclass;
3111
3112   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3113
3114   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3115
3116   GST_DEBUG_OBJECT (basesink, "event %p (%s)", event,
3117       GST_EVENT_TYPE_NAME (event));
3118
3119   switch (GST_EVENT_TYPE (event)) {
3120     case GST_EVENT_EOS:
3121     {
3122       GstFlowReturn ret;
3123
3124       GST_PAD_PREROLL_LOCK (pad);
3125       if (G_UNLIKELY (basesink->flushing))
3126         goto flushing;
3127
3128       if (G_UNLIKELY (basesink->priv->received_eos)) {
3129         /* we can't accept anything when we are EOS */
3130         result = FALSE;
3131         gst_event_unref (event);
3132       } else {
3133         /* we set the received EOS flag here so that we can use it when testing if
3134          * we are prerolled and to refuse more buffers. */
3135         basesink->priv->received_eos = TRUE;
3136
3137         /* EOS is a prerollable object, we call the unlocked version because it
3138          * does not check the received_eos flag. */
3139         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
3140             FALSE, GST_MINI_OBJECT_CAST (event), TRUE);
3141         if (G_UNLIKELY (ret != GST_FLOW_OK))
3142           result = FALSE;
3143       }
3144       GST_PAD_PREROLL_UNLOCK (pad);
3145       break;
3146     }
3147     case GST_EVENT_NEWSEGMENT:
3148     {
3149       GstFlowReturn ret;
3150       gboolean update;
3151
3152       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
3153
3154       GST_PAD_PREROLL_LOCK (pad);
3155       if (G_UNLIKELY (basesink->flushing))
3156         goto flushing;
3157
3158       gst_event_parse_new_segment_full (event, &update, NULL, NULL, NULL, NULL,
3159           NULL, NULL);
3160
3161       if (G_UNLIKELY (basesink->priv->received_eos && !update)) {
3162         /* we can't accept anything when we are EOS */
3163         result = FALSE;
3164         gst_event_unref (event);
3165       } else {
3166         /* the new segment is a non prerollable item and does not block anything,
3167          * we need to configure the current clipping segment and insert the event
3168          * in the queue to serialize it with the buffers for rendering. */
3169         gst_base_sink_configure_segment (basesink, pad, event,
3170             basesink->abidata.ABI.clip_segment);
3171
3172         ret =
3173             gst_base_sink_queue_object_unlocked (basesink, pad,
3174             FALSE, GST_MINI_OBJECT_CAST (event), FALSE);
3175         if (G_UNLIKELY (ret != GST_FLOW_OK))
3176           result = FALSE;
3177         else {
3178           GST_OBJECT_LOCK (basesink);
3179           basesink->have_newsegment = TRUE;
3180           GST_OBJECT_UNLOCK (basesink);
3181         }
3182       }
3183       GST_PAD_PREROLL_UNLOCK (pad);
3184       break;
3185     }
3186     case GST_EVENT_FLUSH_START:
3187       if (bclass->event)
3188         bclass->event (basesink, event);
3189
3190       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3191
3192       gst_base_sink_flush_start (basesink, pad);
3193
3194       gst_event_unref (event);
3195       break;
3196     case GST_EVENT_FLUSH_STOP:
3197       if (bclass->event)
3198         bclass->event (basesink, event);
3199
3200       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3201
3202       gst_base_sink_flush_stop (basesink, pad);
3203
3204       gst_event_unref (event);
3205       break;
3206     default:
3207       /* other events are sent to queue or subclass depending on if they
3208        * are serialized. */
3209       if (GST_EVENT_IS_SERIALIZED (event)) {
3210         gst_base_sink_queue_object (basesink, pad,
3211             GST_MINI_OBJECT_CAST (event), FALSE);
3212       } else {
3213         if (bclass->event)
3214           bclass->event (basesink, event);
3215         gst_event_unref (event);
3216       }
3217       break;
3218   }
3219 done:
3220   gst_object_unref (basesink);
3221
3222   return result;
3223
3224   /* ERRORS */
3225 flushing:
3226   {
3227     GST_DEBUG_OBJECT (basesink, "we are flushing");
3228     GST_PAD_PREROLL_UNLOCK (pad);
3229     result = FALSE;
3230     gst_event_unref (event);
3231     goto done;
3232   }
3233 }
3234
3235 /* default implementation to calculate the start and end
3236  * timestamps on a buffer, subclasses can override
3237  */
3238 static void
3239 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3240     GstClockTime * start, GstClockTime * end)
3241 {
3242   GstClockTime timestamp, duration;
3243
3244   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3245   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3246
3247     /* get duration to calculate end time */
3248     duration = GST_BUFFER_DURATION (buffer);
3249     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3250       *end = timestamp + duration;
3251     }
3252     *start = timestamp;
3253   }
3254 }
3255
3256 /* must be called with PREROLL_LOCK */
3257 static gboolean
3258 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3259 {
3260   gboolean is_prerolled, res;
3261
3262   /* we have 2 cases where the PREROLL_LOCK is released:
3263    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3264    *  2) we are syncing on the clock
3265    */
3266   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3267   res = !is_prerolled;
3268
3269   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3270       basesink->have_preroll, basesink->priv->received_eos, res);
3271
3272   return res;
3273 }
3274
3275 /* with STREAM_LOCK, PREROLL_LOCK
3276  *
3277  * Takes a buffer and compare the timestamps with the last segment.
3278  * If the buffer falls outside of the segment boundaries, drop it.
3279  * Else queue the buffer for preroll and rendering.
3280  *
3281  * This function takes ownership of the buffer.
3282  */
3283 static GstFlowReturn
3284 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3285     gboolean is_list, gpointer obj)
3286 {
3287   GstBaseSinkClass *bclass;
3288   GstFlowReturn result;
3289   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3290   GstSegment *clip_segment;
3291   GstBuffer *time_buf;
3292
3293   if (G_UNLIKELY (basesink->flushing))
3294     goto flushing;
3295
3296   if (G_UNLIKELY (basesink->priv->received_eos))
3297     goto was_eos;
3298
3299   if (is_list) {
3300     time_buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3301     g_assert (NULL != time_buf);
3302   } else {
3303     time_buf = GST_BUFFER_CAST (obj);
3304   }
3305
3306   /* for code clarity */
3307   clip_segment = basesink->abidata.ABI.clip_segment;
3308
3309   if (G_UNLIKELY (!basesink->have_newsegment)) {
3310     gboolean sync;
3311
3312     sync = gst_base_sink_get_sync (basesink);
3313     if (sync) {
3314       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3315           (_("Internal data flow problem.")),
3316           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3317     }
3318
3319     /* this means this sink will assume timestamps start from 0 */
3320     GST_OBJECT_LOCK (basesink);
3321     clip_segment->start = 0;
3322     clip_segment->stop = -1;
3323     basesink->segment.start = 0;
3324     basesink->segment.stop = -1;
3325     basesink->have_newsegment = TRUE;
3326     GST_OBJECT_UNLOCK (basesink);
3327   }
3328
3329   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3330
3331   /* check if the buffer needs to be dropped, we first ask the subclass for the
3332    * start and end */
3333   if (bclass->get_times)
3334     bclass->get_times (basesink, time_buf, &start, &end);
3335
3336   if (!GST_CLOCK_TIME_IS_VALID (start)) {
3337     /* if the subclass does not want sync, we use our own values so that we at
3338      * least clip the buffer to the segment */
3339     gst_base_sink_get_times (basesink, time_buf, &start, &end);
3340   }
3341
3342   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3343       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3344
3345   /* a dropped buffer does not participate in anything */
3346   if (GST_CLOCK_TIME_IS_VALID (start) &&
3347       (clip_segment->format == GST_FORMAT_TIME)) {
3348     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3349                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3350       goto out_of_segment;
3351   }
3352
3353   /* now we can process the buffer in the queue, this function takes ownership
3354    * of the buffer */
3355   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3356       is_list, obj, TRUE);
3357   return result;
3358
3359   /* ERRORS */
3360 flushing:
3361   {
3362     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3363     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3364     return GST_FLOW_WRONG_STATE;
3365   }
3366 was_eos:
3367   {
3368     GST_DEBUG_OBJECT (basesink,
3369         "we are EOS, dropping object, return UNEXPECTED");
3370     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3371     return GST_FLOW_UNEXPECTED;
3372   }
3373 out_of_segment:
3374   {
3375     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3376     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3377     return GST_FLOW_OK;
3378   }
3379 }
3380
3381 /* with STREAM_LOCK
3382  */
3383 static GstFlowReturn
3384 gst_base_sink_chain_main (GstBaseSink * basesink, GstPad * pad,
3385     gboolean is_list, gpointer obj)
3386 {
3387   GstFlowReturn result;
3388
3389   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3390     goto wrong_mode;
3391
3392   GST_PAD_PREROLL_LOCK (pad);
3393   result = gst_base_sink_chain_unlocked (basesink, pad, is_list, obj);
3394   GST_PAD_PREROLL_UNLOCK (pad);
3395
3396 done:
3397   return result;
3398
3399   /* ERRORS */
3400 wrong_mode:
3401   {
3402     GST_OBJECT_LOCK (pad);
3403     GST_WARNING_OBJECT (basesink,
3404         "Push on pad %s:%s, but it was not activated in push mode",
3405         GST_DEBUG_PAD_NAME (pad));
3406     GST_OBJECT_UNLOCK (pad);
3407     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3408     /* we don't post an error message this will signal to the peer
3409      * pushing that EOS is reached. */
3410     result = GST_FLOW_UNEXPECTED;
3411     goto done;
3412   }
3413 }
3414
3415 static GstFlowReturn
3416 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3417 {
3418   GstBaseSink *basesink;
3419
3420   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3421
3422   return gst_base_sink_chain_main (basesink, pad, FALSE, buf);
3423 }
3424
3425 static GstFlowReturn
3426 gst_base_sink_chain_list (GstPad * pad, GstBufferList * list)
3427 {
3428   GstBaseSink *basesink;
3429   GstBaseSinkClass *bclass;
3430   GstFlowReturn result;
3431
3432   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3433   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3434
3435   if (G_LIKELY (bclass->render_list)) {
3436     result = gst_base_sink_chain_main (basesink, pad, TRUE, list);
3437   } else {
3438     GstBufferListIterator *it;
3439     GstBuffer *group;
3440
3441     GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
3442
3443     it = gst_buffer_list_iterate (list);
3444
3445     if (gst_buffer_list_iterator_next_group (it)) {
3446       do {
3447         group = gst_buffer_list_iterator_merge_group (it);
3448         if (group == NULL) {
3449           group = gst_buffer_new ();
3450           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3451         } else {
3452           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining group");
3453         }
3454         result = gst_base_sink_chain_main (basesink, pad, FALSE, group);
3455       } while (result == GST_FLOW_OK
3456           && gst_buffer_list_iterator_next_group (it));
3457     } else {
3458       GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3459       result =
3460           gst_base_sink_chain_main (basesink, pad, FALSE, gst_buffer_new ());
3461     }
3462     gst_buffer_list_iterator_free (it);
3463     gst_buffer_list_unref (list);
3464   }
3465   return result;
3466 }
3467
3468
3469 static gboolean
3470 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3471 {
3472   gboolean res = TRUE;
3473
3474   /* update our offset if the start/stop position was updated */
3475   if (segment->format == GST_FORMAT_BYTES) {
3476     segment->time = segment->start;
3477   } else if (segment->start == 0) {
3478     /* seek to start, we can implement a default for this. */
3479     segment->time = 0;
3480   } else {
3481     res = FALSE;
3482     GST_INFO_OBJECT (sink, "Can't do a default seek");
3483   }
3484
3485   return res;
3486 }
3487
3488 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3489
3490 static gboolean
3491 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3492     GstEvent * event, GstSegment * segment)
3493 {
3494   /* By default, we try one of 2 things:
3495    *   - For absolute seek positions, convert the requested position to our
3496    *     configured processing format and place it in the output segment \
3497    *   - For relative seek positions, convert our current (input) values to the
3498    *     seek format, adjust by the relative seek offset and then convert back to
3499    *     the processing format
3500    */
3501   GstSeekType cur_type, stop_type;
3502   gint64 cur, stop;
3503   GstSeekFlags flags;
3504   GstFormat seek_format, dest_format;
3505   gdouble rate;
3506   gboolean update;
3507   gboolean res = TRUE;
3508
3509   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3510       &cur_type, &cur, &stop_type, &stop);
3511   dest_format = segment->format;
3512
3513   if (seek_format == dest_format) {
3514     gst_segment_set_seek (segment, rate, seek_format, flags,
3515         cur_type, cur, stop_type, stop, &update);
3516     return TRUE;
3517   }
3518
3519   if (cur_type != GST_SEEK_TYPE_NONE) {
3520     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3521     res =
3522         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3523         &cur);
3524     cur_type = GST_SEEK_TYPE_SET;
3525   }
3526
3527   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3528     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3529     res =
3530         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3531         &stop);
3532     stop_type = GST_SEEK_TYPE_SET;
3533   }
3534
3535   /* And finally, configure our output segment in the desired format */
3536   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3537       stop_type, stop, &update);
3538
3539   if (!res)
3540     goto no_format;
3541
3542   return res;
3543
3544 no_format:
3545   {
3546     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3547     return FALSE;
3548   }
3549 }
3550
3551 /* perform a seek, only executed in pull mode */
3552 static gboolean
3553 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3554 {
3555   gboolean flush;
3556   gdouble rate;
3557   GstFormat seek_format, dest_format;
3558   GstSeekFlags flags;
3559   GstSeekType cur_type, stop_type;
3560   gboolean seekseg_configured = FALSE;
3561   gint64 cur, stop;
3562   gboolean update, res = TRUE;
3563   GstSegment seeksegment;
3564
3565   dest_format = sink->segment.format;
3566
3567   if (event) {
3568     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3569     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3570         &cur_type, &cur, &stop_type, &stop);
3571
3572     flush = flags & GST_SEEK_FLAG_FLUSH;
3573   } else {
3574     GST_DEBUG_OBJECT (sink, "performing seek without event");
3575     flush = FALSE;
3576   }
3577
3578   if (flush) {
3579     GST_DEBUG_OBJECT (sink, "flushing upstream");
3580     gst_pad_push_event (pad, gst_event_new_flush_start ());
3581     gst_base_sink_flush_start (sink, pad);
3582   } else {
3583     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3584   }
3585
3586   GST_PAD_STREAM_LOCK (pad);
3587
3588   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3589    * copy the current segment info into the temp segment that we can actually
3590    * attempt the seek with. We only update the real segment if the seek suceeds. */
3591   if (!seekseg_configured) {
3592     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3593
3594     /* now configure the final seek segment */
3595     if (event) {
3596       if (sink->segment.format != seek_format) {
3597         /* OK, here's where we give the subclass a chance to convert the relative
3598          * seek into an absolute one in the processing format. We set up any
3599          * absolute seek above, before taking the stream lock. */
3600         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3601                 &seeksegment)) {
3602           GST_DEBUG_OBJECT (sink,
3603               "Preparing the seek failed after flushing. " "Aborting seek");
3604           res = FALSE;
3605         }
3606       } else {
3607         /* The seek format matches our processing format, no need to ask the
3608          * the subclass to configure the segment. */
3609         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3610             cur_type, cur, stop_type, stop, &update);
3611       }
3612     }
3613     /* Else, no seek event passed, so we're just (re)starting the
3614        current segment. */
3615   }
3616
3617   if (res) {
3618     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3619         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3620         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3621
3622     /* do the seek, segment.last_stop contains the new position. */
3623     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3624   }
3625
3626
3627   if (flush) {
3628     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3629     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3630     gst_base_sink_flush_stop (sink, pad);
3631   } else if (res && sink->abidata.ABI.running) {
3632     /* we are running the current segment and doing a non-flushing seek,
3633      * close the segment first based on the last_stop. */
3634     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3635         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3636   }
3637
3638   /* The subclass must have converted the segment to the processing format
3639    * by now */
3640   if (res && seeksegment.format != dest_format) {
3641     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3642         "in the correct format. Aborting seek.");
3643     res = FALSE;
3644   }
3645
3646   /* if successfull seek, we update our real segment and push
3647    * out the new segment. */
3648   if (res) {
3649     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3650
3651     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3652       gst_element_post_message (GST_ELEMENT (sink),
3653           gst_message_new_segment_start (GST_OBJECT (sink),
3654               sink->segment.format, sink->segment.last_stop));
3655     }
3656   }
3657
3658   sink->priv->discont = TRUE;
3659   sink->abidata.ABI.running = TRUE;
3660
3661   GST_PAD_STREAM_UNLOCK (pad);
3662
3663   return res;
3664 }
3665
3666 static void
3667 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
3668     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
3669     gboolean flush, gboolean intermediate)
3670 {
3671   GST_OBJECT_LOCK (sink);
3672   pending->seqnum = seqnum;
3673   pending->format = format;
3674   pending->amount = amount;
3675   pending->position = 0;
3676   pending->rate = rate;
3677   pending->flush = flush;
3678   pending->intermediate = intermediate;
3679   pending->valid = TRUE;
3680   /* flush invalidates the current stepping segment */
3681   if (flush)
3682     current->valid = FALSE;
3683   GST_OBJECT_UNLOCK (sink);
3684 }
3685
3686 static gboolean
3687 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3688 {
3689   GstBaseSinkPrivate *priv;
3690   GstBaseSinkClass *bclass;
3691   gboolean flush, intermediate;
3692   gdouble rate;
3693   GstFormat format;
3694   guint64 amount;
3695   guint seqnum;
3696   GstStepInfo *pending, *current;
3697   GstMessage *message;
3698
3699   bclass = GST_BASE_SINK_GET_CLASS (sink);
3700   priv = sink->priv;
3701
3702   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
3703
3704   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
3705   seqnum = gst_event_get_seqnum (event);
3706
3707   pending = &priv->pending_step;
3708   current = &priv->current_step;
3709
3710   /* post message first */
3711   message = gst_message_new_step_start (GST_OBJECT (sink), FALSE, format,
3712       amount, rate, flush, intermediate);
3713   gst_message_set_seqnum (message, seqnum);
3714   gst_element_post_message (GST_ELEMENT (sink), message);
3715
3716   if (flush) {
3717     /* we need to call ::unlock before locking PREROLL_LOCK
3718      * since we lock it before going into ::render */
3719     if (bclass->unlock)
3720       bclass->unlock (sink);
3721
3722     GST_PAD_PREROLL_LOCK (sink->sinkpad);
3723     /* now that we have the PREROLL lock, clear our unlock request */
3724     if (bclass->unlock_stop)
3725       bclass->unlock_stop (sink);
3726
3727     /* update the stepinfo and make it valid */
3728     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
3729         intermediate);
3730
3731     if (sink->priv->async_enabled) {
3732       /* and we need to commit our state again on the next
3733        * prerolled buffer */
3734       sink->playing_async = TRUE;
3735       priv->pending_step.need_preroll = TRUE;
3736       sink->need_preroll = FALSE;
3737       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
3738     } else {
3739       sink->priv->have_latency = TRUE;
3740       sink->need_preroll = FALSE;
3741     }
3742     priv->current_sstart = GST_CLOCK_TIME_NONE;
3743     priv->current_sstop = GST_CLOCK_TIME_NONE;
3744     priv->eos_rtime = GST_CLOCK_TIME_NONE;
3745     priv->call_preroll = TRUE;
3746     gst_base_sink_set_last_buffer (sink, NULL);
3747     gst_base_sink_reset_qos (sink);
3748
3749     if (sink->clock_id) {
3750       gst_clock_id_unschedule (sink->clock_id);
3751     }
3752
3753     if (sink->have_preroll) {
3754       GST_DEBUG_OBJECT (sink, "signal waiter");
3755       priv->step_unlock = TRUE;
3756       GST_PAD_PREROLL_SIGNAL (sink->sinkpad);
3757     }
3758     GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
3759   } else {
3760     /* update the stepinfo and make it valid */
3761     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
3762         intermediate);
3763   }
3764
3765   return TRUE;
3766 }
3767
3768 /* with STREAM_LOCK
3769  */
3770 static void
3771 gst_base_sink_loop (GstPad * pad)
3772 {
3773   GstBaseSink *basesink;
3774   GstBuffer *buf = NULL;
3775   GstFlowReturn result;
3776   guint blocksize;
3777   guint64 offset;
3778
3779   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3780
3781   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
3782
3783   if ((blocksize = basesink->priv->blocksize) == 0)
3784     blocksize = -1;
3785
3786   offset = basesink->segment.last_stop;
3787
3788   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
3789       offset, blocksize);
3790
3791   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
3792   if (G_UNLIKELY (result != GST_FLOW_OK))
3793     goto paused;
3794
3795   if (G_UNLIKELY (buf == NULL))
3796     goto no_buffer;
3797
3798   offset += GST_BUFFER_SIZE (buf);
3799
3800   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
3801
3802   GST_PAD_PREROLL_LOCK (pad);
3803   result = gst_base_sink_chain_unlocked (basesink, pad, FALSE, buf);
3804   GST_PAD_PREROLL_UNLOCK (pad);
3805   if (G_UNLIKELY (result != GST_FLOW_OK))
3806     goto paused;
3807
3808   return;
3809
3810   /* ERRORS */
3811 paused:
3812   {
3813     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
3814         gst_flow_get_name (result));
3815     gst_pad_pause_task (pad);
3816     /* fatal errors and NOT_LINKED cause EOS */
3817     if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) {
3818       if (result == GST_FLOW_UNEXPECTED) {
3819         /* perform EOS logic */
3820         if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3821           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3822               gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
3823                   basesink->segment.format, basesink->segment.last_stop));
3824         } else {
3825           gst_base_sink_event (pad, gst_event_new_eos ());
3826         }
3827       } else {
3828         /* for fatal errors we post an error message, post the error
3829          * first so the app knows about the error first. */
3830         GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3831             (_("Internal data stream error.")),
3832             ("stream stopped, reason %s", gst_flow_get_name (result)));
3833         gst_base_sink_event (pad, gst_event_new_eos ());
3834       }
3835     }
3836     return;
3837   }
3838 no_buffer:
3839   {
3840     GST_LOG_OBJECT (basesink, "no buffer, pausing");
3841     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3842         (_("Internal data flow error.")), ("element returned NULL buffer"));
3843     result = GST_FLOW_ERROR;
3844     goto paused;
3845   }
3846 }
3847
3848 static gboolean
3849 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
3850     gboolean flushing)
3851 {
3852   GstBaseSinkClass *bclass;
3853
3854   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3855
3856   if (flushing) {
3857     /* unlock any subclasses, we need to do this before grabbing the
3858      * PREROLL_LOCK since we hold this lock before going into ::render. */
3859     if (bclass->unlock)
3860       bclass->unlock (basesink);
3861   }
3862
3863   GST_PAD_PREROLL_LOCK (pad);
3864   basesink->flushing = flushing;
3865   if (flushing) {
3866     /* step 1, now that we have the PREROLL lock, clear our unlock request */
3867     if (bclass->unlock_stop)
3868       bclass->unlock_stop (basesink);
3869
3870     /* set need_preroll before we unblock the clock. If the clock is unblocked
3871      * before timing out, we can reuse the buffer for preroll. */
3872     basesink->need_preroll = TRUE;
3873
3874     /* step 2, unblock clock sync (if any) or any other blocking thing */
3875     if (basesink->clock_id) {
3876       gst_clock_id_unschedule (basesink->clock_id);
3877     }
3878
3879     /* flush out the data thread if it's locked in finish_preroll, this will
3880      * also flush out the EOS state */
3881     GST_DEBUG_OBJECT (basesink,
3882         "flushing out data thread, need preroll to TRUE");
3883     gst_base_sink_preroll_queue_flush (basesink, pad);
3884   }
3885   GST_PAD_PREROLL_UNLOCK (pad);
3886
3887   return TRUE;
3888 }
3889
3890 static gboolean
3891 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
3892 {
3893   gboolean result;
3894
3895   if (active) {
3896     /* start task */
3897     result = gst_pad_start_task (basesink->sinkpad,
3898         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
3899   } else {
3900     /* step 2, make sure streaming finishes */
3901     result = gst_pad_stop_task (basesink->sinkpad);
3902   }
3903
3904   return result;
3905 }
3906
3907 static gboolean
3908 gst_base_sink_pad_activate (GstPad * pad)
3909 {
3910   gboolean result = FALSE;
3911   GstBaseSink *basesink;
3912
3913   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3914
3915   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
3916
3917   gst_base_sink_set_flushing (basesink, pad, FALSE);
3918
3919   /* we need to have the pull mode enabled */
3920   if (!basesink->can_activate_pull) {
3921     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
3922     goto fallback;
3923   }
3924
3925   /* check if downstreams supports pull mode at all */
3926   if (!gst_pad_check_pull_range (pad)) {
3927     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
3928     goto fallback;
3929   }
3930
3931   /* set the pad mode before starting the task so that it's in the
3932    * correct state for the new thread. also the sink set_caps and get_caps
3933    * function checks this */
3934   basesink->pad_mode = GST_ACTIVATE_PULL;
3935
3936   /* we first try to negotiate a format so that when we try to activate
3937    * downstream, it knows about our format */
3938   if (!gst_base_sink_negotiate_pull (basesink)) {
3939     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
3940     goto fallback;
3941   }
3942
3943   /* ok activate now */
3944   if (!gst_pad_activate_pull (pad, TRUE)) {
3945     /* clear any pending caps */
3946     GST_OBJECT_LOCK (basesink);
3947     gst_caps_replace (&basesink->priv->pull_caps, NULL);
3948     GST_OBJECT_UNLOCK (basesink);
3949     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
3950     goto fallback;
3951   }
3952
3953   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
3954   result = TRUE;
3955   goto done;
3956
3957   /* push mode fallback */
3958 fallback:
3959   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
3960   if ((result = gst_pad_activate_push (pad, TRUE))) {
3961     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
3962   }
3963
3964 done:
3965   if (!result) {
3966     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
3967     gst_base_sink_set_flushing (basesink, pad, TRUE);
3968   }
3969
3970   gst_object_unref (basesink);
3971
3972   return result;
3973 }
3974
3975 static gboolean
3976 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
3977 {
3978   gboolean result;
3979   GstBaseSink *basesink;
3980
3981   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3982
3983   if (active) {
3984     if (!basesink->can_activate_push) {
3985       result = FALSE;
3986       basesink->pad_mode = GST_ACTIVATE_NONE;
3987     } else {
3988       result = TRUE;
3989       basesink->pad_mode = GST_ACTIVATE_PUSH;
3990     }
3991   } else {
3992     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
3993       g_warning ("Internal GStreamer activation error!!!");
3994       result = FALSE;
3995     } else {
3996       gst_base_sink_set_flushing (basesink, pad, TRUE);
3997       result = TRUE;
3998       basesink->pad_mode = GST_ACTIVATE_NONE;
3999     }
4000   }
4001
4002   gst_object_unref (basesink);
4003
4004   return result;
4005 }
4006
4007 static gboolean
4008 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
4009 {
4010   GstCaps *caps;
4011   gboolean result;
4012
4013   result = FALSE;
4014
4015   /* this returns the intersection between our caps and the peer caps. If there
4016    * is no peer, it returns NULL and we can't operate in pull mode so we can
4017    * fail the negotiation. */
4018   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
4019   if (caps == NULL || gst_caps_is_empty (caps))
4020     goto no_caps_possible;
4021
4022   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
4023
4024   caps = gst_caps_make_writable (caps);
4025   /* get the first (prefered) format */
4026   gst_caps_truncate (caps);
4027   /* try to fixate */
4028   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
4029
4030   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
4031
4032   if (gst_caps_is_any (caps)) {
4033     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
4034         "allowing pull()");
4035     /* neither side has template caps in this case, so they are prepared for
4036        pull() without setcaps() */
4037     result = TRUE;
4038   } else if (gst_caps_is_fixed (caps)) {
4039     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
4040       goto could_not_set_caps;
4041
4042     GST_OBJECT_LOCK (basesink);
4043     gst_caps_replace (&basesink->priv->pull_caps, caps);
4044     GST_OBJECT_UNLOCK (basesink);
4045
4046     result = TRUE;
4047   }
4048
4049   gst_caps_unref (caps);
4050
4051   return result;
4052
4053 no_caps_possible:
4054   {
4055     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
4056     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
4057     if (caps)
4058       gst_caps_unref (caps);
4059     return FALSE;
4060   }
4061 could_not_set_caps:
4062   {
4063     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
4064     gst_caps_unref (caps);
4065     return FALSE;
4066   }
4067 }
4068
4069 /* this won't get called until we implement an activate function */
4070 static gboolean
4071 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
4072 {
4073   gboolean result = FALSE;
4074   GstBaseSink *basesink;
4075   GstBaseSinkClass *bclass;
4076
4077   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4078   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4079
4080   if (active) {
4081     GstFormat format;
4082     gint64 duration;
4083
4084     /* we mark we have a newsegment here because pull based
4085      * mode works just fine without having a newsegment before the
4086      * first buffer */
4087     format = GST_FORMAT_BYTES;
4088
4089     gst_segment_init (&basesink->segment, format);
4090     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
4091     GST_OBJECT_LOCK (basesink);
4092     basesink->have_newsegment = TRUE;
4093     GST_OBJECT_UNLOCK (basesink);
4094
4095     /* get the peer duration in bytes */
4096     result = gst_pad_query_peer_duration (pad, &format, &duration);
4097     if (result) {
4098       GST_DEBUG_OBJECT (basesink,
4099           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
4100       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
4101           duration);
4102       gst_segment_set_duration (&basesink->segment, format, duration);
4103     } else {
4104       GST_DEBUG_OBJECT (basesink, "unknown duration");
4105     }
4106
4107     if (bclass->activate_pull)
4108       result = bclass->activate_pull (basesink, TRUE);
4109     else
4110       result = FALSE;
4111
4112     if (!result)
4113       goto activate_failed;
4114
4115   } else {
4116     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
4117       g_warning ("Internal GStreamer activation error!!!");
4118       result = FALSE;
4119     } else {
4120       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
4121       if (bclass->activate_pull)
4122         result &= bclass->activate_pull (basesink, FALSE);
4123       basesink->pad_mode = GST_ACTIVATE_NONE;
4124       /* clear any pending caps */
4125       GST_OBJECT_LOCK (basesink);
4126       gst_caps_replace (&basesink->priv->pull_caps, NULL);
4127       GST_OBJECT_UNLOCK (basesink);
4128     }
4129   }
4130   gst_object_unref (basesink);
4131
4132   return result;
4133
4134   /* ERRORS */
4135 activate_failed:
4136   {
4137     /* reset, as starting the thread failed */
4138     basesink->pad_mode = GST_ACTIVATE_NONE;
4139
4140     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
4141     return FALSE;
4142   }
4143 }
4144
4145 /* send an event to our sinkpad peer. */
4146 static gboolean
4147 gst_base_sink_send_event (GstElement * element, GstEvent * event)
4148 {
4149   GstPad *pad;
4150   GstBaseSink *basesink = GST_BASE_SINK (element);
4151   gboolean forward, result = TRUE;
4152   GstActivateMode mode;
4153
4154   GST_OBJECT_LOCK (element);
4155   /* get the pad and the scheduling mode */
4156   pad = gst_object_ref (basesink->sinkpad);
4157   mode = basesink->pad_mode;
4158   GST_OBJECT_UNLOCK (element);
4159
4160   /* only push UPSTREAM events upstream */
4161   forward = GST_EVENT_IS_UPSTREAM (event);
4162
4163   switch (GST_EVENT_TYPE (event)) {
4164     case GST_EVENT_LATENCY:
4165     {
4166       GstClockTime latency;
4167
4168       gst_event_parse_latency (event, &latency);
4169
4170       /* store the latency. We use this to adjust the running_time before syncing
4171        * it to the clock. */
4172       GST_OBJECT_LOCK (element);
4173       basesink->priv->latency = latency;
4174       if (!basesink->priv->have_latency)
4175         forward = FALSE;
4176       GST_OBJECT_UNLOCK (element);
4177       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
4178           GST_TIME_ARGS (latency));
4179
4180       /* We forward this event so that all elements know about the global pipeline
4181        * latency. This is interesting for an element when it wants to figure out
4182        * when a particular piece of data will be rendered. */
4183       break;
4184     }
4185     case GST_EVENT_SEEK:
4186       /* in pull mode we will execute the seek */
4187       if (mode == GST_ACTIVATE_PULL)
4188         result = gst_base_sink_perform_seek (basesink, pad, event);
4189       break;
4190     case GST_EVENT_STEP:
4191       result = gst_base_sink_perform_step (basesink, pad, event);
4192       forward = FALSE;
4193       break;
4194     default:
4195       break;
4196   }
4197
4198   if (forward) {
4199     result = gst_pad_push_event (pad, event);
4200   } else {
4201     /* not forwarded, unref the event */
4202     gst_event_unref (event);
4203   }
4204
4205   gst_object_unref (pad);
4206   return result;
4207 }
4208
4209 static gboolean
4210 gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query)
4211 {
4212   GstPad *peer;
4213   gboolean res = FALSE;
4214
4215   if ((peer = gst_pad_get_peer (sink->sinkpad))) {
4216     res = gst_pad_query (peer, query);
4217     gst_object_unref (peer);
4218   }
4219   return res;
4220 }
4221
4222 /* get the end position of the last seen object, this is used
4223  * for EOS and for making sure that we don't report a position we
4224  * have not reached yet. With LOCK. */
4225 static gboolean
4226 gst_base_sink_get_position_last (GstBaseSink * basesink, GstFormat format,
4227     gint64 * cur, gboolean * upstream)
4228 {
4229   GstFormat oformat;
4230   GstSegment *segment;
4231   gboolean ret = TRUE;
4232
4233   segment = &basesink->segment;
4234   oformat = segment->format;
4235
4236   if (oformat == GST_FORMAT_TIME) {
4237     /* return last observed stream time, we keep the stream time around in the
4238      * time format. */
4239     *cur = basesink->priv->current_sstop;
4240   } else {
4241     /* convert last stop to stream time */
4242     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4243   }
4244
4245   if (*cur != -1 && oformat != format) {
4246     GST_OBJECT_UNLOCK (basesink);
4247     /* convert to the target format if we need to, release lock first */
4248     ret =
4249         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
4250     if (!ret) {
4251       *cur = -1;
4252       *upstream = TRUE;
4253     }
4254     GST_OBJECT_LOCK (basesink);
4255   }
4256
4257   GST_DEBUG_OBJECT (basesink, "POSITION: %" GST_TIME_FORMAT,
4258       GST_TIME_ARGS (*cur));
4259
4260   return ret;
4261 }
4262
4263 /* get the position when we are PAUSED, this is the stream time of the buffer
4264  * that prerolled. If no buffer is prerolled (we are still flushing), this
4265  * value will be -1. With LOCK. */
4266 static gboolean
4267 gst_base_sink_get_position_paused (GstBaseSink * basesink, GstFormat format,
4268     gint64 * cur, gboolean * upstream)
4269 {
4270   gboolean res;
4271   gint64 time;
4272   GstSegment *segment;
4273   GstFormat oformat;
4274
4275   /* we don't use the clip segment in pull mode, when seeking we update the
4276    * main segment directly with the new segment values without it having to be
4277    * activated by the rendering after preroll */
4278   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4279     segment = basesink->abidata.ABI.clip_segment;
4280   else
4281     segment = &basesink->segment;
4282   oformat = segment->format;
4283
4284   if (oformat == GST_FORMAT_TIME) {
4285     *cur = basesink->priv->current_sstart;
4286     if (segment->rate < 0.0 &&
4287         GST_CLOCK_TIME_IS_VALID (basesink->priv->current_sstop)) {
4288       /* for reverse playback we prefer the stream time stop position if we have
4289        * one */
4290       *cur = basesink->priv->current_sstop;
4291     }
4292   } else {
4293     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4294   }
4295
4296   time = segment->time;
4297
4298   if (*cur != -1) {
4299     *cur = MAX (*cur, time);
4300     GST_DEBUG_OBJECT (basesink, "POSITION as max: %" GST_TIME_FORMAT
4301         ", time %" GST_TIME_FORMAT, GST_TIME_ARGS (*cur), GST_TIME_ARGS (time));
4302   } else {
4303     /* we have no buffer, use the segment times. */
4304     if (segment->rate >= 0.0) {
4305       /* forward, next position is always the time of the segment */
4306       *cur = time;
4307       GST_DEBUG_OBJECT (basesink, "POSITION as time: %" GST_TIME_FORMAT,
4308           GST_TIME_ARGS (*cur));
4309     } else {
4310       /* reverse, next expected timestamp is segment->stop. We use the function
4311        * to get things right for negative applied_rates. */
4312       *cur = gst_segment_to_stream_time (segment, oformat, segment->stop);
4313       GST_DEBUG_OBJECT (basesink, "reverse POSITION: %" GST_TIME_FORMAT,
4314           GST_TIME_ARGS (*cur));
4315     }
4316   }
4317
4318   res = (*cur != -1);
4319   if (res && oformat != format) {
4320     GST_OBJECT_UNLOCK (basesink);
4321     res =
4322         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
4323     if (!res) {
4324       *cur = -1;
4325       *upstream = TRUE;
4326     }
4327     GST_OBJECT_LOCK (basesink);
4328   }
4329
4330   return res;
4331 }
4332
4333 static gboolean
4334 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4335     gint64 * cur, gboolean * upstream)
4336 {
4337   GstClock *clock;
4338   gboolean res = FALSE;
4339   GstFormat oformat, tformat;
4340   GstClockTime now, latency;
4341   GstClockTimeDiff base;
4342   gint64 time, accum, duration;
4343   gdouble rate;
4344   gint64 last;
4345
4346   GST_OBJECT_LOCK (basesink);
4347   /* our intermediate time format */
4348   tformat = GST_FORMAT_TIME;
4349   /* get the format in the segment */
4350   oformat = basesink->segment.format;
4351
4352   /* can only give answer based on the clock if not EOS */
4353   if (G_UNLIKELY (basesink->eos))
4354     goto in_eos;
4355
4356   /* we can only get the segment when we are not NULL or READY */
4357   if (!basesink->have_newsegment)
4358     goto wrong_state;
4359
4360   /* when not in PLAYING or when we're busy with a state change, we
4361    * cannot read from the clock so we report time based on the
4362    * last seen timestamp. */
4363   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4364       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING)
4365     goto in_pause;
4366
4367   /* we need to sync on the clock. */
4368   if (basesink->sync == FALSE)
4369     goto no_sync;
4370
4371   /* and we need a clock */
4372   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4373     goto no_sync;
4374
4375   /* collect all data we need holding the lock */
4376   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time))
4377     time = basesink->segment.time;
4378   else
4379     time = 0;
4380
4381   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.stop))
4382     duration = basesink->segment.stop - basesink->segment.start;
4383   else
4384     duration = 0;
4385
4386   base = GST_ELEMENT_CAST (basesink)->base_time;
4387   accum = basesink->segment.accum;
4388   rate = basesink->segment.rate * basesink->segment.applied_rate;
4389   latency = basesink->priv->latency;
4390
4391   gst_object_ref (clock);
4392
4393   /* this function might release the LOCK */
4394   gst_base_sink_get_position_last (basesink, format, &last, upstream);
4395
4396   /* need to release the object lock before we can get the time,
4397    * a clock might take the LOCK of the provider, which could be
4398    * a basesink subclass. */
4399   GST_OBJECT_UNLOCK (basesink);
4400
4401   now = gst_clock_get_time (clock);
4402
4403   if (oformat != tformat) {
4404     /* convert accum, time and duration to time */
4405     if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4406             &accum))
4407       goto convert_failed;
4408     if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration, &tformat,
4409             &duration))
4410       goto convert_failed;
4411     if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4412             &time))
4413       goto convert_failed;
4414   }
4415
4416   /* subtract base time and accumulated time from the clock time.
4417    * Make sure we don't go negative. This is the current time in
4418    * the segment which we need to scale with the combined
4419    * rate and applied rate. */
4420   base += accum;
4421   base += latency;
4422   if (GST_CLOCK_DIFF (base, now) < 0)
4423     base = now;
4424
4425   /* for negative rates we need to count back from the segment
4426    * duration. */
4427   if (rate < 0.0)
4428     time += duration;
4429
4430   *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4431
4432   /* never report more than last seen position */
4433   if (last != -1)
4434     *cur = MIN (last, *cur);
4435
4436   gst_object_unref (clock);
4437
4438   GST_DEBUG_OBJECT (basesink,
4439       "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4440       GST_TIME_FORMAT " + time %" GST_TIME_FORMAT,
4441       GST_TIME_ARGS (now), GST_TIME_ARGS (base),
4442       GST_TIME_ARGS (accum), GST_TIME_ARGS (time));
4443
4444   if (oformat != format) {
4445     /* convert time to final format */
4446     if (!gst_pad_query_convert (basesink->sinkpad, tformat, *cur, &format, cur))
4447       goto convert_failed;
4448   }
4449
4450   res = TRUE;
4451
4452 done:
4453   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4454       res, GST_TIME_ARGS (*cur));
4455   return res;
4456
4457   /* special cases */
4458 in_eos:
4459   {
4460     GST_DEBUG_OBJECT (basesink, "position in EOS");
4461     res = gst_base_sink_get_position_last (basesink, format, cur, upstream);
4462     GST_OBJECT_UNLOCK (basesink);
4463     goto done;
4464   }
4465 in_pause:
4466   {
4467     GST_DEBUG_OBJECT (basesink, "position in PAUSED");
4468     res = gst_base_sink_get_position_paused (basesink, format, cur, upstream);
4469     GST_OBJECT_UNLOCK (basesink);
4470     goto done;
4471   }
4472 wrong_state:
4473   {
4474     /* in NULL or READY we always return FALSE and -1 */
4475     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4476     res = FALSE;
4477     *cur = -1;
4478     GST_OBJECT_UNLOCK (basesink);
4479     goto done;
4480   }
4481 no_sync:
4482   {
4483     /* report last seen timestamp if any, else ask upstream to answer */
4484     if ((*cur = basesink->priv->current_sstart) != -1)
4485       res = TRUE;
4486     else
4487       *upstream = TRUE;
4488
4489     GST_DEBUG_OBJECT (basesink, "no sync, res %d, POSITION %" GST_TIME_FORMAT,
4490         res, GST_TIME_ARGS (*cur));
4491     GST_OBJECT_UNLOCK (basesink);
4492     return res;
4493   }
4494 convert_failed:
4495   {
4496     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4497     *upstream = TRUE;
4498     return FALSE;
4499   }
4500 }
4501
4502 static gboolean
4503 gst_base_sink_query (GstElement * element, GstQuery * query)
4504 {
4505   gboolean res = FALSE;
4506
4507   GstBaseSink *basesink = GST_BASE_SINK (element);
4508
4509   switch (GST_QUERY_TYPE (query)) {
4510     case GST_QUERY_POSITION:
4511     {
4512       gint64 cur = 0;
4513       GstFormat format;
4514       gboolean upstream = FALSE;
4515
4516       gst_query_parse_position (query, &format, NULL);
4517
4518       GST_DEBUG_OBJECT (basesink, "position format %d", format);
4519
4520       /* first try to get the position based on the clock */
4521       if ((res =
4522               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4523         gst_query_set_position (query, format, cur);
4524       } else if (upstream) {
4525         /* fallback to peer query */
4526         res = gst_base_sink_peer_query (basesink, query);
4527       }
4528       break;
4529     }
4530     case GST_QUERY_DURATION:
4531     {
4532       GstFormat format, uformat;
4533       gint64 duration, uduration;
4534
4535       gst_query_parse_duration (query, &format, NULL);
4536
4537       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4538           gst_format_get_name (format));
4539
4540       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4541         uformat = GST_FORMAT_BYTES;
4542
4543         /* get the duration in bytes, in pull mode that's all we are sure to
4544          * know. We have to explicitly get this value from upstream instead of
4545          * using our cached value because it might change. Duration caching
4546          * should be done at a higher level. */
4547         res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4548             &uduration);
4549         if (res) {
4550           gst_segment_set_duration (&basesink->segment, uformat, uduration);
4551           if (format != uformat) {
4552             /* convert to the requested format */
4553             res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4554                 &format, &duration);
4555           } else {
4556             duration = uduration;
4557           }
4558           if (res) {
4559             /* set the result */
4560             gst_query_set_duration (query, format, duration);
4561           }
4562         }
4563       } else {
4564         /* in push mode we simply forward upstream */
4565         res = gst_base_sink_peer_query (basesink, query);
4566       }
4567       break;
4568     }
4569     case GST_QUERY_LATENCY:
4570     {
4571       gboolean live, us_live;
4572       GstClockTime min, max;
4573
4574       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4575                   &max))) {
4576         gst_query_set_latency (query, live, min, max);
4577       }
4578       break;
4579     }
4580     case GST_QUERY_JITTER:
4581       break;
4582     case GST_QUERY_RATE:
4583       /* gst_query_set_rate (query, basesink->segment_rate); */
4584       res = TRUE;
4585       break;
4586     case GST_QUERY_SEGMENT:
4587     {
4588       /* FIXME, bring start/stop to stream time */
4589       gst_query_set_segment (query, basesink->segment.rate,
4590           GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4591       break;
4592     }
4593     case GST_QUERY_SEEKING:
4594     case GST_QUERY_CONVERT:
4595     case GST_QUERY_FORMATS:
4596     default:
4597       res = gst_base_sink_peer_query (basesink, query);
4598       break;
4599   }
4600   return res;
4601 }
4602
4603 static GstStateChangeReturn
4604 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4605 {
4606   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4607   GstBaseSink *basesink = GST_BASE_SINK (element);
4608   GstBaseSinkClass *bclass;
4609   GstBaseSinkPrivate *priv;
4610
4611   priv = basesink->priv;
4612
4613   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4614
4615   switch (transition) {
4616     case GST_STATE_CHANGE_NULL_TO_READY:
4617       if (bclass->start)
4618         if (!bclass->start (basesink))
4619           goto start_failed;
4620       break;
4621     case GST_STATE_CHANGE_READY_TO_PAUSED:
4622       /* need to complete preroll before this state change completes, there
4623        * is no data flow in READY so we can safely assume we need to preroll. */
4624       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4625       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4626       basesink->have_newsegment = FALSE;
4627       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4628       gst_segment_init (basesink->abidata.ABI.clip_segment,
4629           GST_FORMAT_UNDEFINED);
4630       basesink->offset = 0;
4631       basesink->have_preroll = FALSE;
4632       priv->step_unlock = FALSE;
4633       basesink->need_preroll = TRUE;
4634       basesink->playing_async = TRUE;
4635       priv->current_sstart = GST_CLOCK_TIME_NONE;
4636       priv->current_sstop = GST_CLOCK_TIME_NONE;
4637       priv->eos_rtime = GST_CLOCK_TIME_NONE;
4638       priv->latency = 0;
4639       basesink->eos = FALSE;
4640       priv->received_eos = FALSE;
4641       gst_base_sink_reset_qos (basesink);
4642       priv->commited = FALSE;
4643       priv->call_preroll = TRUE;
4644       priv->current_step.valid = FALSE;
4645       priv->pending_step.valid = FALSE;
4646       if (priv->async_enabled) {
4647         GST_DEBUG_OBJECT (basesink, "doing async state change");
4648         /* when async enabled, post async-start message and return ASYNC from
4649          * the state change function */
4650         ret = GST_STATE_CHANGE_ASYNC;
4651         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4652             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4653       } else {
4654         priv->have_latency = TRUE;
4655       }
4656       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4657       break;
4658     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4659       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4660       if (!gst_base_sink_needs_preroll (basesink)) {
4661         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4662         /* no preroll needed anymore now. */
4663         basesink->playing_async = FALSE;
4664         basesink->need_preroll = FALSE;
4665         if (basesink->eos) {
4666           GstMessage *message;
4667
4668           /* need to post EOS message here */
4669           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4670           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
4671           gst_message_set_seqnum (message, basesink->priv->seqnum);
4672           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
4673         } else {
4674           GST_DEBUG_OBJECT (basesink, "signal preroll");
4675           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
4676         }
4677       } else {
4678         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
4679         basesink->need_preroll = TRUE;
4680         basesink->playing_async = TRUE;
4681         priv->call_preroll = TRUE;
4682         priv->commited = FALSE;
4683         if (priv->async_enabled) {
4684           GST_DEBUG_OBJECT (basesink, "doing async state change");
4685           ret = GST_STATE_CHANGE_ASYNC;
4686           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4687               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4688         }
4689       }
4690       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4691       break;
4692     default:
4693       break;
4694   }
4695
4696   {
4697     GstStateChangeReturn bret;
4698
4699     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4700     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
4701       goto activate_failed;
4702   }
4703
4704   switch (transition) {
4705     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4706       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
4707       /* FIXME, make sure we cannot enter _render first */
4708
4709       /* we need to call ::unlock before locking PREROLL_LOCK
4710        * since we lock it before going into ::render */
4711       if (bclass->unlock)
4712         bclass->unlock (basesink);
4713
4714       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4715       GST_DEBUG_OBJECT (basesink, "got preroll lock");
4716       /* now that we have the PREROLL lock, clear our unlock request */
4717       if (bclass->unlock_stop)
4718         bclass->unlock_stop (basesink);
4719
4720       /* we need preroll again and we set the flag before unlocking the clockid
4721        * because if the clockid is unlocked before a current buffer expired, we
4722        * can use that buffer to preroll with */
4723       basesink->need_preroll = TRUE;
4724
4725       if (basesink->clock_id) {
4726         GST_DEBUG_OBJECT (basesink, "unschedule clock");
4727         gst_clock_id_unschedule (basesink->clock_id);
4728       }
4729
4730       /* if we don't have a preroll buffer we need to wait for a preroll and
4731        * return ASYNC. */
4732       if (!gst_base_sink_needs_preroll (basesink)) {
4733         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
4734         basesink->playing_async = FALSE;
4735       } else {
4736         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
4737           GST_DEBUG_OBJECT (basesink, "element is <= READY");
4738           ret = GST_STATE_CHANGE_SUCCESS;
4739         } else {
4740           GST_DEBUG_OBJECT (basesink,
4741               "PLAYING to PAUSED, we are not prerolled");
4742           basesink->playing_async = TRUE;
4743           priv->commited = FALSE;
4744           priv->call_preroll = TRUE;
4745           if (priv->async_enabled) {
4746             GST_DEBUG_OBJECT (basesink, "doing async state change");
4747             ret = GST_STATE_CHANGE_ASYNC;
4748             gst_element_post_message (GST_ELEMENT_CAST (basesink),
4749                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
4750                     FALSE));
4751           }
4752         }
4753       }
4754       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
4755           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
4756
4757       gst_base_sink_reset_qos (basesink);
4758       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4759       break;
4760     case GST_STATE_CHANGE_PAUSED_TO_READY:
4761       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4762       /* start by reseting our position state with the object lock so that the
4763        * position query gets the right idea. We do this before we post the
4764        * messages so that the message handlers pick this up. */
4765       GST_OBJECT_LOCK (basesink);
4766       basesink->have_newsegment = FALSE;
4767       priv->current_sstart = GST_CLOCK_TIME_NONE;
4768       priv->current_sstop = GST_CLOCK_TIME_NONE;
4769       priv->have_latency = FALSE;
4770       GST_OBJECT_UNLOCK (basesink);
4771
4772       gst_base_sink_set_last_buffer (basesink, NULL);
4773       priv->call_preroll = FALSE;
4774
4775       if (!priv->commited) {
4776         if (priv->async_enabled) {
4777           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
4778
4779           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4780               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
4781                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
4782
4783           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4784               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
4785         }
4786         priv->commited = TRUE;
4787       } else {
4788         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
4789       }
4790       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4791       break;
4792     case GST_STATE_CHANGE_READY_TO_NULL:
4793       if (bclass->stop) {
4794         if (!bclass->stop (basesink)) {
4795           GST_WARNING_OBJECT (basesink, "failed to stop");
4796         }
4797       }
4798       gst_base_sink_set_last_buffer (basesink, NULL);
4799       priv->call_preroll = FALSE;
4800       break;
4801     default:
4802       break;
4803   }
4804
4805   return ret;
4806
4807   /* ERRORS */
4808 start_failed:
4809   {
4810     GST_DEBUG_OBJECT (basesink, "failed to start");
4811     return GST_STATE_CHANGE_FAILURE;
4812   }
4813 activate_failed:
4814   {
4815     GST_DEBUG_OBJECT (basesink,
4816         "element failed to change states -- activation problem?");
4817     return GST_STATE_CHANGE_FAILURE;
4818   }
4819 }