libs/gst/base/gstbasesink.c: Improve position reporting while flushing and other...
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSource
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * <programlisting>
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *   
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * </programlisting>
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSink::preroll vmethod with this preroll buffer and will then commit
59  * the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from ::get_times. If this function returns
63  * #GST_CLOCK_TIME_NONE for the start time, no synchronisation will be done.
64  * Synchronisation can be disabled entirely by setting the object "sync"
65  * property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSink::render will be called.
68  * Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the ::render method
71  * are supported as well. These classes typically receive a buffer in the render
72  * method and can then potentially block on the clock while rendering. A typical
73  * example is an audiosink. Since 0.10.11 these subclasses can use
74  * gst_base_sink_wait_preroll() to perform the blocking wait.
75  *
76  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
77  * for the clock to reach the time indicated by the stop time of the last
78  * ::get_times call before posting an EOS message. When the element receives
79  * EOS in PAUSED, preroll completes, the event is queued and an EOS message is
80  * posted when going to PLAYING.
81  *
82  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
83  * synchronisation and clipping of buffers. Buffers that fall completely outside
84  * of the current segment are dropped. Buffers that fall partially in the
85  * segment are rendered (and prerolled). Subclasses should do any subbuffer
86  * clipping themselves when needed.
87  *
88  * #GstBaseSink will by default report the current playback position in
89  * #GST_FORMAT_TIME based on the current clock time and segment information.
90  * If no clock has been set on the element, the query will be forwarded
91  * upstream.
92  *
93  * The ::set_caps function will be called when the subclass should configure
94  * itself to process a specific media type.
95  *
96  * The ::start and ::stop virtual methods will be called when resources should
97  * be allocated. Any ::preroll, ::render  and ::set_caps function will be
98  * called between the ::start and ::stop calls.
99  *
100  * The ::event virtual method will be called when an event is received by
101  * #GstBaseSink. Normally this method should only be overriden by very specific
102  * elements (such as file sinks) which need to handle the newsegment event
103  * specially.
104  *
105  * #GstBaseSink provides an overridable ::buffer_alloc function that can be
106  * used by sinks that want to do reverse negotiation or to provide
107  * custom buffers (hardware buffers for example) to upstream elements.
108  *
109  * The ::unlock method is called when the elements should unblock any blocking
110  * operations they perform in the ::render method. This is mostly useful when
111  * the ::render method performs a blocking write on a file descriptor, for
112  * example.
113  *
114  * The max-lateness property affects how the sink deals with buffers that
115  * arrive too late in the sink. A buffer arrives too late in the sink when
116  * the presentation time (as a combination of the last segment, buffer
117  * timestamp and element base_time) plus the duration is before the current
118  * time of the clock.
119  * If the frame is later than max-lateness, the sink will drop the buffer
120  * without calling the render method.
121  * This feature is disabled if sync is disabled, the ::get-times method does
122  * not return a valid start time or max-lateness is set to -1 (the default).
123  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
124  * max-lateness value.
125  *
126  * The qos property will enable the quality-of-service features of the basesink
127  * which gather statistics about the real-time performance of the clock
128  * synchronisation. For each buffer received in the sink, statistics are
129  * gathered and a QOS event is sent upstream with these numbers. This
130  * information can then be used by upstream elements to reduce their processing
131  * rate, for example.
132  *
133  * Since 0.10.15 the async property can be used to instruct the sink to never
134  * perform an ASYNC state change. This feature is mostly usable when dealing
135  * with non-synchronized streams or sparse streams.
136  *
137  * Last reviewed on 2007-08-29 (0.10.15)
138  */
139
140 #ifdef HAVE_CONFIG_H
141 #  include "config.h"
142 #endif
143
144 #include "gstbasesink.h"
145 #include <gst/gstmarshal.h>
146 #include <gst/gst_private.h>
147 #include <gst/gst-i18n-lib.h>
148
149 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
150 #define GST_CAT_DEFAULT gst_base_sink_debug
151
152 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
153    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
154
155 /* FIXME, some stuff in ABI.data and other in Private...
156  * Make up your mind please.
157  */
158 struct _GstBaseSinkPrivate
159 {
160   gint qos_enabled;             /* ATOMIC */
161   gboolean async_enabled;
162   GstClockTimeDiff ts_offset;
163   GstClockTime render_delay;
164
165   /* start, stop of current buffer, stream time, used to report position */
166   GstClockTime current_sstart;
167   GstClockTime current_sstop;
168
169   /* start, stop and jitter of current buffer, running time */
170   GstClockTime current_rstart;
171   GstClockTime current_rstop;
172   GstClockTimeDiff current_jitter;
173
174   /* EOS sync time in running time */
175   GstClockTime eos_rtime;
176
177   /* last buffer that arrived in time, running time */
178   GstClockTime last_in_time;
179   /* when the last buffer left the sink, running time */
180   GstClockTime last_left;
181
182   /* running averages go here these are done on running time */
183   GstClockTime avg_pt;
184   GstClockTime avg_duration;
185   gdouble avg_rate;
186
187   /* these are done on system time. avg_jitter and avg_render are
188    * compared to eachother to see if the rendering time takes a
189    * huge amount of the processing, If so we are flooded with
190    * buffers. */
191   GstClockTime last_left_systime;
192   GstClockTime avg_jitter;
193   GstClockTime start, stop;
194   GstClockTime avg_render;
195
196   /* number of rendered and dropped frames */
197   guint64 rendered;
198   guint64 dropped;
199
200   /* latency stuff */
201   GstClockTime latency;
202
203   /* if we already commited the state */
204   gboolean commited;
205
206   /* when we received EOS */
207   gboolean received_eos;
208
209   /* when we are prerolled and able to report latency */
210   gboolean have_latency;
211
212   /* the last buffer we prerolled or rendered. Useful for making snapshots */
213   GstBuffer *last_buffer;
214 };
215
216 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
217
218 /* generic running average, this has a neutral window size */
219 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
220
221 /* the windows for these running averages are experimentally obtained.
222  * possitive values get averaged more while negative values use a small
223  * window so we can react faster to badness. */
224 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
225 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
226
227 /* BaseSink properties */
228
229 #define DEFAULT_SIZE 1024
230 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
231 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
232
233 #define DEFAULT_PREROLL_QUEUE_LEN       0
234 #define DEFAULT_SYNC                    TRUE
235 #define DEFAULT_MAX_LATENESS            -1
236 #define DEFAULT_QOS                     FALSE
237 #define DEFAULT_ASYNC                   TRUE
238 #define DEFAULT_TS_OFFSET               0
239
240 enum
241 {
242   PROP_0,
243   PROP_PREROLL_QUEUE_LEN,
244   PROP_SYNC,
245   PROP_MAX_LATENESS,
246   PROP_QOS,
247   PROP_ASYNC,
248   PROP_TS_OFFSET,
249   PROP_LAST_BUFFER,
250   PROP_LAST
251 };
252
253 static GstElementClass *parent_class = NULL;
254
255 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
256 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
257 static void gst_base_sink_finalize (GObject * object);
258
259 GType
260 gst_base_sink_get_type (void)
261 {
262   static GType base_sink_type = 0;
263
264   if (G_UNLIKELY (base_sink_type == 0)) {
265     static const GTypeInfo base_sink_info = {
266       sizeof (GstBaseSinkClass),
267       NULL,
268       NULL,
269       (GClassInitFunc) gst_base_sink_class_init,
270       NULL,
271       NULL,
272       sizeof (GstBaseSink),
273       0,
274       (GInstanceInitFunc) gst_base_sink_init,
275     };
276
277     base_sink_type = g_type_register_static (GST_TYPE_ELEMENT,
278         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
279   }
280   return base_sink_type;
281 }
282
283 static void gst_base_sink_set_property (GObject * object, guint prop_id,
284     const GValue * value, GParamSpec * pspec);
285 static void gst_base_sink_get_property (GObject * object, guint prop_id,
286     GValue * value, GParamSpec * pspec);
287
288 static gboolean gst_base_sink_send_event (GstElement * element,
289     GstEvent * event);
290 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
291
292 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
293 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
294 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
295     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
296 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
297     GstClockTime * start, GstClockTime * end);
298 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
299     GstPad * pad, gboolean flushing);
300 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
301     gboolean active);
302
303 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
304     GstStateChange transition);
305
306 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
307 static void gst_base_sink_loop (GstPad * pad);
308 static gboolean gst_base_sink_pad_activate (GstPad * pad);
309 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
310 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
311 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
312 static gboolean gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query);
313
314 /* check if an object was too late */
315 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
316     GstMiniObject * obj, GstClockTime start, GstClockTime stop,
317     GstClockReturn status, GstClockTimeDiff jitter);
318
319 static void
320 gst_base_sink_class_init (GstBaseSinkClass * klass)
321 {
322   GObjectClass *gobject_class;
323   GstElementClass *gstelement_class;
324
325   gobject_class = G_OBJECT_CLASS (klass);
326   gstelement_class = GST_ELEMENT_CLASS (klass);
327
328   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
329       "basesink element");
330
331   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
332
333   parent_class = g_type_class_peek_parent (klass);
334
335   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_sink_finalize);
336   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_sink_set_property);
337   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_sink_get_property);
338
339   /* FIXME, this next value should be configured using an event from the
340    * upstream element, ie, the BUFFER_SIZE event. */
341   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
342       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
343           "Number of buffers to queue during preroll", 0, G_MAXUINT,
344           DEFAULT_PREROLL_QUEUE_LEN,
345           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
346
347   g_object_class_install_property (gobject_class, PROP_SYNC,
348       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
349           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350
351   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
352       g_param_spec_int64 ("max-lateness", "Max Lateness",
353           "Maximum number of nanoseconds that a buffer can be late before it "
354           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
355           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356
357   g_object_class_install_property (gobject_class, PROP_QOS,
358       g_param_spec_boolean ("qos", "Qos",
359           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
360           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
361   /**
362    * GstBaseSink:async
363    *
364    * If set to #TRUE, the basesink will perform asynchronous state changes.
365    * When set to #FALSE, the sink will not signal the parent when it prerolls.
366    * Use this option when dealing with sparse streams or when synchronisation is
367    * not required.
368    *
369    * Since: 0.10.15
370    */
371   g_object_class_install_property (gobject_class, PROP_ASYNC,
372       g_param_spec_boolean ("async", "Async",
373           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
374           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375   /**
376    * GstBaseSink:ts-offset
377    *
378    * Controls the final synchronisation, a negative value will render the buffer
379    * earlier while a positive value delays playback. This property can be 
380    * used to fix synchronisation in bad files.
381    *
382    * Since: 0.10.15
383    */
384   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
385       g_param_spec_int64 ("ts-offset", "TS Offset",
386           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
387           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
388
389   /**
390    * GstBaseSink:last-buffer
391    *
392    * The last buffer that arrived in the sink and was used for preroll or for
393    * rendering. This property can be used to generate thumbnails. This property
394    * can be NULL when the sink has not yet received a bufer.
395    *
396    * Since: 0.10.15
397    */
398   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
399       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
400           "The last buffer received in the sink", GST_TYPE_BUFFER,
401           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
402
403   gstelement_class->change_state =
404       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
405   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
406   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
407
408   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
409   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
410   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
411   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
412   klass->activate_pull =
413       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
414 }
415
416 static GstCaps *
417 gst_base_sink_pad_getcaps (GstPad * pad)
418 {
419   GstBaseSinkClass *bclass;
420   GstBaseSink *bsink;
421   GstCaps *caps = NULL;
422
423   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
424   bclass = GST_BASE_SINK_GET_CLASS (bsink);
425   if (bclass->get_caps)
426     caps = bclass->get_caps (bsink);
427
428   if (caps == NULL) {
429     GstPadTemplate *pad_template;
430
431     pad_template =
432         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "sink");
433     if (pad_template != NULL) {
434       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
435     }
436   }
437   gst_object_unref (bsink);
438
439   return caps;
440 }
441
442 static gboolean
443 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
444 {
445   GstBaseSinkClass *bclass;
446   GstBaseSink *bsink;
447   gboolean res = TRUE;
448
449   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
450   bclass = GST_BASE_SINK_GET_CLASS (bsink);
451
452   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
453     GstPad *peer = gst_pad_get_peer (pad);
454
455     if (peer)
456       res = gst_pad_set_caps (peer, caps);
457     else
458       res = FALSE;
459
460     if (!res)
461       GST_DEBUG_OBJECT (bsink, "peer setcaps() failed");
462   }
463
464   if (res && bclass->set_caps)
465     res = bclass->set_caps (bsink, caps);
466
467   gst_object_unref (bsink);
468
469   return res;
470 }
471
472 static void
473 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
474 {
475   GstBaseSinkClass *bclass;
476   GstBaseSink *bsink;
477
478   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
479   bclass = GST_BASE_SINK_GET_CLASS (bsink);
480
481   if (bclass->fixate)
482     bclass->fixate (bsink, caps);
483
484   gst_object_unref (bsink);
485 }
486
487 static GstFlowReturn
488 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
489     GstCaps * caps, GstBuffer ** buf)
490 {
491   GstBaseSinkClass *bclass;
492   GstBaseSink *bsink;
493   GstFlowReturn result = GST_FLOW_OK;
494
495   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
496   bclass = GST_BASE_SINK_GET_CLASS (bsink);
497
498   if (bclass->buffer_alloc)
499     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
500   else
501     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
502
503   gst_object_unref (bsink);
504
505   return result;
506 }
507
508 static void
509 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
510 {
511   GstPadTemplate *pad_template;
512   GstBaseSinkPrivate *priv;
513
514   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
515
516   pad_template =
517       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
518   g_return_if_fail (pad_template != NULL);
519
520   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
521
522   gst_pad_set_getcaps_function (basesink->sinkpad,
523       GST_DEBUG_FUNCPTR (gst_base_sink_pad_getcaps));
524   gst_pad_set_setcaps_function (basesink->sinkpad,
525       GST_DEBUG_FUNCPTR (gst_base_sink_pad_setcaps));
526   gst_pad_set_fixatecaps_function (basesink->sinkpad,
527       GST_DEBUG_FUNCPTR (gst_base_sink_pad_fixate));
528   gst_pad_set_bufferalloc_function (basesink->sinkpad,
529       GST_DEBUG_FUNCPTR (gst_base_sink_pad_buffer_alloc));
530   gst_pad_set_activate_function (basesink->sinkpad,
531       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate));
532   gst_pad_set_activatepush_function (basesink->sinkpad,
533       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_push));
534   gst_pad_set_activatepull_function (basesink->sinkpad,
535       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_pull));
536   gst_pad_set_event_function (basesink->sinkpad,
537       GST_DEBUG_FUNCPTR (gst_base_sink_event));
538   gst_pad_set_chain_function (basesink->sinkpad,
539       GST_DEBUG_FUNCPTR (gst_base_sink_chain));
540   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
541
542   basesink->pad_mode = GST_ACTIVATE_NONE;
543   basesink->preroll_queue = g_queue_new ();
544   basesink->abidata.ABI.clip_segment = gst_segment_new ();
545   priv->have_latency = FALSE;
546
547   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
548   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
549
550   basesink->sync = DEFAULT_SYNC;
551   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
552   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
553   priv->async_enabled = DEFAULT_ASYNC;
554   priv->ts_offset = DEFAULT_TS_OFFSET;
555   priv->render_delay = 0;
556
557   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
558 }
559
560 static void
561 gst_base_sink_finalize (GObject * object)
562 {
563   GstBaseSink *basesink;
564
565   basesink = GST_BASE_SINK (object);
566
567   g_queue_free (basesink->preroll_queue);
568   gst_segment_free (basesink->abidata.ABI.clip_segment);
569
570   G_OBJECT_CLASS (parent_class)->finalize (object);
571 }
572
573 /**
574  * gst_base_sink_set_sync:
575  * @sink: the sink
576  * @sync: the new sync value.
577  *
578  * Configures @sink to synchronize on the clock or not. When
579  * @sync is FALSE, incomming samples will be played as fast as
580  * possible. If @sync is TRUE, the timestamps of the incomming
581  * buffers will be used to schedule the exact render time of its
582  * contents.
583  *
584  * Since: 0.10.4
585  */
586 void
587 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
588 {
589   g_return_if_fail (GST_IS_BASE_SINK (sink));
590
591   GST_OBJECT_LOCK (sink);
592   sink->sync = sync;
593   GST_OBJECT_UNLOCK (sink);
594 }
595
596 /**
597  * gst_base_sink_get_sync:
598  * @sink: the sink
599  *
600  * Checks if @sink is currently configured to synchronize against the
601  * clock.
602  *
603  * Returns: TRUE if the sink is configured to synchronize against the clock.
604  *
605  * Since: 0.10.4
606  */
607 gboolean
608 gst_base_sink_get_sync (GstBaseSink * sink)
609 {
610   gboolean res;
611
612   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
613
614   GST_OBJECT_LOCK (sink);
615   res = sink->sync;
616   GST_OBJECT_UNLOCK (sink);
617
618   return res;
619 }
620
621 /**
622  * gst_base_sink_set_max_lateness:
623  * @sink: the sink
624  * @max_lateness: the new max lateness value.
625  *
626  * Sets the new max lateness value to @max_lateness. This value is
627  * used to decide if a buffer should be dropped or not based on the
628  * buffer timestamp and the current clock time. A value of -1 means
629  * an unlimited time.
630  *
631  * Since: 0.10.4
632  */
633 void
634 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
635 {
636   g_return_if_fail (GST_IS_BASE_SINK (sink));
637
638   GST_OBJECT_LOCK (sink);
639   sink->abidata.ABI.max_lateness = max_lateness;
640   GST_OBJECT_UNLOCK (sink);
641 }
642
643 /**
644  * gst_base_sink_get_max_lateness:
645  * @sink: the sink
646  *
647  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
648  * more details.
649  *
650  * Returns: The maximum time in nanoseconds that a buffer can be late
651  * before it is dropped and not rendered. A value of -1 means an
652  * unlimited time.
653  *
654  * Since: 0.10.4
655  */
656 gint64
657 gst_base_sink_get_max_lateness (GstBaseSink * sink)
658 {
659   gint64 res;
660
661   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
662
663   GST_OBJECT_LOCK (sink);
664   res = sink->abidata.ABI.max_lateness;
665   GST_OBJECT_UNLOCK (sink);
666
667   return res;
668 }
669
670 /**
671  * gst_base_sink_set_qos_enabled:
672  * @sink: the sink
673  * @enabled: the new qos value.
674  *
675  * Configures @sink to send Quality-of-Service events upstream.
676  *
677  * Since: 0.10.5
678  */
679 void
680 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
681 {
682   g_return_if_fail (GST_IS_BASE_SINK (sink));
683
684   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
685 }
686
687 /**
688  * gst_base_sink_is_qos_enabled:
689  * @sink: the sink
690  *
691  * Checks if @sink is currently configured to send Quality-of-Service events
692  * upstream.
693  *
694  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
695  *
696  * Since: 0.10.5
697  */
698 gboolean
699 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
700 {
701   gboolean res;
702
703   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
704
705   res = g_atomic_int_get (&sink->priv->qos_enabled);
706
707   return res;
708 }
709
710 /**
711  * gst_base_sink_set_async_enabled:
712  * @sink: the sink
713  * @enabled: the new async value.
714  *
715  * Configures @sink to perform all state changes asynchronusly. When async is
716  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
717  * preroll buffer. This feature is usefull if the sink does not synchronize
718  * against the clock or when it is dealing with sparse streams.
719  *
720  * Since: 0.10.15
721  */
722 void
723 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
724 {
725   g_return_if_fail (GST_IS_BASE_SINK (sink));
726
727   GST_PAD_PREROLL_LOCK (sink->sinkpad);
728   sink->priv->async_enabled = enabled;
729   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
730   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
731 }
732
733 /**
734  * gst_base_sink_is_async_enabled:
735  * @sink: the sink
736  *
737  * Checks if @sink is currently configured to perform asynchronous state
738  * changes to PAUSED.
739  *
740  * Returns: TRUE if the sink is configured to perform asynchronous state
741  * changes.
742  *
743  * Since: 0.10.15
744  */
745 gboolean
746 gst_base_sink_is_async_enabled (GstBaseSink * sink)
747 {
748   gboolean res;
749
750   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
751
752   GST_PAD_PREROLL_LOCK (sink->sinkpad);
753   res = sink->priv->async_enabled;
754   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
755
756   return res;
757 }
758
759 /**
760  * gst_base_sink_set_ts_offset:
761  * @sink: the sink
762  * @offset: the new offset
763  *
764  * Adjust the synchronisation of @sink with @offset. A negative value will
765  * render buffers earlier than their timestamp. A positive value will delay
766  * rendering. This function can be used to fix playback of badly timestamped
767  * buffers.
768  *
769  * Since: 0.10.15
770  */
771 void
772 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
773 {
774   g_return_if_fail (GST_IS_BASE_SINK (sink));
775
776   GST_OBJECT_LOCK (sink);
777   sink->priv->ts_offset = offset;
778   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
779   GST_OBJECT_UNLOCK (sink);
780 }
781
782 /**
783  * gst_base_sink_get_ts_offset:
784  * @sink: the sink
785  *
786  * Get the synchronisation offset of @sink.
787  *
788  * Returns: The synchronisation offset.
789  *
790  * Since: 0.10.15
791  */
792 GstClockTimeDiff
793 gst_base_sink_get_ts_offset (GstBaseSink * sink)
794 {
795   GstClockTimeDiff res;
796
797   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
798
799   GST_OBJECT_LOCK (sink);
800   res = sink->priv->ts_offset;
801   GST_OBJECT_UNLOCK (sink);
802
803   return res;
804 }
805
806 /**
807  * gst_base_sink_get_last_buffer:
808  * @sink: the sink
809  *
810  * Get the last buffer that arrived in the sink and was used for preroll or for
811  * rendering. This property can be used to generate thumbnails.
812  *
813  * The #GstCaps on the buffer can be used to determine the type of the buffer.
814  * 
815  * Returns: a #GstBuffer. gst_buffer_unref() after usage. This function returns
816  * NULL when no buffer has arrived in the sink yet or when the sink is not in
817  * PAUSED or PLAYING.
818  *
819  * Since: 0.10.15
820  */
821 GstBuffer *
822 gst_base_sink_get_last_buffer (GstBaseSink * sink)
823 {
824   GstBuffer *res;
825
826   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
827
828   GST_OBJECT_LOCK (sink);
829   if ((res = sink->priv->last_buffer))
830     gst_buffer_ref (res);
831   GST_OBJECT_UNLOCK (sink);
832
833   return res;
834 }
835
836 static void
837 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
838 {
839   GstBuffer *old;
840
841   if (buffer)
842     gst_buffer_ref (buffer);
843
844   GST_OBJECT_LOCK (sink);
845   old = sink->priv->last_buffer;
846   sink->priv->last_buffer = buffer;
847   GST_OBJECT_UNLOCK (sink);
848
849   if (old)
850     gst_buffer_unref (old);
851 }
852
853 /**
854  * gst_base_sink_get_latency:
855  * @sink: the sink
856  *
857  * Get the currently configured latency.
858  *
859  * Returns: The configured latency.
860  *
861  * Since: 0.10.12
862  */
863 GstClockTime
864 gst_base_sink_get_latency (GstBaseSink * sink)
865 {
866   GstClockTime res;
867
868   GST_OBJECT_LOCK (sink);
869   res = sink->priv->latency;
870   GST_OBJECT_UNLOCK (sink);
871
872   return res;
873 }
874
875 /**
876  * gst_base_sink_query_latency:
877  * @sink: the sink
878  * @live: if the sink is live
879  * @upstream_live: if an upstream element is live
880  * @min_latency: the min latency of the upstream elements
881  * @max_latency: the max latency of the upstream elements
882  *
883  * Query the sink for the latency parameters. The latency will be queried from
884  * the upstream elements. @live will be TRUE if @sink is configured to
885  * synchronize against the clock. @upstream_live will be TRUE if an upstream
886  * element is live. 
887  *
888  * If both @live and @upstream_live are TRUE, the sink will want to compensate
889  * for the latency introduced by the upstream elements by setting the
890  * @min_latency to a strictly possitive value.
891  *
892  * This function is mostly used by subclasses. 
893  *
894  * Returns: TRUE if the query succeeded.
895  *
896  * Since: 0.10.12
897  */
898 gboolean
899 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
900     gboolean * upstream_live, GstClockTime * min_latency,
901     GstClockTime * max_latency)
902 {
903   gboolean l, us_live, res, have_latency;
904   GstClockTime min, max, render_delay;
905   GstQuery *query;
906   GstClockTime us_min, us_max;
907
908   /* we are live when we sync to the clock */
909   GST_OBJECT_LOCK (sink);
910   l = sink->sync;
911   have_latency = sink->priv->have_latency;
912   render_delay = sink->priv->render_delay;
913   GST_OBJECT_UNLOCK (sink);
914
915   /* assume no latency */
916   min = 0;
917   max = -1;
918   us_live = FALSE;
919
920   if (have_latency) {
921     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
922     /* we are ready for a latency query this is when we preroll or when we are
923      * not async. */
924     query = gst_query_new_latency ();
925
926     /* ask the peer for the latency */
927     if ((res = gst_base_sink_peer_query (sink, query))) {
928       /* get upstream min and max latency */
929       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
930
931       if (us_live) {
932         /* upstream live, use its latency, subclasses should use these
933          * values to create the complete latency. */
934         min = us_min;
935         max = us_max;
936       }
937       if (l) {
938         /* we need to add the render delay if we are live */
939         if (min != -1)
940           min += render_delay;
941         if (max != -1)
942           max += render_delay;
943       }
944     }
945     gst_query_unref (query);
946   } else {
947     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
948     res = FALSE;
949   }
950
951   /* not live, we tried to do the query, if it failed we return TRUE anyway */
952   if (!res) {
953     if (!l) {
954       res = TRUE;
955       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
956     } else {
957       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
958     }
959   }
960
961   if (res) {
962     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
963         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
964         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
965
966     if (live)
967       *live = l;
968     if (upstream_live)
969       *upstream_live = us_live;
970     if (min_latency)
971       *min_latency = min;
972     if (max_latency)
973       *max_latency = max;
974   }
975   return res;
976 }
977
978 /**
979  * gst_base_sink_set_render_delay:
980  * @sink: a #GstBaseSink
981  * @delay: the new delay
982  *
983  * Set the render delay in @sink to @delay. The render delay is the time 
984  * between actual rendering of a buffer and its synchronisation time. Some
985  * devices might delay media rendering which can be compensated for with this
986  * function. 
987  *
988  * After calling this function, this sink will report additional latency and
989  * other sinks will adjust their latency to delay the rendering of their media.
990  *
991  * This function is usually called by subclasses.
992  *
993  * Since: 0.10.21
994  */
995 void
996 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
997 {
998   g_return_if_fail (GST_IS_BASE_SINK (sink));
999
1000   GST_OBJECT_LOCK (sink);
1001   sink->priv->render_delay = delay;
1002   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1003       GST_TIME_ARGS (delay));
1004   GST_OBJECT_UNLOCK (sink);
1005 }
1006
1007 /**
1008  * gst_base_sink_get_render_delay:
1009  * @sink: a #GstBaseSink
1010  *
1011  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1012  * information about the render delay.
1013  *
1014  * Returns: the render delay of @sink.
1015  *
1016  * Since: 0.10.21
1017  */
1018 GstClockTime
1019 gst_base_sink_get_render_delay (GstBaseSink * sink)
1020 {
1021   GstClockTimeDiff res;
1022
1023   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1024
1025   GST_OBJECT_LOCK (sink);
1026   res = sink->priv->render_delay;
1027   GST_OBJECT_UNLOCK (sink);
1028
1029   return res;
1030 }
1031
1032 static void
1033 gst_base_sink_set_property (GObject * object, guint prop_id,
1034     const GValue * value, GParamSpec * pspec)
1035 {
1036   GstBaseSink *sink = GST_BASE_SINK (object);
1037
1038   switch (prop_id) {
1039     case PROP_PREROLL_QUEUE_LEN:
1040       /* preroll lock necessary to serialize with finish_preroll */
1041       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1042       sink->preroll_queue_max_len = g_value_get_uint (value);
1043       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1044       break;
1045     case PROP_SYNC:
1046       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1047       break;
1048     case PROP_MAX_LATENESS:
1049       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1050       break;
1051     case PROP_QOS:
1052       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1053       break;
1054     case PROP_ASYNC:
1055       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1056       break;
1057     case PROP_TS_OFFSET:
1058       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1059       break;
1060     default:
1061       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1062       break;
1063   }
1064 }
1065
1066 static void
1067 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1068     GParamSpec * pspec)
1069 {
1070   GstBaseSink *sink = GST_BASE_SINK (object);
1071
1072   switch (prop_id) {
1073     case PROP_PREROLL_QUEUE_LEN:
1074       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1075       g_value_set_uint (value, sink->preroll_queue_max_len);
1076       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1077       break;
1078     case PROP_SYNC:
1079       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1080       break;
1081     case PROP_MAX_LATENESS:
1082       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1083       break;
1084     case PROP_QOS:
1085       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1086       break;
1087     case PROP_ASYNC:
1088       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1089       break;
1090     case PROP_TS_OFFSET:
1091       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1092       break;
1093     case PROP_LAST_BUFFER:
1094       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1095       break;
1096     default:
1097       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1098       break;
1099   }
1100 }
1101
1102
1103 static GstCaps *
1104 gst_base_sink_get_caps (GstBaseSink * sink)
1105 {
1106   return NULL;
1107 }
1108
1109 static gboolean
1110 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1111 {
1112   return TRUE;
1113 }
1114
1115 static GstFlowReturn
1116 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1117     GstCaps * caps, GstBuffer ** buf)
1118 {
1119   *buf = NULL;
1120   return GST_FLOW_OK;
1121 }
1122
1123 /* with PREROLL_LOCK, STREAM_LOCK */
1124 static void
1125 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1126 {
1127   GstMiniObject *obj;
1128
1129   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1130   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1131     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1132     gst_mini_object_unref (obj);
1133   }
1134   /* we can't have EOS anymore now */
1135   basesink->eos = FALSE;
1136   basesink->priv->received_eos = FALSE;
1137   basesink->have_preroll = FALSE;
1138   basesink->eos_queued = FALSE;
1139   basesink->preroll_queued = 0;
1140   basesink->buffers_queued = 0;
1141   basesink->events_queued = 0;
1142   /* can't report latency anymore until we preroll again */
1143   if (basesink->priv->async_enabled) {
1144     GST_OBJECT_LOCK (basesink);
1145     basesink->priv->have_latency = FALSE;
1146     GST_OBJECT_UNLOCK (basesink);
1147   }
1148   /* and signal any waiters now */
1149   GST_PAD_PREROLL_SIGNAL (pad);
1150 }
1151
1152 /* with STREAM_LOCK, configures given segment with the event information. */
1153 static void
1154 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1155     GstEvent * event, GstSegment * segment)
1156 {
1157   gboolean update;
1158   gdouble rate, arate;
1159   GstFormat format;
1160   gint64 start;
1161   gint64 stop;
1162   gint64 time;
1163
1164   /* the newsegment event is needed to bring the buffer timestamps to the
1165    * stream time and to drop samples outside of the playback segment. */
1166   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1167       &start, &stop, &time);
1168
1169   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1170    * We protect with the OBJECT_LOCK so that we can use the values to
1171    * safely answer a POSITION query. */
1172   GST_OBJECT_LOCK (basesink);
1173   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1174       stop, time);
1175
1176   if (format == GST_FORMAT_TIME) {
1177     GST_DEBUG_OBJECT (basesink,
1178         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1179         "format GST_FORMAT_TIME, "
1180         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1181         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1182         update, rate, arate, GST_TIME_ARGS (segment->start),
1183         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1184         GST_TIME_ARGS (segment->accum));
1185   } else {
1186     GST_DEBUG_OBJECT (basesink,
1187         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1188         "format %d, "
1189         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1190         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1191         segment->format, segment->start, segment->stop, segment->time,
1192         segment->accum);
1193   }
1194   GST_OBJECT_UNLOCK (basesink);
1195 }
1196
1197 /* with PREROLL_LOCK, STREAM_LOCK */
1198 static gboolean
1199 gst_base_sink_commit_state (GstBaseSink * basesink)
1200 {
1201   /* commit state and proceed to next pending state */
1202   GstState current, next, pending, post_pending;
1203   gboolean post_paused = FALSE;
1204   gboolean post_async_done = FALSE;
1205   gboolean post_playing = FALSE;
1206   gboolean sync;
1207
1208   /* we are certainly not playing async anymore now */
1209   basesink->playing_async = FALSE;
1210
1211   GST_OBJECT_LOCK (basesink);
1212   current = GST_STATE (basesink);
1213   next = GST_STATE_NEXT (basesink);
1214   pending = GST_STATE_PENDING (basesink);
1215   post_pending = pending;
1216   sync = basesink->sync;
1217
1218   switch (pending) {
1219     case GST_STATE_PLAYING:
1220     {
1221       GstBaseSinkClass *bclass;
1222       GstStateChangeReturn ret;
1223
1224       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1225
1226       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1227
1228       basesink->need_preroll = FALSE;
1229       post_async_done = TRUE;
1230       basesink->priv->commited = TRUE;
1231       post_playing = TRUE;
1232       /* post PAUSED too when we were READY */
1233       if (current == GST_STATE_READY) {
1234         post_paused = TRUE;
1235       }
1236
1237       /* make sure we notify the subclass of async playing */
1238       if (bclass->async_play) {
1239         ret = bclass->async_play (basesink);
1240         if (ret == GST_STATE_CHANGE_FAILURE)
1241           goto async_failed;
1242       }
1243       break;
1244     }
1245     case GST_STATE_PAUSED:
1246       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1247       post_paused = TRUE;
1248       post_async_done = TRUE;
1249       basesink->priv->commited = TRUE;
1250       post_pending = GST_STATE_VOID_PENDING;
1251       break;
1252     case GST_STATE_READY:
1253     case GST_STATE_NULL:
1254       goto stopping;
1255     case GST_STATE_VOID_PENDING:
1256       goto nothing_pending;
1257     default:
1258       break;
1259   }
1260
1261   /* we can report latency queries now */
1262   basesink->priv->have_latency = TRUE;
1263
1264   GST_STATE (basesink) = pending;
1265   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1266   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1267   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1268   GST_OBJECT_UNLOCK (basesink);
1269
1270   if (post_paused) {
1271     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1272     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1273         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1274             current, next, post_pending));
1275   }
1276   if (post_async_done) {
1277     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1278     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1279         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1280   }
1281   if (post_playing) {
1282     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1283     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1284         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1285             next, pending, GST_STATE_VOID_PENDING));
1286   }
1287
1288   GST_STATE_BROADCAST (basesink);
1289
1290   return TRUE;
1291
1292 nothing_pending:
1293   {
1294     /* Depending on the state, set our vars. We get in this situation when the
1295      * state change function got a change to update the state vars before the
1296      * streaming thread did. This is fine but we need to make sure that we
1297      * update the need_preroll var since it was TRUE when we got here and might
1298      * become FALSE if we got to PLAYING. */
1299     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1300         gst_element_state_get_name (current));
1301     switch (current) {
1302       case GST_STATE_PLAYING:
1303         basesink->need_preroll = FALSE;
1304         break;
1305       case GST_STATE_PAUSED:
1306         basesink->need_preroll = TRUE;
1307         break;
1308       default:
1309         basesink->need_preroll = FALSE;
1310         basesink->flushing = TRUE;
1311         break;
1312     }
1313     /* we can report latency queries now */
1314     basesink->priv->have_latency = TRUE;
1315     GST_OBJECT_UNLOCK (basesink);
1316     return TRUE;
1317   }
1318 stopping:
1319   {
1320     /* app is going to READY */
1321     GST_DEBUG_OBJECT (basesink, "stopping");
1322     basesink->need_preroll = FALSE;
1323     basesink->flushing = TRUE;
1324     GST_OBJECT_UNLOCK (basesink);
1325     return FALSE;
1326   }
1327 async_failed:
1328   {
1329     GST_DEBUG_OBJECT (basesink, "async commit failed");
1330     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1331     GST_OBJECT_UNLOCK (basesink);
1332     return FALSE;
1333   }
1334 }
1335
1336
1337 /* with STREAM_LOCK, PREROLL_LOCK
1338  *
1339  * Returns TRUE if the object needs synchronisation and takes therefore
1340  * part in prerolling.
1341  *
1342  * rsstart/rsstop contain the start/stop in stream time.
1343  * rrstart/rrstop contain the start/stop in running time.
1344  */
1345 static gboolean
1346 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1347     GstClockTime * rsstart, GstClockTime * rsstop,
1348     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1349     GstSegment * segment)
1350 {
1351   GstBaseSinkClass *bclass;
1352   GstBuffer *buffer;
1353   GstClockTime start, stop;     /* raw start/stop timestamps */
1354   gint64 cstart, cstop;         /* clipped raw timestamps */
1355   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1356   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1357   GstFormat format;
1358   GstBaseSinkPrivate *priv;
1359
1360   priv = basesink->priv;
1361
1362   /* start with nothing */
1363   start = stop = sstart = sstop = rstart = rstop = -1;
1364
1365   if (G_UNLIKELY (GST_IS_EVENT (obj))) {
1366     GstEvent *event = GST_EVENT_CAST (obj);
1367
1368     switch (GST_EVENT_TYPE (event)) {
1369         /* EOS event needs syncing */
1370       case GST_EVENT_EOS:
1371       {
1372         if (basesink->segment.rate >= 0.0) {
1373           sstart = sstop = priv->current_sstop;
1374           if (sstart == -1) {
1375             /* we have not seen a buffer yet, use the segment values */
1376             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1377                 basesink->segment.format, basesink->segment.stop);
1378           }
1379         } else {
1380           sstart = sstop = priv->current_sstart;
1381           if (sstart == -1) {
1382             /* we have not seen a buffer yet, use the segment values */
1383             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1384                 basesink->segment.format, basesink->segment.start);
1385           }
1386         }
1387
1388         rstart = rstop = priv->eos_rtime;
1389         *do_sync = rstart != -1;
1390         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1391             GST_TIME_ARGS (rstart));
1392         goto done;
1393       }
1394       default:
1395         /* other events do not need syncing */
1396         /* FIXME, maybe NEWSEGMENT might need synchronisation
1397          * since the POSITION query depends on accumulated times and
1398          * we cannot accumulate the current segment before the previous
1399          * one completed.
1400          */
1401         return FALSE;
1402     }
1403   }
1404
1405   /* else do buffer sync code */
1406   buffer = GST_BUFFER_CAST (obj);
1407
1408   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1409
1410   /* just get the times to see if we need syncing */
1411   if (bclass->get_times)
1412     bclass->get_times (basesink, buffer, &start, &stop);
1413
1414   if (start == -1) {
1415     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1416     *do_sync = FALSE;
1417   } else {
1418     *do_sync = TRUE;
1419   }
1420
1421   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1422       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1423       GST_TIME_ARGS (stop), *do_sync);
1424
1425   /* collect segment and format for code clarity */
1426   format = segment->format;
1427
1428   /* no timestamp clipping if we did not * get a TIME segment format */
1429   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
1430     cstart = start;
1431     cstop = stop;
1432     /* do running and stream time in TIME format */
1433     format = GST_FORMAT_TIME;
1434     goto do_times;
1435   }
1436
1437   /* clip */
1438   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
1439               (gint64) start, (gint64) stop, &cstart, &cstop)))
1440     goto out_of_segment;
1441
1442   if (G_UNLIKELY (start != cstart || stop != cstop)) {
1443     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
1444         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
1445         GST_TIME_ARGS (cstop));
1446   }
1447
1448   /* set last stop position */
1449   if (G_LIKELY (cstop != GST_CLOCK_TIME_NONE))
1450     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
1451   else
1452     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
1453
1454 do_times:
1455   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
1456    * upstream is behaving very badly */
1457   sstart = gst_segment_to_stream_time (segment, format, cstart);
1458   sstop = gst_segment_to_stream_time (segment, format, cstop);
1459   rstart = gst_segment_to_running_time (segment, format, cstart);
1460   rstop = gst_segment_to_running_time (segment, format, cstop);
1461
1462 done:
1463   /* save times */
1464   *rsstart = sstart;
1465   *rsstop = sstop;
1466   *rrstart = rstart;
1467   *rrstop = rstop;
1468
1469   /* buffers and EOS always need syncing and preroll */
1470   return TRUE;
1471
1472   /* special cases */
1473 out_of_segment:
1474   {
1475     /* should not happen since we clip them in the chain function already, 
1476      * we return FALSE so that we don't try to sync on it. */
1477     GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
1478         (NULL), ("unexpected buffer out of segment found."));
1479     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
1480     return FALSE;
1481   }
1482 }
1483
1484 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
1485  * adjust a timestamp with the latency and timestamp offset */
1486 static GstClockTime
1487 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
1488 {
1489   GstClockTimeDiff ts_offset;
1490
1491   /* don't do anything funny with invalid timestamps */
1492   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1493     return time;
1494
1495   time += basesink->priv->latency;
1496
1497   /* apply offset, be carefull for underflows */
1498   ts_offset = basesink->priv->ts_offset;
1499   if (ts_offset < 0) {
1500     ts_offset = -ts_offset;
1501     if (ts_offset < time)
1502       time -= ts_offset;
1503     else
1504       time = 0;
1505   } else
1506     time += ts_offset;
1507
1508   return time;
1509 }
1510
1511 /* gst_base_sink_wait_clock:
1512  * @sink: the sink
1513  * @time: the running_time to be reached
1514  * @jitter: the jitter to be filled with time diff (can be NULL)
1515  *
1516  * This function will block until @time is reached. It is usually called by
1517  * subclasses that use their own internal synchronisation.
1518  *
1519  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
1520  * returned. Likewise, if synchronisation is disabled in the element or there
1521  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
1522  *
1523  * This function should only be called with the PREROLL_LOCK held, like when
1524  * receiving an EOS event in the ::event vmethod or when receiving a buffer in
1525  * the ::render vmethod.
1526  *
1527  * The @time argument should be the running_time of when this method should
1528  * return and is not adjusted with any latency or offset configured in the
1529  * sink.
1530  *
1531  * Since 0.10.20
1532  *
1533  * Returns: #GstClockReturn
1534  */
1535 GstClockReturn
1536 gst_base_sink_wait_clock (GstBaseSink * basesink, GstClockTime time,
1537     GstClockTimeDiff * jitter)
1538 {
1539   GstClockID id;
1540   GstClockReturn ret;
1541   GstClock *clock;
1542
1543   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1544     goto invalid_time;
1545
1546   GST_OBJECT_LOCK (basesink);
1547   if (G_UNLIKELY (!basesink->sync))
1548     goto no_sync;
1549
1550   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
1551     goto no_clock;
1552
1553   /* add base_time to running_time to get the time against the clock */
1554   time += GST_ELEMENT_CAST (basesink)->base_time;
1555
1556   id = gst_clock_new_single_shot_id (clock, time);
1557   GST_OBJECT_UNLOCK (basesink);
1558
1559   /* A blocking wait is performed on the clock. We save the ClockID
1560    * so we can unlock the entry at any time. While we are blocking, we 
1561    * release the PREROLL_LOCK so that other threads can interrupt the
1562    * entry. */
1563   basesink->clock_id = id;
1564   /* release the preroll lock while waiting */
1565   GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
1566
1567   ret = gst_clock_id_wait (id, jitter);
1568
1569   GST_PAD_PREROLL_LOCK (basesink->sinkpad);
1570   gst_clock_id_unref (id);
1571   basesink->clock_id = NULL;
1572
1573   return ret;
1574
1575   /* no syncing needed */
1576 invalid_time:
1577   {
1578     GST_DEBUG_OBJECT (basesink, "time not valid, no sync needed");
1579     return GST_CLOCK_BADTIME;
1580   }
1581 no_sync:
1582   {
1583     GST_DEBUG_OBJECT (basesink, "sync disabled");
1584     GST_OBJECT_UNLOCK (basesink);
1585     return GST_CLOCK_BADTIME;
1586   }
1587 no_clock:
1588   {
1589     GST_DEBUG_OBJECT (basesink, "no clock, can't sync");
1590     GST_OBJECT_UNLOCK (basesink);
1591     return GST_CLOCK_BADTIME;
1592   }
1593 }
1594
1595 /**
1596  * gst_base_sink_wait_preroll:
1597  * @sink: the sink
1598  *
1599  * If the #GstBaseSinkClass::render method performs its own synchronisation against
1600  * the clock it must unblock when going from PLAYING to the PAUSED state and call
1601  * this method before continuing to render the remaining data.
1602  *
1603  * This function will block until a state change to PLAYING happens (in which
1604  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
1605  * to a state change to READY or a FLUSH event (in which case this function
1606  * returns #GST_FLOW_WRONG_STATE).
1607  *
1608  * Since: 0.10.11
1609  *
1610  * Returns: #GST_FLOW_OK if the preroll completed and processing can
1611  * continue. Any other return value should be returned from the render vmethod.
1612  */
1613 GstFlowReturn
1614 gst_base_sink_wait_preroll (GstBaseSink * sink)
1615 {
1616   sink->have_preroll = TRUE;
1617   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
1618   /* block until the state changes, or we get a flush, or something */
1619   GST_PAD_PREROLL_WAIT (sink->sinkpad);
1620   sink->have_preroll = FALSE;
1621   if (G_UNLIKELY (sink->flushing))
1622     goto stopping;
1623   GST_DEBUG_OBJECT (sink, "continue after preroll");
1624
1625   return GST_FLOW_OK;
1626
1627   /* ERRORS */
1628 stopping:
1629   {
1630     GST_DEBUG_OBJECT (sink, "preroll interrupted");
1631     return GST_FLOW_WRONG_STATE;
1632   }
1633 }
1634
1635 /**
1636  * gst_base_sink_wait_eos:
1637  * @sink: the sink
1638  * @time: the running_time to be reached
1639  * @jitter: the jitter to be filled with time diff (can be NULL)
1640  *
1641  * This function will block until @time is reached. It is usually called by
1642  * subclasses that use their own internal synchronisation but want to let the
1643  * EOS be handled by the base class.
1644  *
1645  * This function should only be called with the PREROLL_LOCK held, like when
1646  * receiving an EOS event in the ::event vmethod.
1647  *
1648  * The @time argument should be the running_time of when the EOS should happen
1649  * and will be adjusted with any latency and offset configured in the sink.
1650  *
1651  * Since 0.10.15
1652  *
1653  * Returns: #GstFlowReturn
1654  */
1655 GstFlowReturn
1656 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
1657     GstClockTimeDiff * jitter)
1658 {
1659   GstClockReturn status;
1660   GstFlowReturn ret;
1661
1662   do {
1663     GstClockTime stime;
1664
1665     GST_DEBUG_OBJECT (sink, "checking preroll");
1666
1667     /* first wait for the playing state before we can continue */
1668     if (G_UNLIKELY (sink->need_preroll)) {
1669       ret = gst_base_sink_wait_preroll (sink);
1670       if (ret != GST_FLOW_OK)
1671         goto flushing;
1672     }
1673
1674     /* preroll done, we can sync since we are in PLAYING now. */
1675     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
1676         GST_TIME_FORMAT, GST_TIME_ARGS (time));
1677
1678     /* compensate for latency and ts_offset. We don't adjust for render delay
1679      * because we don't interact with the device on EOS normally. */
1680     stime = gst_base_sink_adjust_time (sink, time);
1681
1682     /* wait for the clock, this can be interrupted because we got shut down or 
1683      * we PAUSED. */
1684     status = gst_base_sink_wait_clock (sink, stime, jitter);
1685
1686     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
1687
1688     /* invalid time, no clock or sync disabled, just continue then */
1689     if (status == GST_CLOCK_BADTIME)
1690       break;
1691
1692     /* waiting could have been interrupted and we can be flushing now */
1693     if (G_UNLIKELY (sink->flushing))
1694       goto flushing;
1695
1696     /* retry if we got unscheduled, which means we did not reach the timeout
1697      * yet. if some other error occures, we continue. */
1698   } while (status == GST_CLOCK_UNSCHEDULED);
1699
1700   GST_DEBUG_OBJECT (sink, "end of stream");
1701
1702   return GST_FLOW_OK;
1703
1704   /* ERRORS */
1705 flushing:
1706   {
1707     GST_DEBUG_OBJECT (sink, "we are flushing");
1708     return GST_FLOW_WRONG_STATE;
1709   }
1710 }
1711
1712 /* with STREAM_LOCK, PREROLL_LOCK
1713  *
1714  * Make sure we are in PLAYING and synchronize an object to the clock.
1715  *
1716  * If we need preroll, we are not in PLAYING. We try to commit the state
1717  * if needed and then block if we still are not PLAYING.
1718  *
1719  * We start waiting on the clock in PLAYING. If we got interrupted, we
1720  * immediatly try to re-preroll.
1721  *
1722  * Some objects do not need synchronisation (most events) and so this function
1723  * immediatly returns GST_FLOW_OK.
1724  *
1725  * for objects that arrive later than max-lateness to be synchronized to the 
1726  * clock have the @late boolean set to TRUE.
1727  *
1728  * This function keeps a running average of the jitter (the diff between the
1729  * clock time and the requested sync time). The jitter is negative for
1730  * objects that arrive in time and positive for late buffers.
1731  *
1732  * does not take ownership of obj.
1733  */
1734 static GstFlowReturn
1735 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
1736     GstMiniObject * obj, gboolean * late)
1737 {
1738   GstClockTimeDiff jitter;
1739   gboolean syncable;
1740   GstClockReturn status = GST_CLOCK_OK;
1741   GstClockTime rstart, rstop, sstart, sstop, stime;
1742   gboolean do_sync;
1743   GstBaseSinkPrivate *priv;
1744
1745   priv = basesink->priv;
1746
1747   sstart = sstop = rstart = rstop = -1;
1748   do_sync = TRUE;
1749
1750   priv->current_rstart = -1;
1751
1752   /* get timing information for this object against the render segment */
1753   syncable = gst_base_sink_get_sync_times (basesink, obj,
1754       &sstart, &sstop, &rstart, &rstop, &do_sync, &basesink->segment);
1755
1756   /* a syncable object needs to participate in preroll and
1757    * clocking. All buffers and EOS are syncable. */
1758   if (G_UNLIKELY (!syncable))
1759     goto not_syncable;
1760
1761   /* store timing info for current object */
1762   priv->current_rstart = rstart;
1763   priv->current_rstop = (rstop != -1 ? rstop : rstart);
1764   /* save sync time for eos when the previous object needed sync */
1765   priv->eos_rtime = (do_sync ? priv->current_rstop : -1);
1766
1767 again:
1768   /* first do preroll, this makes sure we commit our state
1769    * to PAUSED and can continue to PLAYING. We cannot perform
1770    * any clock sync in PAUSED because there is no clock. 
1771    */
1772   while (G_UNLIKELY (basesink->need_preroll)) {
1773     GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
1774
1775     if (G_LIKELY (basesink->playing_async)) {
1776       /* commit state */
1777       if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
1778         goto stopping;
1779     }
1780
1781     /* need to recheck here because the commit state could have
1782      * made us not need the preroll anymore */
1783     if (G_LIKELY (basesink->need_preroll)) {
1784       /* block until the state changes, or we get a flush, or something */
1785       if (gst_base_sink_wait_preroll (basesink) != GST_FLOW_OK)
1786         goto flushing;
1787     }
1788   }
1789
1790   /* After rendering we store the position of the last buffer so that we can use
1791    * it to report the position. We need to take the lock here. */
1792   GST_OBJECT_LOCK (basesink);
1793   priv->current_sstart = sstart;
1794   priv->current_sstop = (sstop != -1 ? sstop : sstart);
1795   GST_OBJECT_UNLOCK (basesink);
1796
1797   if (!do_sync)
1798     goto done;
1799
1800   /* adjust for latency */
1801   stime = gst_base_sink_adjust_time (basesink, rstart);
1802
1803   /* adjust for render-delay, avoid underflows */
1804   if (stime != -1) {
1805     if (stime > priv->render_delay)
1806       stime -= priv->render_delay;
1807     else
1808       stime = 0;
1809   }
1810
1811   /* preroll done, we can sync since we are in PLAYING now. */
1812   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
1813       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
1814       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
1815
1816   /* This function will return immediatly if start == -1, no clock
1817    * or sync is disabled with GST_CLOCK_BADTIME. */
1818   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
1819
1820   GST_DEBUG_OBJECT (basesink, "clock returned %d", status);
1821
1822   /* invalid time, no clock or sync disabled, just render */
1823   if (status == GST_CLOCK_BADTIME)
1824     goto done;
1825
1826   /* waiting could have been interrupted and we can be flushing now */
1827   if (G_UNLIKELY (basesink->flushing))
1828     goto flushing;
1829
1830   /* check for unlocked by a state change, we are not flushing so
1831    * we can try to preroll on the current buffer. */
1832   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
1833     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
1834     goto again;
1835   }
1836
1837   /* successful syncing done, record observation */
1838   priv->current_jitter = jitter;
1839
1840   /* check if the object should be dropped */
1841   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
1842       status, jitter);
1843
1844 done:
1845   return GST_FLOW_OK;
1846
1847   /* ERRORS */
1848 not_syncable:
1849   {
1850     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
1851     return GST_FLOW_OK;
1852   }
1853 flushing:
1854   {
1855     GST_DEBUG_OBJECT (basesink, "we are flushing");
1856     return GST_FLOW_WRONG_STATE;
1857   }
1858 stopping:
1859   {
1860     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
1861     return GST_FLOW_WRONG_STATE;
1862   }
1863 }
1864
1865 static gboolean
1866 gst_base_sink_send_qos (GstBaseSink * basesink,
1867     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
1868 {
1869   GstEvent *event;
1870   gboolean res;
1871
1872   /* generate Quality-of-Service event */
1873   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
1874       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1875       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (time));
1876
1877   event = gst_event_new_qos (proportion, diff, time);
1878
1879   /* send upstream */
1880   res = gst_pad_push_event (basesink->sinkpad, event);
1881
1882   return res;
1883 }
1884
1885 static void
1886 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
1887 {
1888   GstBaseSinkPrivate *priv;
1889   GstClockTime start, stop;
1890   GstClockTimeDiff jitter;
1891   GstClockTime pt, entered, left;
1892   GstClockTime duration;
1893   gdouble rate;
1894
1895   priv = sink->priv;
1896
1897   start = priv->current_rstart;
1898
1899   /* if Quality-of-Service disabled, do nothing */
1900   if (!g_atomic_int_get (&priv->qos_enabled) || start == -1)
1901     return;
1902
1903   stop = priv->current_rstop;
1904   jitter = priv->current_jitter;
1905
1906   if (jitter < 0) {
1907     /* this is the time the buffer entered the sink */
1908     if (start < -jitter)
1909       entered = 0;
1910     else
1911       entered = start + jitter;
1912     left = start;
1913   } else {
1914     /* this is the time the buffer entered the sink */
1915     entered = start + jitter;
1916     /* this is the time the buffer left the sink */
1917     left = start + jitter;
1918   }
1919
1920   /* calculate duration of the buffer */
1921   if (stop != -1)
1922     duration = stop - start;
1923   else
1924     duration = -1;
1925
1926   /* if we have the time when the last buffer left us, calculate
1927    * processing time */
1928   if (priv->last_left != -1) {
1929     if (entered > priv->last_left) {
1930       pt = entered - priv->last_left;
1931     } else {
1932       pt = 0;
1933     }
1934   } else {
1935     pt = priv->avg_pt;
1936   }
1937
1938   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
1939       ", entered %" GST_TIME_FORMAT ", left %" GST_TIME_FORMAT ", pt: %"
1940       GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT ",jitter %"
1941       G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (entered),
1942       GST_TIME_ARGS (left), GST_TIME_ARGS (pt), GST_TIME_ARGS (duration),
1943       jitter);
1944
1945   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
1946       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
1947       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
1948       priv->avg_rate);
1949
1950   /* collect running averages. for first observations, we copy the
1951    * values */
1952   if (priv->avg_duration == -1)
1953     priv->avg_duration = duration;
1954   else
1955     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
1956
1957   if (priv->avg_pt == -1)
1958     priv->avg_pt = pt;
1959   else
1960     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
1961
1962   if (priv->avg_duration != 0)
1963     rate =
1964         gst_guint64_to_gdouble (priv->avg_pt) /
1965         gst_guint64_to_gdouble (priv->avg_duration);
1966   else
1967     rate = 0.0;
1968
1969   if (priv->last_left != -1) {
1970     if (dropped || priv->avg_rate < 0.0) {
1971       priv->avg_rate = rate;
1972     } else {
1973       if (rate > 1.0)
1974         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
1975       else
1976         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
1977     }
1978   }
1979
1980   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
1981       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
1982       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
1983       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
1984
1985
1986   if (priv->avg_rate >= 0.0) {
1987     /* if we have a valid rate, start sending QoS messages */
1988     if (priv->current_jitter < 0) {
1989       /* make sure we never go below 0 when adding the jitter to the
1990        * timestamp. */
1991       if (priv->current_rstart < -priv->current_jitter)
1992         priv->current_jitter = -priv->current_rstart;
1993     }
1994     gst_base_sink_send_qos (sink, priv->avg_rate, priv->current_rstart,
1995         priv->current_jitter);
1996   }
1997
1998   /* record when this buffer will leave us */
1999   priv->last_left = left;
2000 }
2001
2002 /* reset all qos measuring */
2003 static void
2004 gst_base_sink_reset_qos (GstBaseSink * sink)
2005 {
2006   GstBaseSinkPrivate *priv;
2007
2008   priv = sink->priv;
2009
2010   priv->last_in_time = -1;
2011   priv->last_left = -1;
2012   priv->avg_duration = -1;
2013   priv->avg_pt = -1;
2014   priv->avg_rate = -1.0;
2015   priv->avg_render = -1;
2016   priv->rendered = 0;
2017   priv->dropped = 0;
2018
2019 }
2020
2021 /* Checks if the object was scheduled too late.
2022  *
2023  * start/stop contain the raw timestamp start and stop values
2024  * of the object.
2025  *
2026  * status and jitter contain the return values from the clock wait.
2027  *
2028  * returns TRUE if the buffer was too late.
2029  */
2030 static gboolean
2031 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2032     GstClockTime start, GstClockTime stop,
2033     GstClockReturn status, GstClockTimeDiff jitter)
2034 {
2035   gboolean late;
2036   gint64 max_lateness;
2037   GstBaseSinkPrivate *priv;
2038
2039   priv = basesink->priv;
2040
2041   late = FALSE;
2042
2043   /* only for objects that were too late */
2044   if (G_LIKELY (status != GST_CLOCK_EARLY))
2045     goto in_time;
2046
2047   max_lateness = basesink->abidata.ABI.max_lateness;
2048
2049   /* check if frame dropping is enabled */
2050   if (max_lateness == -1)
2051     goto no_drop;
2052
2053   /* only check for buffers */
2054   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2055     goto not_buffer;
2056
2057   /* can't do check if we don't have a timestamp */
2058   if (G_UNLIKELY (start == -1))
2059     goto no_timestamp;
2060
2061   /* we can add a valid stop time */
2062   if (stop != -1)
2063     max_lateness += stop;
2064   else
2065     max_lateness += start;
2066
2067   /* if the jitter bigger than duration and lateness we are too late */
2068   if ((late = start + jitter > max_lateness)) {
2069     GST_DEBUG_OBJECT (basesink, "buffer is too late %" GST_TIME_FORMAT
2070         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (start + jitter),
2071         GST_TIME_ARGS (max_lateness));
2072     /* !!emergency!!, if we did not receive anything valid for more than a 
2073      * second, render it anyway so the user sees something */
2074     if (priv->last_in_time && start - priv->last_in_time > GST_SECOND) {
2075       late = FALSE;
2076       GST_DEBUG_OBJECT (basesink,
2077           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2078           GST_TIME_ARGS (priv->last_in_time));
2079     }
2080   }
2081
2082 done:
2083   if (!late) {
2084     priv->last_in_time = start;
2085   }
2086   return late;
2087
2088   /* all is fine */
2089 in_time:
2090   {
2091     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2092     goto done;
2093   }
2094 no_drop:
2095   {
2096     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2097     goto done;
2098   }
2099 not_buffer:
2100   {
2101     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2102     return FALSE;
2103   }
2104 no_timestamp:
2105   {
2106     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2107     return FALSE;
2108   }
2109 }
2110
2111 /* called before and after calling the render vmethod. It keeps track of how
2112  * much time was spent in the render method and is used to check if we are
2113  * flooded */
2114 static void
2115 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2116 {
2117   GstBaseSinkPrivate *priv;
2118
2119   priv = basesink->priv;
2120
2121   if (start) {
2122     priv->start = gst_util_get_timestamp ();
2123   } else {
2124     GstClockTime elapsed;
2125
2126     priv->stop = gst_util_get_timestamp ();
2127
2128     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2129
2130     if (priv->avg_render == -1)
2131       priv->avg_render = elapsed;
2132     else
2133       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2134
2135     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2136         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2137   }
2138 }
2139
2140 /* with STREAM_LOCK, PREROLL_LOCK,
2141  *
2142  * Synchronize the object on the clock and then render it.
2143  *
2144  * takes ownership of obj.
2145  */
2146 static GstFlowReturn
2147 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2148     GstMiniObject * obj)
2149 {
2150   GstFlowReturn ret = GST_FLOW_OK;
2151   GstBaseSinkClass *bclass;
2152   gboolean late = FALSE;
2153   GstBaseSinkPrivate *priv;
2154
2155   priv = basesink->priv;
2156
2157   /* synchronize this object, non syncable objects return OK
2158    * immediatly. */
2159   ret = gst_base_sink_do_sync (basesink, pad, obj, &late);
2160   if (G_UNLIKELY (ret != GST_FLOW_OK))
2161     goto sync_failed;
2162
2163   /* and now render, event or buffer. */
2164   if (G_LIKELY (GST_IS_BUFFER (obj))) {
2165     GstBuffer *buf;
2166
2167     /* drop late buffers unconditionally, let's hope it's unlikely */
2168     if (G_UNLIKELY (late))
2169       goto dropped;
2170
2171     buf = GST_BUFFER_CAST (obj);
2172
2173     gst_base_sink_set_last_buffer (basesink, buf);
2174
2175     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2176
2177     if (G_LIKELY (bclass->render)) {
2178       gint do_qos;
2179
2180       /* read once, to get same value before and after */
2181       do_qos = g_atomic_int_get (&priv->qos_enabled);
2182
2183       GST_DEBUG_OBJECT (basesink, "rendering buffer %p", obj);
2184
2185       /* record rendering time for QoS and stats */
2186       if (do_qos)
2187         gst_base_sink_do_render_stats (basesink, TRUE);
2188
2189       ret = bclass->render (basesink, buf);
2190
2191       priv->rendered++;
2192
2193       if (do_qos)
2194         gst_base_sink_do_render_stats (basesink, FALSE);
2195     }
2196   } else {
2197     GstEvent *event = GST_EVENT_CAST (obj);
2198     gboolean event_res = TRUE;
2199     GstEventType type;
2200
2201     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2202
2203     type = GST_EVENT_TYPE (event);
2204
2205     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
2206         gst_event_type_get_name (type));
2207
2208     if (bclass->event)
2209       event_res = bclass->event (basesink, event);
2210
2211     /* when we get here we could be flushing again when the event handler calls
2212      * _wait_eos(). We have to ignore this object in that case. */
2213     if (G_UNLIKELY (basesink->flushing))
2214       goto flushing;
2215
2216     if (G_LIKELY (event_res)) {
2217       switch (type) {
2218         case GST_EVENT_EOS:
2219           /* the EOS event is completely handled so we mark
2220            * ourselves as being in the EOS state. eos is also 
2221            * protected by the object lock so we can read it when 
2222            * answering the POSITION query. */
2223           GST_OBJECT_LOCK (basesink);
2224           basesink->eos = TRUE;
2225           GST_OBJECT_UNLOCK (basesink);
2226           /* ok, now we can post the message */
2227           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
2228           gst_element_post_message (GST_ELEMENT_CAST (basesink),
2229               gst_message_new_eos (GST_OBJECT_CAST (basesink)));
2230           break;
2231         case GST_EVENT_NEWSEGMENT:
2232           /* configure the segment */
2233           gst_base_sink_configure_segment (basesink, pad, event,
2234               &basesink->segment);
2235           break;
2236         default:
2237           break;
2238       }
2239     }
2240   }
2241
2242 done:
2243   gst_base_sink_perform_qos (basesink, late);
2244
2245   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
2246   gst_mini_object_unref (obj);
2247
2248   return ret;
2249
2250   /* ERRORS */
2251 sync_failed:
2252   {
2253     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
2254     goto done;
2255   }
2256 dropped:
2257   {
2258     priv->dropped++;
2259     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
2260     goto done;
2261   }
2262 flushing:
2263   {
2264     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
2265     gst_mini_object_unref (obj);
2266     return GST_FLOW_WRONG_STATE;
2267   }
2268 }
2269
2270 /* with STREAM_LOCK, PREROLL_LOCK
2271  *
2272  * Perform preroll on the given object. For buffers this means 
2273  * calling the preroll subclass method. 
2274  * If that succeeds, the state will be commited.
2275  *
2276  * function does not take ownership of obj.
2277  */
2278 static GstFlowReturn
2279 gst_base_sink_preroll_object (GstBaseSink * basesink, GstPad * pad,
2280     GstMiniObject * obj)
2281 {
2282   GstFlowReturn ret;
2283
2284   GST_DEBUG_OBJECT (basesink, "do preroll %p", obj);
2285
2286   /* if it's a buffer, we need to call the preroll method */
2287   if (G_LIKELY (GST_IS_BUFFER (obj))) {
2288     GstBaseSinkClass *bclass;
2289     GstBuffer *buf;
2290     GstClockTime timestamp;
2291
2292     buf = GST_BUFFER_CAST (obj);
2293     timestamp = GST_BUFFER_TIMESTAMP (buf);
2294
2295     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
2296         GST_TIME_ARGS (timestamp));
2297
2298     gst_base_sink_set_last_buffer (basesink, buf);
2299
2300     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2301     if (bclass->preroll)
2302       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
2303         goto preroll_failed;
2304   }
2305
2306   /* commit state */
2307   if (G_LIKELY (basesink->playing_async)) {
2308     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
2309       goto stopping;
2310   }
2311
2312   return GST_FLOW_OK;
2313
2314   /* ERRORS */
2315 preroll_failed:
2316   {
2317     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
2318     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
2319     return ret;
2320   }
2321 stopping:
2322   {
2323     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
2324     return GST_FLOW_WRONG_STATE;
2325   }
2326 }
2327
2328 /* with STREAM_LOCK, PREROLL_LOCK 
2329  *
2330  * Queue an object for rendering.
2331  * The first prerollable object queued will complete the preroll. If the
2332  * preroll queue if filled, we render all the objects in the queue.
2333  *
2334  * This function takes ownership of the object.
2335  */
2336 static GstFlowReturn
2337 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
2338     GstMiniObject * obj, gboolean prerollable)
2339 {
2340   GstFlowReturn ret = GST_FLOW_OK;
2341   gint length;
2342   GQueue *q;
2343
2344   if (G_UNLIKELY (basesink->need_preroll)) {
2345     if (G_LIKELY (prerollable))
2346       basesink->preroll_queued++;
2347
2348     length = basesink->preroll_queued;
2349
2350     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
2351
2352     /* first prerollable item needs to finish the preroll */
2353     if (length == 1) {
2354       ret = gst_base_sink_preroll_object (basesink, pad, obj);
2355       if (G_UNLIKELY (ret != GST_FLOW_OK))
2356         goto preroll_failed;
2357     }
2358     /* need to recheck if we need preroll, commmit state during preroll 
2359      * could have made us not need more preroll. */
2360     if (G_UNLIKELY (basesink->need_preroll)) {
2361       /* see if we can render now, if we can't add the object to the preroll
2362        * queue. */
2363       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
2364         goto more_preroll;
2365     }
2366   }
2367
2368   /* we can start rendering (or blocking) the queued object
2369    * if any. */
2370   q = basesink->preroll_queue;
2371   while (G_UNLIKELY (!g_queue_is_empty (q))) {
2372     GstMiniObject *o;
2373
2374     o = g_queue_pop_head (q);
2375     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
2376
2377     /* do something with the return value */
2378     ret = gst_base_sink_render_object (basesink, pad, o);
2379     if (ret != GST_FLOW_OK)
2380       goto dequeue_failed;
2381   }
2382
2383   /* now render the object */
2384   ret = gst_base_sink_render_object (basesink, pad, obj);
2385   basesink->preroll_queued = 0;
2386
2387   return ret;
2388
2389   /* special cases */
2390 preroll_failed:
2391   {
2392     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
2393         gst_flow_get_name (ret));
2394     gst_mini_object_unref (obj);
2395     return ret;
2396   }
2397 more_preroll:
2398   {
2399     /* add object to the queue and return */
2400     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
2401         length, basesink->preroll_queue_max_len);
2402     g_queue_push_tail (basesink->preroll_queue, obj);
2403     return GST_FLOW_OK;
2404   }
2405 dequeue_failed:
2406   {
2407     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
2408         gst_flow_get_name (ret));
2409     gst_mini_object_unref (obj);
2410     return ret;
2411   }
2412 }
2413
2414 /* with STREAM_LOCK
2415  *
2416  * This function grabs the PREROLL_LOCK and adds the object to
2417  * the queue.
2418  *
2419  * This function takes ownership of obj.
2420  */
2421 static GstFlowReturn
2422 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
2423     GstMiniObject * obj, gboolean prerollable)
2424 {
2425   GstFlowReturn ret;
2426
2427   GST_PAD_PREROLL_LOCK (pad);
2428   if (G_UNLIKELY (basesink->flushing))
2429     goto flushing;
2430
2431   if (G_UNLIKELY (basesink->priv->received_eos))
2432     goto was_eos;
2433
2434   ret = gst_base_sink_queue_object_unlocked (basesink, pad, obj, prerollable);
2435   GST_PAD_PREROLL_UNLOCK (pad);
2436
2437   return ret;
2438
2439   /* ERRORS */
2440 flushing:
2441   {
2442     GST_DEBUG_OBJECT (basesink, "sink is flushing");
2443     GST_PAD_PREROLL_UNLOCK (pad);
2444     gst_mini_object_unref (obj);
2445     return GST_FLOW_WRONG_STATE;
2446   }
2447 was_eos:
2448   {
2449     GST_DEBUG_OBJECT (basesink,
2450         "we are EOS, dropping object, return UNEXPECTED");
2451     GST_PAD_PREROLL_UNLOCK (pad);
2452     gst_mini_object_unref (obj);
2453     return GST_FLOW_UNEXPECTED;
2454   }
2455 }
2456
2457 static gboolean
2458 gst_base_sink_event (GstPad * pad, GstEvent * event)
2459 {
2460   GstBaseSink *basesink;
2461   gboolean result = TRUE;
2462   GstBaseSinkClass *bclass;
2463
2464   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2465
2466   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2467
2468   GST_DEBUG_OBJECT (basesink, "event %p (%s)", event,
2469       GST_EVENT_TYPE_NAME (event));
2470
2471   switch (GST_EVENT_TYPE (event)) {
2472     case GST_EVENT_EOS:
2473     {
2474       GstFlowReturn ret;
2475
2476       GST_PAD_PREROLL_LOCK (pad);
2477       if (G_UNLIKELY (basesink->flushing))
2478         goto flushing;
2479
2480       if (G_UNLIKELY (basesink->priv->received_eos)) {
2481         /* we can't accept anything when we are EOS */
2482         result = FALSE;
2483         gst_event_unref (event);
2484       } else {
2485         /* we set the received EOS flag here so that we can use it when testing if
2486          * we are prerolled and to refure more buffers. */
2487         basesink->priv->received_eos = TRUE;
2488
2489         /* EOS is a prerollable object, we call the unlocked version because it
2490          * does not check the received_eos flag. */
2491         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
2492             GST_MINI_OBJECT_CAST (event), TRUE);
2493         if (G_UNLIKELY (ret != GST_FLOW_OK))
2494           result = FALSE;
2495       }
2496       GST_PAD_PREROLL_UNLOCK (pad);
2497       break;
2498     }
2499     case GST_EVENT_NEWSEGMENT:
2500     {
2501       GstFlowReturn ret;
2502
2503       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
2504
2505       GST_PAD_PREROLL_LOCK (pad);
2506       if (G_UNLIKELY (basesink->flushing))
2507         goto flushing;
2508
2509       if (G_UNLIKELY (basesink->priv->received_eos)) {
2510         /* we can't accept anything when we are EOS */
2511         result = FALSE;
2512         gst_event_unref (event);
2513       } else {
2514         /* the new segment is a non prerollable item and does not block anything,
2515          * we need to configure the current clipping segment and insert the event 
2516          * in the queue to serialize it with the buffers for rendering. */
2517         gst_base_sink_configure_segment (basesink, pad, event,
2518             basesink->abidata.ABI.clip_segment);
2519
2520         ret =
2521             gst_base_sink_queue_object_unlocked (basesink, pad,
2522             GST_MINI_OBJECT_CAST (event), FALSE);
2523         if (G_UNLIKELY (ret != GST_FLOW_OK))
2524           result = FALSE;
2525         else {
2526           GST_OBJECT_LOCK (basesink);
2527           basesink->have_newsegment = TRUE;
2528           GST_OBJECT_UNLOCK (basesink);
2529         }
2530       }
2531       GST_PAD_PREROLL_UNLOCK (pad);
2532       break;
2533     }
2534     case GST_EVENT_FLUSH_START:
2535       if (bclass->event)
2536         bclass->event (basesink, event);
2537
2538       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
2539
2540       /* make sure we are not blocked on the clock also clear any pending
2541        * eos state. */
2542       gst_base_sink_set_flushing (basesink, pad, TRUE);
2543
2544       /* we grab the stream lock but that is not needed since setting the
2545        * sink to flushing would make sure no state commit is being done
2546        * anymore */
2547       GST_PAD_STREAM_LOCK (pad);
2548       gst_base_sink_reset_qos (basesink);
2549       if (basesink->priv->async_enabled) {
2550         /* and we need to commit our state again on the next
2551          * prerolled buffer */
2552         basesink->playing_async = TRUE;
2553         gst_element_lost_state (GST_ELEMENT_CAST (basesink));
2554       } else {
2555         basesink->priv->have_latency = TRUE;
2556         basesink->need_preroll = FALSE;
2557       }
2558       gst_base_sink_set_last_buffer (basesink, NULL);
2559       GST_PAD_STREAM_UNLOCK (pad);
2560
2561       gst_event_unref (event);
2562       break;
2563     case GST_EVENT_FLUSH_STOP:
2564       if (bclass->event)
2565         bclass->event (basesink, event);
2566
2567       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
2568
2569       /* unset flushing so we can accept new data, this also flushes out any EOS
2570        * event. */
2571       gst_base_sink_set_flushing (basesink, pad, FALSE);
2572
2573       /* for position reporting */
2574       GST_OBJECT_LOCK (basesink);
2575       basesink->priv->current_sstart = -1;
2576       basesink->priv->current_sstop = -1;
2577       basesink->priv->eos_rtime = -1;
2578       basesink->have_newsegment = FALSE;
2579       GST_OBJECT_UNLOCK (basesink);
2580
2581       /* we need new segment info after the flush. */
2582       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
2583       gst_segment_init (basesink->abidata.ABI.clip_segment,
2584           GST_FORMAT_UNDEFINED);
2585
2586       gst_event_unref (event);
2587       break;
2588     default:
2589       /* other events are sent to queue or subclass depending on if they
2590        * are serialized. */
2591       if (GST_EVENT_IS_SERIALIZED (event)) {
2592         gst_base_sink_queue_object (basesink, pad,
2593             GST_MINI_OBJECT_CAST (event), FALSE);
2594       } else {
2595         if (bclass->event)
2596           bclass->event (basesink, event);
2597         gst_event_unref (event);
2598       }
2599       break;
2600   }
2601 done:
2602   gst_object_unref (basesink);
2603
2604   return result;
2605
2606   /* ERRORS */
2607 flushing:
2608   {
2609     GST_DEBUG_OBJECT (basesink, "we are flushing");
2610     GST_PAD_PREROLL_UNLOCK (pad);
2611     result = FALSE;
2612     gst_event_unref (event);
2613     goto done;
2614   }
2615 }
2616
2617 /* default implementation to calculate the start and end
2618  * timestamps on a buffer, subclasses can override
2619  */
2620 static void
2621 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
2622     GstClockTime * start, GstClockTime * end)
2623 {
2624   GstClockTime timestamp, duration;
2625
2626   timestamp = GST_BUFFER_TIMESTAMP (buffer);
2627   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
2628
2629     /* get duration to calculate end time */
2630     duration = GST_BUFFER_DURATION (buffer);
2631     if (GST_CLOCK_TIME_IS_VALID (duration)) {
2632       *end = timestamp + duration;
2633     }
2634     *start = timestamp;
2635   }
2636 }
2637
2638 /* must be called with PREROLL_LOCK */
2639 static gboolean
2640 gst_base_sink_needs_preroll (GstBaseSink * basesink)
2641 {
2642   gboolean is_prerolled, res;
2643
2644   /* we have 2 cases where the PREROLL_LOCK is released:
2645    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
2646    *  2) we are syncing on the clock
2647    */
2648   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
2649   res = !is_prerolled && basesink->pad_mode != GST_ACTIVATE_PULL;
2650   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
2651       basesink->have_preroll, basesink->priv->received_eos, res);
2652
2653   return res;
2654 }
2655
2656 /* with STREAM_LOCK, PREROLL_LOCK 
2657  *
2658  * Takes a buffer and compare the timestamps with the last segment.
2659  * If the buffer falls outside of the segment boundaries, drop it.
2660  * Else queue the buffer for preroll and rendering.
2661  *
2662  * This function takes ownership of the buffer.
2663  */
2664 static GstFlowReturn
2665 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
2666     GstBuffer * buf)
2667 {
2668   GstBaseSinkClass *bclass;
2669   GstFlowReturn result;
2670   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
2671   GstSegment *clip_segment;
2672
2673   if (G_UNLIKELY (basesink->flushing))
2674     goto flushing;
2675
2676   if (G_UNLIKELY (basesink->priv->received_eos))
2677     goto was_eos;
2678
2679   /* for code clarity */
2680   clip_segment = basesink->abidata.ABI.clip_segment;
2681
2682   if (G_UNLIKELY (!basesink->have_newsegment)) {
2683     gboolean sync;
2684
2685     sync = gst_base_sink_get_sync (basesink);
2686     if (sync) {
2687       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
2688           (_("Internal data flow problem.")),
2689           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
2690     }
2691
2692     /* this means this sink will assume timestamps start from 0 */
2693     GST_OBJECT_LOCK (basesink);
2694     clip_segment->start = 0;
2695     clip_segment->stop = -1;
2696     basesink->segment.start = 0;
2697     basesink->segment.stop = -1;
2698     basesink->have_newsegment = TRUE;
2699     GST_OBJECT_UNLOCK (basesink);
2700   }
2701
2702   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2703
2704   /* check if the buffer needs to be dropped, we first ask the subclass for the
2705    * start and end */
2706   if (bclass->get_times)
2707     bclass->get_times (basesink, buf, &start, &end);
2708
2709   if (start == -1) {
2710     /* if the subclass does not want sync, we use our own values so that we at
2711      * least clip the buffer to the segment */
2712     gst_base_sink_get_times (basesink, buf, &start, &end);
2713   }
2714
2715   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
2716       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
2717
2718   /* a dropped buffer does not participate in anything */
2719   if (GST_CLOCK_TIME_IS_VALID (start) &&
2720       (clip_segment->format == GST_FORMAT_TIME)) {
2721     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
2722                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
2723       goto out_of_segment;
2724   }
2725
2726   /* now we can process the buffer in the queue, this function takes ownership
2727    * of the buffer */
2728   result = gst_base_sink_queue_object_unlocked (basesink, pad,
2729       GST_MINI_OBJECT_CAST (buf), TRUE);
2730
2731   return result;
2732
2733   /* ERRORS */
2734 flushing:
2735   {
2736     GST_DEBUG_OBJECT (basesink, "sink is flushing");
2737     gst_buffer_unref (buf);
2738     return GST_FLOW_WRONG_STATE;
2739   }
2740 was_eos:
2741   {
2742     GST_DEBUG_OBJECT (basesink,
2743         "we are EOS, dropping object, return UNEXPECTED");
2744     gst_buffer_unref (buf);
2745     return GST_FLOW_UNEXPECTED;
2746   }
2747 out_of_segment:
2748   {
2749     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
2750     gst_buffer_unref (buf);
2751     return GST_FLOW_OK;
2752   }
2753 }
2754
2755 /* with STREAM_LOCK
2756  */
2757 static GstFlowReturn
2758 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
2759 {
2760   GstBaseSink *basesink;
2761   GstFlowReturn result;
2762
2763   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
2764
2765   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
2766     goto wrong_mode;
2767
2768   GST_PAD_PREROLL_LOCK (pad);
2769   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
2770   GST_PAD_PREROLL_UNLOCK (pad);
2771
2772 done:
2773   return result;
2774
2775   /* ERRORS */
2776 wrong_mode:
2777   {
2778     GST_OBJECT_LOCK (pad);
2779     GST_WARNING_OBJECT (basesink,
2780         "Push on pad %s:%s, but it was not activated in push mode",
2781         GST_DEBUG_PAD_NAME (pad));
2782     GST_OBJECT_UNLOCK (pad);
2783     gst_buffer_unref (buf);
2784     /* we don't post an error message this will signal to the peer
2785      * pushing that EOS is reached. */
2786     result = GST_FLOW_UNEXPECTED;
2787     goto done;
2788   }
2789 }
2790
2791 /* with STREAM_LOCK
2792  */
2793 static void
2794 gst_base_sink_loop (GstPad * pad)
2795 {
2796   GstBaseSink *basesink;
2797   GstBuffer *buf = NULL;
2798   GstFlowReturn result;
2799
2800   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
2801
2802   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
2803
2804   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
2805       basesink->offset, (guint) DEFAULT_SIZE);
2806
2807   result = gst_pad_pull_range (pad, basesink->offset, DEFAULT_SIZE, &buf);
2808   if (G_UNLIKELY (result != GST_FLOW_OK))
2809     goto paused;
2810
2811   if (G_UNLIKELY (buf == NULL))
2812     goto no_buffer;
2813
2814   basesink->offset += GST_BUFFER_SIZE (buf);
2815
2816   GST_PAD_PREROLL_LOCK (pad);
2817   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
2818   GST_PAD_PREROLL_UNLOCK (pad);
2819   if (G_UNLIKELY (result != GST_FLOW_OK))
2820     goto paused;
2821
2822   return;
2823
2824   /* ERRORS */
2825 paused:
2826   {
2827     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
2828         gst_flow_get_name (result));
2829     gst_pad_pause_task (pad);
2830     /* fatal errors and NOT_LINKED cause EOS */
2831     if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) {
2832       /* FIXME, we shouldn't post EOS when we are operating in segment mode */
2833       gst_base_sink_event (pad, gst_event_new_eos ());
2834       /* EOS does not cause an ERROR message */
2835       if (result != GST_FLOW_UNEXPECTED) {
2836         GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
2837             (_("Internal data stream error.")),
2838             ("stream stopped, reason %s", gst_flow_get_name (result)));
2839       }
2840     }
2841     return;
2842   }
2843 no_buffer:
2844   {
2845     GST_LOG_OBJECT (basesink, "no buffer, pausing");
2846     result = GST_FLOW_ERROR;
2847     goto paused;
2848   }
2849 }
2850
2851 static gboolean
2852 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
2853     gboolean flushing)
2854 {
2855   GstBaseSinkClass *bclass;
2856
2857   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2858
2859   if (flushing) {
2860     /* unlock any subclasses, we need to do this before grabbing the
2861      * PREROLL_LOCK since we hold this lock before going into ::render. */
2862     if (bclass->unlock)
2863       bclass->unlock (basesink);
2864   }
2865
2866   GST_PAD_PREROLL_LOCK (pad);
2867   basesink->flushing = flushing;
2868   if (flushing) {
2869     /* step 1, now that we have the PREROLL lock, clear our unlock request */
2870     if (bclass->unlock_stop)
2871       bclass->unlock_stop (basesink);
2872
2873     /* set need_preroll before we unblock the clock. If the clock is unblocked
2874      * before timing out, we can reuse the buffer for preroll. */
2875     basesink->need_preroll = TRUE;
2876
2877     /* step 2, unblock clock sync (if any) or any other blocking thing */
2878     if (basesink->clock_id) {
2879       gst_clock_id_unschedule (basesink->clock_id);
2880     }
2881
2882     /* flush out the data thread if it's locked in finish_preroll, this will
2883      * also flush out the EOS state */
2884     GST_DEBUG_OBJECT (basesink,
2885         "flushing out data thread, need preroll to TRUE");
2886     gst_base_sink_preroll_queue_flush (basesink, pad);
2887   }
2888   GST_PAD_PREROLL_UNLOCK (pad);
2889
2890   return TRUE;
2891 }
2892
2893 static gboolean
2894 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
2895 {
2896   gboolean result;
2897
2898   if (active) {
2899     /* start task */
2900     result = gst_pad_start_task (basesink->sinkpad,
2901         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
2902   } else {
2903     /* step 2, make sure streaming finishes */
2904     result = gst_pad_stop_task (basesink->sinkpad);
2905   }
2906
2907   return result;
2908 }
2909
2910 static gboolean
2911 gst_base_sink_pad_activate (GstPad * pad)
2912 {
2913   gboolean result = FALSE;
2914   GstBaseSink *basesink;
2915
2916   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2917
2918   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
2919
2920   gst_base_sink_set_flushing (basesink, pad, FALSE);
2921
2922   if (basesink->can_activate_pull && gst_pad_check_pull_range (pad)
2923       && gst_pad_activate_pull (pad, TRUE)) {
2924     GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
2925     result = TRUE;
2926   } else {
2927     GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
2928     if (gst_pad_activate_push (pad, TRUE)) {
2929       GST_DEBUG_OBJECT (basesink, "Success activating push mode");
2930       result = TRUE;
2931     }
2932   }
2933
2934   if (!result) {
2935     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
2936     gst_base_sink_set_flushing (basesink, pad, TRUE);
2937   }
2938
2939   gst_object_unref (basesink);
2940
2941   return result;
2942 }
2943
2944 static gboolean
2945 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
2946 {
2947   gboolean result;
2948   GstBaseSink *basesink;
2949
2950   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2951
2952   if (active) {
2953     if (!basesink->can_activate_push) {
2954       result = FALSE;
2955       basesink->pad_mode = GST_ACTIVATE_NONE;
2956     } else {
2957       result = TRUE;
2958       basesink->pad_mode = GST_ACTIVATE_PUSH;
2959     }
2960   } else {
2961     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
2962       g_warning ("Internal GStreamer activation error!!!");
2963       result = FALSE;
2964     } else {
2965       gst_base_sink_set_flushing (basesink, pad, TRUE);
2966       result = TRUE;
2967       basesink->pad_mode = GST_ACTIVATE_NONE;
2968     }
2969   }
2970
2971   gst_object_unref (basesink);
2972
2973   return result;
2974 }
2975
2976 static gboolean
2977 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
2978 {
2979   GstCaps *caps;
2980   gboolean result;
2981
2982   result = FALSE;
2983
2984   /* this returns the intersection between our caps and the peer caps. If there
2985    * is no peer, it returns NULL and we can't operate in pull mode so we can
2986    * fail the negotiation. */
2987   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
2988   if (caps == NULL || gst_caps_is_empty (caps))
2989     goto no_caps_possible;
2990
2991   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
2992
2993   caps = gst_caps_make_writable (caps);
2994   /* get the first (prefered) format */
2995   gst_caps_truncate (caps);
2996   /* try to fixate */
2997   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
2998
2999   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
3000
3001   if (gst_caps_is_any (caps)) {
3002     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
3003         "allowing pull()");
3004     /* neither side has template caps in this case, so they are prepared for
3005        pull() without setcaps() */
3006     result = TRUE;
3007   } else if (gst_caps_is_fixed (caps)) {
3008     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
3009       goto could_not_set_caps;
3010     result = TRUE;
3011   }
3012
3013   gst_caps_unref (caps);
3014
3015   return result;
3016
3017 no_caps_possible:
3018   {
3019     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
3020     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
3021     if (caps)
3022       gst_caps_unref (caps);
3023     return FALSE;
3024   }
3025 could_not_set_caps:
3026   {
3027     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
3028     gst_caps_unref (caps);
3029     return FALSE;
3030   }
3031 }
3032
3033 /* this won't get called until we implement an activate function */
3034 static gboolean
3035 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
3036 {
3037   gboolean result = FALSE;
3038   GstBaseSink *basesink;
3039   GstBaseSinkClass *bclass;
3040
3041   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3042   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3043
3044   if (active) {
3045     if (!basesink->can_activate_pull) {
3046       result = FALSE;
3047       basesink->pad_mode = GST_ACTIVATE_NONE;
3048     } else {
3049       GstPad *peer = gst_pad_get_peer (pad);
3050
3051       if (G_UNLIKELY (peer == NULL)) {
3052         g_warning ("Trying to activate pad in pull mode, but no peer");
3053         result = FALSE;
3054         basesink->pad_mode = GST_ACTIVATE_NONE;
3055       } else {
3056         if (gst_pad_activate_pull (peer, TRUE)) {
3057           /* we mark we have a newsegment here because pull based
3058            * mode works just fine without having a newsegment before the
3059            * first buffer */
3060           gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3061           gst_segment_init (basesink->abidata.ABI.clip_segment,
3062               GST_FORMAT_UNDEFINED);
3063           GST_OBJECT_LOCK (basesink);
3064           basesink->have_newsegment = TRUE;
3065           GST_OBJECT_UNLOCK (basesink);
3066
3067           /* set the pad mode before starting the task so that it's in the
3068              correct state for the new thread. also the sink set_caps function
3069              checks this */
3070           basesink->pad_mode = GST_ACTIVATE_PULL;
3071           if ((result = gst_base_sink_negotiate_pull (basesink))) {
3072             if (bclass->activate_pull)
3073               result = bclass->activate_pull (basesink, TRUE);
3074             else
3075               result = FALSE;
3076           }
3077           /* but if starting the thread fails, set it back */
3078           if (!result)
3079             basesink->pad_mode = GST_ACTIVATE_NONE;
3080         } else {
3081           GST_DEBUG_OBJECT (pad, "Failed to activate peer in pull mode");
3082           result = FALSE;
3083           basesink->pad_mode = GST_ACTIVATE_NONE;
3084         }
3085         gst_object_unref (peer);
3086       }
3087     }
3088   } else {
3089     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
3090       g_warning ("Internal GStreamer activation error!!!");
3091       result = FALSE;
3092     } else {
3093       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
3094       if (bclass->activate_pull)
3095         result &= bclass->activate_pull (basesink, FALSE);
3096       basesink->pad_mode = GST_ACTIVATE_NONE;
3097     }
3098   }
3099
3100   gst_object_unref (basesink);
3101
3102   return result;
3103 }
3104
3105 /* send an event to our sinkpad peer. */
3106 static gboolean
3107 gst_base_sink_send_event (GstElement * element, GstEvent * event)
3108 {
3109   GstPad *pad;
3110   GstBaseSink *basesink = GST_BASE_SINK (element);
3111   gboolean forward, result = TRUE;
3112
3113   /* only push UPSTREAM events upstream */
3114   forward = GST_EVENT_IS_UPSTREAM (event);
3115
3116   switch (GST_EVENT_TYPE (event)) {
3117     case GST_EVENT_LATENCY:
3118     {
3119       GstClockTime latency;
3120
3121       gst_event_parse_latency (event, &latency);
3122
3123       /* store the latency. We use this to adjust the running_time before syncing
3124        * it to the clock. */
3125       GST_OBJECT_LOCK (element);
3126       basesink->priv->latency = latency;
3127       GST_OBJECT_UNLOCK (element);
3128       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
3129           GST_TIME_ARGS (latency));
3130
3131       /* don't forward, yet. FIXME. The latency event should likely be forwarded
3132        * to upstream element so that they can configure themselves. Each element
3133        * would subtract the amount of LATENCY it can maximally compensate for. 
3134        * It's currently not very useful; even if this sink cannot compensate for
3135        * all the latency, upstream will block while this sink waits which will
3136        * trigger implicit buffering and latency there. */
3137       forward = FALSE;
3138       break;
3139     }
3140     default:
3141       break;
3142   }
3143
3144   if (forward) {
3145     GST_OBJECT_LOCK (element);
3146     pad = gst_object_ref (basesink->sinkpad);
3147     GST_OBJECT_UNLOCK (element);
3148
3149     result = gst_pad_push_event (pad, event);
3150
3151     gst_object_unref (pad);
3152   } else {
3153     /* not forwarded, unref the event */
3154     gst_event_unref (event);
3155   }
3156   return result;
3157 }
3158
3159 static gboolean
3160 gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query)
3161 {
3162   GstPad *peer;
3163   gboolean res = FALSE;
3164
3165   if ((peer = gst_pad_get_peer (sink->sinkpad))) {
3166     res = gst_pad_query (peer, query);
3167     gst_object_unref (peer);
3168   }
3169   return res;
3170 }
3171
3172 /* get the end position of the last seen object, this is used
3173  * for EOS and for making sure that we don't report a position we
3174  * have not reached yet. */
3175 static gboolean
3176 gst_base_sink_get_position_last (GstBaseSink * basesink, gint64 * cur)
3177 {
3178   /* return last observed stream time */
3179   *cur = basesink->priv->current_sstop;
3180
3181   GST_DEBUG_OBJECT (basesink, "POSITION: %" GST_TIME_FORMAT,
3182       GST_TIME_ARGS (*cur));
3183   return TRUE;
3184 }
3185
3186 /* get the position when we are PAUSED, this is the stream time of the buffer
3187  * that prerolled. If no buffer is prerolled (we are still flushing), this
3188  * value will be -1. */
3189 static gboolean
3190 gst_base_sink_get_position_paused (GstBaseSink * basesink, gint64 * cur)
3191 {
3192   gboolean res;
3193   gint64 time;
3194   GstSegment *segment;
3195
3196   *cur = basesink->priv->current_sstart;
3197   segment = basesink->abidata.ABI.clip_segment;
3198
3199   time = segment->time;
3200
3201   if (*cur != -1) {
3202     *cur = MAX (*cur, time);
3203     GST_DEBUG_OBJECT (basesink, "POSITION as max: %" GST_TIME_FORMAT
3204         ", time %" GST_TIME_FORMAT, GST_TIME_ARGS (*cur), GST_TIME_ARGS (time));
3205   } else {
3206     /* we have no buffer, use the segment times. */
3207     if (segment->rate >= 0.0) {
3208       /* forward, next position is always the time of the segment */
3209       *cur = time;
3210       GST_DEBUG_OBJECT (basesink, "POSITION as time: %" GST_TIME_FORMAT,
3211           GST_TIME_ARGS (*cur));
3212     } else {
3213       /* reverse, next expected timestamp is segment->stop. We use the function
3214        * to get things right for negative applied_rates. */
3215       *cur =
3216           gst_segment_to_stream_time (segment, GST_FORMAT_TIME, segment->stop);
3217       GST_DEBUG_OBJECT (basesink, "reverse POSITION: %" GST_TIME_FORMAT,
3218           GST_TIME_ARGS (*cur));
3219     }
3220   }
3221   res = (*cur != -1);
3222
3223   return res;
3224 }
3225
3226 static gboolean
3227 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
3228     gint64 * cur, gboolean * upstream)
3229 {
3230   GstClock *clock;
3231   gboolean res = FALSE;
3232
3233   switch (format) {
3234       /* we can answer time format */
3235     case GST_FORMAT_TIME:
3236     {
3237       GstClockTime now, base, latency;
3238       gint64 time, accum, duration;
3239       gdouble rate;
3240       gint64 last;
3241
3242       GST_OBJECT_LOCK (basesink);
3243
3244       /* can only give answer based on the clock if not EOS */
3245       if (G_UNLIKELY (basesink->eos))
3246         goto in_eos;
3247
3248       /* we can only get the segment when we are not NULL or READY */
3249       if (!basesink->have_newsegment)
3250         goto wrong_state;
3251
3252       /* when not in PLAYING or when we're busy with a state change, we
3253        * cannot read from the clock so we report time based on the
3254        * last seen timestamp. */
3255       if (GST_STATE (basesink) != GST_STATE_PLAYING ||
3256           GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING)
3257         goto in_pause;
3258
3259       /* we need to sync on the clock. */
3260       if (basesink->sync == FALSE)
3261         goto no_sync;
3262
3263       /* and we need a clock */
3264       if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
3265         goto no_sync;
3266
3267       /* collect all data we need holding the lock */
3268       if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time))
3269         time = basesink->segment.time;
3270       else
3271         time = 0;
3272
3273       if (GST_CLOCK_TIME_IS_VALID (basesink->segment.stop))
3274         duration = basesink->segment.stop - basesink->segment.start;
3275       else
3276         duration = 0;
3277
3278       base = GST_ELEMENT_CAST (basesink)->base_time;
3279       accum = basesink->segment.accum;
3280       rate = basesink->segment.rate * basesink->segment.applied_rate;
3281       gst_base_sink_get_position_last (basesink, &last);
3282       latency = basesink->priv->latency;
3283
3284       gst_object_ref (clock);
3285       /* need to release the object lock before we can get the time, 
3286        * a clock might take the LOCK of the provider, which could be
3287        * a basesink subclass. */
3288       GST_OBJECT_UNLOCK (basesink);
3289
3290       now = gst_clock_get_time (clock);
3291
3292       /* subtract base time and accumulated time from the clock time. 
3293        * Make sure we don't go negative. This is the current time in
3294        * the segment which we need to scale with the combined 
3295        * rate and applied rate. */
3296       base += accum;
3297       base += latency;
3298       base = MIN (now, base);
3299
3300       /* for negative rates we need to count back from from the segment
3301        * duration. */
3302       if (rate < 0.0)
3303         time += duration;
3304
3305       *cur = time + gst_guint64_to_gdouble (now - base) * rate;
3306
3307       /* never report more than last seen position */
3308       if (last != -1)
3309         *cur = MIN (last, *cur);
3310
3311       gst_object_unref (clock);
3312
3313       res = TRUE;
3314
3315       GST_DEBUG_OBJECT (basesink,
3316           "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
3317           GST_TIME_FORMAT " + time %" GST_TIME_FORMAT,
3318           GST_TIME_ARGS (now), GST_TIME_ARGS (base),
3319           GST_TIME_ARGS (accum), GST_TIME_ARGS (time));
3320       break;
3321     }
3322     default:
3323       /* cannot answer other than TIME, ask to send the query upstream. */
3324       *upstream = TRUE;
3325       break;
3326   }
3327
3328 done:
3329   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
3330       res, GST_TIME_ARGS (*cur));
3331   return res;
3332
3333   /* special cases */
3334 in_eos:
3335   {
3336     GST_DEBUG_OBJECT (basesink, "position in EOS");
3337     res = gst_base_sink_get_position_last (basesink, cur);
3338     GST_OBJECT_UNLOCK (basesink);
3339     goto done;
3340   }
3341 in_pause:
3342   {
3343     GST_DEBUG_OBJECT (basesink, "position in PAUSED");
3344     res = gst_base_sink_get_position_paused (basesink, cur);
3345     GST_OBJECT_UNLOCK (basesink);
3346     goto done;
3347   }
3348 wrong_state:
3349   {
3350     /* in NULL or READY we always return FALSE and -1 */
3351     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
3352     res = FALSE;
3353     *cur = -1;
3354     GST_OBJECT_UNLOCK (basesink);
3355     goto done;
3356   }
3357 no_sync:
3358   {
3359     /* report last seen timestamp if any, else ask upstream to answer */
3360     if ((*cur = basesink->priv->current_sstart) != -1)
3361       res = TRUE;
3362     else
3363       *upstream = TRUE;
3364
3365     GST_DEBUG_OBJECT (basesink, "no sync, res %d, POSITION %" GST_TIME_FORMAT,
3366         res, GST_TIME_ARGS (*cur));
3367     GST_OBJECT_UNLOCK (basesink);
3368     return res;
3369   }
3370 }
3371
3372 static gboolean
3373 gst_base_sink_query (GstElement * element, GstQuery * query)
3374 {
3375   gboolean res = FALSE;
3376
3377   GstBaseSink *basesink = GST_BASE_SINK (element);
3378
3379   switch (GST_QUERY_TYPE (query)) {
3380     case GST_QUERY_POSITION:
3381     {
3382       gint64 cur = 0;
3383       GstFormat format;
3384       gboolean upstream = FALSE;
3385
3386       gst_query_parse_position (query, &format, NULL);
3387
3388       GST_DEBUG_OBJECT (basesink, "position format %d", format);
3389
3390       /* first try to get the position based on the clock */
3391       if ((res =
3392               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
3393         gst_query_set_position (query, format, cur);
3394       } else if (upstream) {
3395         /* fallback to peer query */
3396         res = gst_base_sink_peer_query (basesink, query);
3397       }
3398       break;
3399     }
3400     case GST_QUERY_DURATION:
3401       GST_DEBUG_OBJECT (basesink, "duration query");
3402       res = gst_base_sink_peer_query (basesink, query);
3403       break;
3404     case GST_QUERY_LATENCY:
3405     {
3406       gboolean live, us_live;
3407       GstClockTime min, max;
3408
3409       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
3410                   &max))) {
3411         gst_query_set_latency (query, live, min, max);
3412       }
3413       break;
3414     }
3415     case GST_QUERY_JITTER:
3416       break;
3417     case GST_QUERY_RATE:
3418       /* gst_query_set_rate (query, basesink->segment_rate); */
3419       res = TRUE;
3420       break;
3421     case GST_QUERY_SEGMENT:
3422     {
3423       /* FIXME, bring start/stop to stream time */
3424       gst_query_set_segment (query, basesink->segment.rate,
3425           GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
3426       break;
3427     }
3428     case GST_QUERY_SEEKING:
3429     case GST_QUERY_CONVERT:
3430     case GST_QUERY_FORMATS:
3431     default:
3432       res = gst_base_sink_peer_query (basesink, query);
3433       break;
3434   }
3435   return res;
3436 }
3437
3438 static GstStateChangeReturn
3439 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
3440 {
3441   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3442   GstBaseSink *basesink = GST_BASE_SINK (element);
3443   GstBaseSinkClass *bclass;
3444   GstBaseSinkPrivate *priv;
3445
3446   priv = basesink->priv;
3447
3448   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3449
3450   switch (transition) {
3451     case GST_STATE_CHANGE_NULL_TO_READY:
3452       if (bclass->start)
3453         if (!bclass->start (basesink))
3454           goto start_failed;
3455       break;
3456     case GST_STATE_CHANGE_READY_TO_PAUSED:
3457       /* need to complete preroll before this state change completes, there
3458        * is no data flow in READY so we can safely assume we need to preroll. */
3459       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
3460       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
3461       basesink->have_newsegment = FALSE;
3462       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3463       gst_segment_init (basesink->abidata.ABI.clip_segment,
3464           GST_FORMAT_UNDEFINED);
3465       basesink->offset = 0;
3466       basesink->have_preroll = FALSE;
3467       basesink->need_preroll = TRUE;
3468       basesink->playing_async = TRUE;
3469       priv->current_sstart = -1;
3470       priv->current_sstop = -1;
3471       priv->eos_rtime = -1;
3472       priv->latency = 0;
3473       basesink->eos = FALSE;
3474       priv->received_eos = FALSE;
3475       gst_base_sink_reset_qos (basesink);
3476       priv->commited = FALSE;
3477       if (priv->async_enabled) {
3478         GST_DEBUG_OBJECT (basesink, "doing async state change");
3479         /* when async enabled, post async-start message and return ASYNC from
3480          * the state change function */
3481         ret = GST_STATE_CHANGE_ASYNC;
3482         gst_element_post_message (GST_ELEMENT_CAST (basesink),
3483             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
3484       } else {
3485         priv->have_latency = TRUE;
3486       }
3487       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
3488       break;
3489     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3490       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
3491       if (!gst_base_sink_needs_preroll (basesink)) {
3492         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
3493         /* no preroll needed anymore now. */
3494         basesink->playing_async = FALSE;
3495         basesink->need_preroll = FALSE;
3496         if (basesink->eos) {
3497           /* need to post EOS message here */
3498           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
3499           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3500               gst_message_new_eos (GST_OBJECT_CAST (basesink)));
3501         } else {
3502           GST_DEBUG_OBJECT (basesink, "signal preroll");
3503           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
3504         }
3505       } else {
3506         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
3507         basesink->need_preroll = TRUE;
3508         basesink->playing_async = TRUE;
3509         priv->commited = FALSE;
3510         if (priv->async_enabled) {
3511           GST_DEBUG_OBJECT (basesink, "doing async state change");
3512           ret = GST_STATE_CHANGE_ASYNC;
3513           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3514               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
3515         }
3516       }
3517       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
3518       break;
3519     default:
3520       break;
3521   }
3522
3523   {
3524     GstStateChangeReturn bret;
3525
3526     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3527     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
3528       goto activate_failed;
3529   }
3530
3531   switch (transition) {
3532     case GST_STATE_CHANGE_READY_TO_PAUSED:
3533       /* note that this is the upward case, which doesn't follow most
3534          patterns */
3535       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
3536         GST_DEBUG_OBJECT (basesink, "basesink activated in pull mode, "
3537             "returning SUCCESS directly");
3538         GST_PAD_PREROLL_LOCK (basesink->sinkpad);
3539         gst_element_post_message (GST_ELEMENT_CAST (basesink),
3540             gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
3541         GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
3542         ret = GST_STATE_CHANGE_SUCCESS;
3543       }
3544       break;
3545     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3546       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
3547       /* FIXME, make sure we cannot enter _render first */
3548
3549       /* we need to call ::unlock before locking PREROLL_LOCK
3550        * since we lock it before going into ::render */
3551       if (bclass->unlock)
3552         bclass->unlock (basesink);
3553
3554       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
3555       /* now that we have the PREROLL lock, clear our unlock request */
3556       if (bclass->unlock_stop)
3557         bclass->unlock_stop (basesink);
3558
3559       /* we need preroll again and we set the flag before unlocking the clockid
3560        * because if the clockid is unlocked before a current buffer expired, we
3561        * can use that buffer to preroll with */
3562       basesink->need_preroll = TRUE;
3563
3564       if (basesink->clock_id) {
3565         gst_clock_id_unschedule (basesink->clock_id);
3566       }
3567
3568       /* if we don't have a preroll buffer we need to wait for a preroll and
3569        * return ASYNC. */
3570       if (!gst_base_sink_needs_preroll (basesink)) {
3571         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
3572         basesink->playing_async = FALSE;
3573       } else {
3574         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
3575           ret = GST_STATE_CHANGE_SUCCESS;
3576         } else {
3577           GST_DEBUG_OBJECT (basesink,
3578               "PLAYING to PAUSED, we are not prerolled");
3579           basesink->playing_async = TRUE;
3580           priv->commited = FALSE;
3581           if (priv->async_enabled) {
3582             GST_DEBUG_OBJECT (basesink, "doing async state change");
3583             ret = GST_STATE_CHANGE_ASYNC;
3584             gst_element_post_message (GST_ELEMENT_CAST (basesink),
3585                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
3586                     FALSE));
3587           }
3588         }
3589       }
3590       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
3591           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
3592
3593       gst_base_sink_reset_qos (basesink);
3594       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
3595       break;
3596     case GST_STATE_CHANGE_PAUSED_TO_READY:
3597       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
3598       /* start by reseting our position state with the object lock so that the
3599        * position query gets the right idea. We do this before we post the
3600        * messages so that the message handlers pick this up. */
3601       GST_OBJECT_LOCK (basesink);
3602       basesink->have_newsegment = FALSE;
3603       priv->current_sstart = -1;
3604       priv->current_sstop = -1;
3605       priv->have_latency = FALSE;
3606       GST_OBJECT_UNLOCK (basesink);
3607
3608       gst_base_sink_set_last_buffer (basesink, NULL);
3609
3610       if (!priv->commited) {
3611         if (priv->async_enabled) {
3612           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
3613
3614           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3615               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
3616                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
3617
3618           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3619               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
3620         }
3621         priv->commited = TRUE;
3622       } else {
3623         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
3624       }
3625       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
3626       break;
3627     case GST_STATE_CHANGE_READY_TO_NULL:
3628       if (bclass->stop) {
3629         if (!bclass->stop (basesink)) {
3630           GST_WARNING_OBJECT (basesink, "failed to stop");
3631         }
3632       }
3633       gst_base_sink_set_last_buffer (basesink, NULL);
3634       break;
3635     default:
3636       break;
3637   }
3638
3639   return ret;
3640
3641   /* ERRORS */
3642 start_failed:
3643   {
3644     GST_DEBUG_OBJECT (basesink, "failed to start");
3645     return GST_STATE_CHANGE_FAILURE;
3646   }
3647 activate_failed:
3648   {
3649     GST_DEBUG_OBJECT (basesink,
3650         "element failed to change states -- activation problem?");
3651     return GST_STATE_CHANGE_FAILURE;
3652   }
3653 }