fix misleading log statement
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005,2006 Wim Taymans <wim@fluendo.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSource
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * <programlisting>
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *   
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * </programlisting>
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSink::preroll vmethod with this preroll buffer and will then commit
59  * the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from ::get_times. If this function returns
63  * #GST_CLOCK_TIME_NONE for the start time, no synchronisation will be done.
64  * Synchronisation can be disabled entirely by setting the object "sync"
65  * property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSink::render will be called.
68  * Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the ::render method
71  * are supported as well. These classes typically receive a buffer in the render
72  * method and can then potentially block on the clock while rendering. A typical
73  * example is an audiosink. Since 0.10.11 these subclasses can use
74  * gst_base_sink_wait_preroll() to perform the blocking wait.
75  *
76  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
77  * for the clock to reach the time indicated by the stop time of the last
78  * ::get_times call before posting an EOS message. When the element receives
79  * EOS in PAUSED, preroll completes, the event is queued and an EOS message is
80  * posted when going to PLAYING.
81  *
82  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
83  * synchronisation and clipping of buffers. Buffers that fall completely outside
84  * of the current segment are dropped. Buffers that fall partially in the
85  * segment are rendered (and prerolled). Subclasses should do any subbuffer
86  * clipping themselves when needed.
87  *
88  * #GstBaseSink will by default report the current playback position in
89  * #GST_FORMAT_TIME based on the current clock time and segment information.
90  * If no clock has been set on the element, the query will be forwarded
91  * upstream.
92  *
93  * The ::set_caps function will be called when the subclass should configure
94  * itself to process a specific media type.
95  *
96  * The ::start and ::stop virtual methods will be called when resources should
97  * be allocated. Any ::preroll, ::render  and ::set_caps function will be
98  * called between the ::start and ::stop calls.
99  *
100  * The ::event virtual method will be called when an event is received by
101  * #GstBaseSink. Normally this method should only be overriden by very specific
102  * elements (such as file sinks) which need to handle the newsegment event
103  * specially.
104  *
105  * #GstBaseSink provides an overridable ::buffer_alloc function that can be
106  * used by sinks that want to do reverse negotiation or to provide
107  * custom buffers (hardware buffers for example) to upstream elements.
108  *
109  * The ::unlock method is called when the elements should unblock any blocking
110  * operations they perform in the ::render method. This is mostly useful when
111  * the ::render method performs a blocking write on a file descriptor, for
112  * example.
113  *
114  * The max-lateness property affects how the sink deals with buffers that
115  * arrive too late in the sink. A buffer arrives too late in the sink when
116  * the presentation time (as a combination of the last segment, buffer
117  * timestamp and element base_time) plus the duration is before the current
118  * time of the clock.
119  * If the frame is later than max-lateness, the sink will drop the buffer
120  * without calling the render method.
121  * This feature is disabled if sync is disabled, the ::get-times method does
122  * not return a valid start time or max-lateness is set to -1 (the default).
123  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
124  * max-lateness value.
125  *
126  * The qos property will enable the quality-of-service features of the basesink
127  * which gather statistics about the real-time performance of the clock
128  * synchronisation. For each buffer received in the sink, statistics are
129  * gathered and a QOS event is send upstream with these numbers. This
130  * information can then be used by upstream elements to reduce their processing
131  * rate, for example.
132  *
133  * Last reviewed on 2006-09-27 (0.10.11)
134  */
135
136 #ifdef HAVE_CONFIG_H
137 #  include "config.h"
138 #endif
139
140 #include "gstbasesink.h"
141 #include <gst/gstmarshal.h>
142 #include <gst/gst_private.h>
143 #include <gst/gst-i18n-lib.h>
144
145 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
146 #define GST_CAT_DEFAULT gst_base_sink_debug
147
148 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
149    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
150
151 /* FIXME, some stuff in ABI.data and other in Private...
152  * Make up your mind please.
153  */
154 struct _GstBaseSinkPrivate
155 {
156   gint qos_enabled;             /* ATOMIC */
157
158   /* start, stop of current buffer, stream time, used to report position */
159   GstClockTime current_sstart;
160   GstClockTime current_sstop;
161
162   /* start, stop and jitter of current buffer, running time */
163   GstClockTime current_rstart;
164   GstClockTime current_rstop;
165   GstClockTimeDiff current_jitter;
166
167   /* EOS sync time in running time */
168   GstClockTime eos_rtime;
169
170   /* last buffer that arrived in time, running time */
171   GstClockTime last_in_time;
172   /* when the last buffer left the sink, running time */
173   GstClockTime last_left;
174
175   /* running averages go here these are done on running time */
176   GstClockTime avg_pt;
177   GstClockTime avg_duration;
178   gdouble avg_rate;
179
180   /* these are done on system time. avg_jitter and avg_render are
181    * compared to eachother to see if the rendering time takes a
182    * huge amount of the processing, If so we are flooded with
183    * buffers. */
184   GstClockTime last_left_systime;
185   GstClockTime avg_jitter;
186   GTimeVal start, stop;
187   GstClockTime avg_render;
188
189   /* number of rendered and dropped frames */
190   guint64 rendered;
191   guint64 dropped;
192
193   /* latency stuff */
194   GstClockTime latency;
195 };
196
197 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
198
199 /* generic running average, this has a neutral window size */
200 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
201
202 /* the windows for these running averages are experimentally obtained.
203  * possitive values get averaged more while negative values use a small
204  * window so we can react faster to badness. */
205 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
206 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
207
208 /* BaseSink signals and properties */
209 enum
210 {
211   /* FILL ME */
212   SIGNAL_HANDOFF,
213   LAST_SIGNAL
214 };
215
216 #define DEFAULT_SIZE 1024
217 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
218 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
219
220 #define DEFAULT_PREROLL_QUEUE_LEN       0
221 #define DEFAULT_SYNC                    TRUE
222 #define DEFAULT_MAX_LATENESS            -1
223 #define DEFAULT_QOS                     FALSE
224
225 enum
226 {
227   PROP_0,
228   PROP_PREROLL_QUEUE_LEN,
229   PROP_SYNC,
230   PROP_MAX_LATENESS,
231   PROP_QOS
232 };
233
234 static GstElementClass *parent_class = NULL;
235
236 static void gst_base_sink_base_init (gpointer g_class);
237 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
238 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
239 static void gst_base_sink_finalize (GObject * object);
240
241 GType
242 gst_base_sink_get_type (void)
243 {
244   static GType base_sink_type = 0;
245
246   if (G_UNLIKELY (base_sink_type == 0)) {
247     static const GTypeInfo base_sink_info = {
248       sizeof (GstBaseSinkClass),
249       (GBaseInitFunc) gst_base_sink_base_init,
250       NULL,
251       (GClassInitFunc) gst_base_sink_class_init,
252       NULL,
253       NULL,
254       sizeof (GstBaseSink),
255       0,
256       (GInstanceInitFunc) gst_base_sink_init,
257     };
258
259     base_sink_type = g_type_register_static (GST_TYPE_ELEMENT,
260         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
261   }
262   return base_sink_type;
263 }
264
265 static void gst_base_sink_set_property (GObject * object, guint prop_id,
266     const GValue * value, GParamSpec * pspec);
267 static void gst_base_sink_get_property (GObject * object, guint prop_id,
268     GValue * value, GParamSpec * pspec);
269
270 static gboolean gst_base_sink_send_event (GstElement * element,
271     GstEvent * event);
272 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
273
274 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
275 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
276 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
277     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
278 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
279     GstClockTime * start, GstClockTime * end);
280 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
281     GstPad * pad, gboolean flushing);
282 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
283     gboolean active);
284
285 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
286     GstStateChange transition);
287
288 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
289 static void gst_base_sink_loop (GstPad * pad);
290 static gboolean gst_base_sink_pad_activate (GstPad * pad);
291 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
292 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
293 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
294 static gboolean gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query);
295
296 /* check if an object was too late */
297 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
298     GstMiniObject * obj, GstClockTime start, GstClockTime stop,
299     GstClockReturn status, GstClockTimeDiff jitter);
300
301 static void
302 gst_base_sink_base_init (gpointer g_class)
303 {
304   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
305       "basesink element");
306 }
307
308 static void
309 gst_base_sink_class_init (GstBaseSinkClass * klass)
310 {
311   GObjectClass *gobject_class;
312   GstElementClass *gstelement_class;
313
314   gobject_class = G_OBJECT_CLASS (klass);
315   gstelement_class = GST_ELEMENT_CLASS (klass);
316
317   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
318
319   parent_class = g_type_class_peek_parent (klass);
320
321   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_sink_finalize);
322   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_sink_set_property);
323   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_sink_get_property);
324
325   /* FIXME, this next value should be configured using an event from the
326    * upstream element, ie, the BUFFER_SIZE event. */
327   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
328       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
329           "Number of buffers to queue during preroll", 0, G_MAXUINT,
330           DEFAULT_PREROLL_QUEUE_LEN, G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
331
332   g_object_class_install_property (gobject_class, PROP_SYNC,
333       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
334           G_PARAM_READWRITE));
335
336   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
337       g_param_spec_int64 ("max-lateness", "Max Lateness",
338           "Maximum number of nanoseconds that a buffer can be late before it "
339           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
340           G_PARAM_READWRITE));
341
342   g_object_class_install_property (gobject_class, PROP_QOS,
343       g_param_spec_boolean ("qos", "Qos",
344           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
345           G_PARAM_READWRITE));
346
347   gstelement_class->change_state =
348       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
349   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
350   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
351
352   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
353   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
354   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
355   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
356   klass->activate_pull =
357       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
358 }
359
360 static GstCaps *
361 gst_base_sink_pad_getcaps (GstPad * pad)
362 {
363   GstBaseSinkClass *bclass;
364   GstBaseSink *bsink;
365   GstCaps *caps = NULL;
366
367   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
368   bclass = GST_BASE_SINK_GET_CLASS (bsink);
369   if (bclass->get_caps)
370     caps = bclass->get_caps (bsink);
371
372   if (caps == NULL) {
373     GstPadTemplate *pad_template;
374
375     pad_template =
376         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "sink");
377     if (pad_template != NULL) {
378       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
379     }
380   }
381   gst_object_unref (bsink);
382
383   return caps;
384 }
385
386 static gboolean
387 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
388 {
389   GstBaseSinkClass *bclass;
390   GstBaseSink *bsink;
391   gboolean res = TRUE;
392
393   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
394   bclass = GST_BASE_SINK_GET_CLASS (bsink);
395
396   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
397     GstPad *peer = gst_pad_get_peer (pad);
398
399     if (peer)
400       res = gst_pad_set_caps (peer, caps);
401     else
402       res = FALSE;
403
404     if (!res)
405       GST_DEBUG_OBJECT (bsink, "peer setcaps() failed");
406   }
407
408   if (res && bclass->set_caps)
409     res = bclass->set_caps (bsink, caps);
410
411   gst_object_unref (bsink);
412
413   return res;
414 }
415
416 static void
417 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
418 {
419   GstBaseSinkClass *bclass;
420   GstBaseSink *bsink;
421
422   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
423   bclass = GST_BASE_SINK_GET_CLASS (bsink);
424
425   if (bclass->fixate)
426     bclass->fixate (bsink, caps);
427
428   gst_object_unref (bsink);
429 }
430
431 static GstFlowReturn
432 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
433     GstCaps * caps, GstBuffer ** buf)
434 {
435   GstBaseSinkClass *bclass;
436   GstBaseSink *bsink;
437   GstFlowReturn result = GST_FLOW_OK;
438
439   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
440   bclass = GST_BASE_SINK_GET_CLASS (bsink);
441
442   if (bclass->buffer_alloc)
443     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
444   else
445     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
446
447   gst_object_unref (bsink);
448
449   return result;
450 }
451
452 static void
453 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
454 {
455   GstPadTemplate *pad_template;
456   GstBaseSinkPrivate *priv;
457
458   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
459
460   pad_template =
461       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
462   g_return_if_fail (pad_template != NULL);
463
464   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
465
466   gst_pad_set_getcaps_function (basesink->sinkpad,
467       GST_DEBUG_FUNCPTR (gst_base_sink_pad_getcaps));
468   gst_pad_set_setcaps_function (basesink->sinkpad,
469       GST_DEBUG_FUNCPTR (gst_base_sink_pad_setcaps));
470   gst_pad_set_fixatecaps_function (basesink->sinkpad,
471       GST_DEBUG_FUNCPTR (gst_base_sink_pad_fixate));
472   gst_pad_set_bufferalloc_function (basesink->sinkpad,
473       GST_DEBUG_FUNCPTR (gst_base_sink_pad_buffer_alloc));
474   gst_pad_set_activate_function (basesink->sinkpad,
475       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate));
476   gst_pad_set_activatepush_function (basesink->sinkpad,
477       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_push));
478   gst_pad_set_activatepull_function (basesink->sinkpad,
479       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_pull));
480   gst_pad_set_event_function (basesink->sinkpad,
481       GST_DEBUG_FUNCPTR (gst_base_sink_event));
482   gst_pad_set_chain_function (basesink->sinkpad,
483       GST_DEBUG_FUNCPTR (gst_base_sink_chain));
484   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
485
486   basesink->pad_mode = GST_ACTIVATE_NONE;
487   basesink->preroll_queue = g_queue_new ();
488   basesink->abidata.ABI.clip_segment = gst_segment_new ();
489
490   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
491   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
492
493   basesink->sync = DEFAULT_SYNC;
494   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
495   gst_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
496
497   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
498 }
499
500 static void
501 gst_base_sink_finalize (GObject * object)
502 {
503   GstBaseSink *basesink;
504
505   basesink = GST_BASE_SINK (object);
506
507   g_queue_free (basesink->preroll_queue);
508   gst_segment_free (basesink->abidata.ABI.clip_segment);
509
510   G_OBJECT_CLASS (parent_class)->finalize (object);
511 }
512
513 /**
514  * gst_base_sink_set_sync:
515  * @sink: the sink
516  * @sync: the new sync value.
517  *
518  * Configures @sink to synchronize on the clock or not. When
519  * @sync is FALSE, incomming samples will be played as fast as
520  * possible. If @sync is TRUE, the timestamps of the incomming
521  * buffers will be used to schedule the exact render time of its
522  * contents.
523  *
524  * Since: 0.10.4
525  */
526 void
527 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
528 {
529   GST_OBJECT_LOCK (sink);
530   sink->sync = sync;
531   GST_OBJECT_UNLOCK (sink);
532 }
533
534 /**
535  * gst_base_sink_get_sync:
536  * @sink: the sink
537  *
538  * Checks if @sink is currently configured to synchronize against the
539  * clock.
540  *
541  * Returns: TRUE if the sink is configured to synchronize against the clock.
542  *
543  * Since: 0.10.4
544  */
545 gboolean
546 gst_base_sink_get_sync (GstBaseSink * sink)
547 {
548   gboolean res;
549
550   GST_OBJECT_LOCK (sink);
551   res = sink->sync;
552   GST_OBJECT_UNLOCK (sink);
553
554   return res;
555 }
556
557 /**
558  * gst_base_sink_set_max_lateness:
559  * @sink: the sink
560  * @max_lateness: the new max lateness value.
561  *
562  * Sets the new max lateness value to @max_lateness. This value is
563  * used to decide if a buffer should be dropped or not based on the
564  * buffer timestamp and the current clock time. A value of -1 means
565  * an unlimited time.
566  *
567  * Since: 0.10.4
568  */
569 void
570 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
571 {
572   GST_OBJECT_LOCK (sink);
573   sink->abidata.ABI.max_lateness = max_lateness;
574   GST_OBJECT_UNLOCK (sink);
575 }
576
577 /**
578  * gst_base_sink_get_max_lateness:
579  * @sink: the sink
580  *
581  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
582  * more details.
583  *
584  * Returns: The maximum time in nanoseconds that a buffer can be late
585  * before it is dropped and not rendered. A value of -1 means an
586  * unlimited time.
587  *
588  * Since: 0.10.4
589  */
590 gint64
591 gst_base_sink_get_max_lateness (GstBaseSink * sink)
592 {
593   gint64 res;
594
595   GST_OBJECT_LOCK (sink);
596   res = sink->abidata.ABI.max_lateness;
597   GST_OBJECT_UNLOCK (sink);
598
599   return res;
600 }
601
602 /**
603  * gst_base_sink_set_qos_enabled:
604  * @sink: the sink
605  * @enabled: the new qos value.
606  *
607  * Configures @sink to send Quality-of-Service events upstream.
608  *
609  * Since: 0.10.5
610  */
611 void
612 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
613 {
614   gst_atomic_int_set (&sink->priv->qos_enabled, enabled);
615 }
616
617 /**
618  * gst_base_sink_is_qos_enabled:
619  * @sink: the sink
620  *
621  * Checks if @sink is currently configured to send Quality-of-Service events
622  * upstream.
623  *
624  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
625  *
626  * Since: 0.10.5
627  */
628 gboolean
629 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
630 {
631   gboolean res;
632
633   res = g_atomic_int_get (&sink->priv->qos_enabled);
634
635   return res;
636 }
637
638 /**
639  * gst_base_sink_get_latency:
640  * @sink: the sink
641  *
642  * Get the currently configured latency.
643  *
644  * Returns: The configured latency.
645  *
646  * Since: 0.10.12
647  */
648 GstClockTime
649 gst_base_sink_get_latency (GstBaseSink * sink)
650 {
651   GstClockTime res;
652
653   GST_OBJECT_LOCK (sink);
654   res = sink->priv->latency;
655   GST_OBJECT_UNLOCK (sink);
656
657   return res;
658 }
659
660 /**
661  * gst_base_sink_query_latency:
662  * @sink: the sink
663  * @live: if the sink is live
664  * @upstream_live: if an upstream element is live
665  * @min_latency: the min latency of the upstream elements
666  * @max_latency: the max latency of the upstream elements
667  *
668  * Query the sink for the latency parameters. The latency will be queried from
669  * the upstream elements. @live will be TRUE if @sink is configured to
670  * synchronize against the clock. @upstream_live will be TRUE if an upstream
671  * element is live. 
672  *
673  * If both @live and @upstream_live are TRUE, the sink will want to compensate
674  * for the latency introduced by the upstream elements by setting the
675  * @min_latency to a strictly possitive value.
676  *
677  * This function is mostly used by subclasses. 
678  *
679  * Returns: TRUE if the query succeeded.
680  *
681  * Since: 0.10.12
682  */
683 gboolean
684 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
685     gboolean * upstream_live, GstClockTime * min_latency,
686     GstClockTime * max_latency)
687 {
688   gboolean l, us_live, res;
689   GstClockTime min, max;
690   GstQuery *query;
691   GstClockTime us_min, us_max;
692
693   /* we are live when we sync to the clock */
694   l = gst_base_sink_get_sync (sink);
695
696   /* assume no latency */
697   min = 0;
698   max = -1;
699   us_live = FALSE;
700
701   query = gst_query_new_latency ();
702
703   /* ask the peer for the latency */
704   if (!(res = gst_base_sink_peer_query (sink, query)))
705     goto query_failed;
706
707   /* get upstream min and max latency */
708   gst_query_parse_latency (query, &us_live, &us_min, &us_max);
709   if (us_live) {
710     /* upstream live, use its latency, subclasses should use these
711      * values to create the complete latency. */
712     min = us_min;
713     max = us_max;
714   }
715
716   GST_DEBUG_OBJECT (sink, "latency query: live: %d, upstream: %d, min %"
717       GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l, us_live,
718       GST_TIME_ARGS (min), GST_TIME_ARGS (max));
719
720   if (live)
721     *live = l;
722   if (upstream_live)
723     *upstream_live = us_live;
724   if (min_latency)
725     *min_latency = min;
726   if (max_latency)
727     *max_latency = max;
728
729 done:
730   gst_query_unref (query);
731
732   return res;
733
734   /* ERRORS */
735 query_failed:
736   {
737     GST_DEBUG_OBJECT (sink, "latency query failed");
738     goto done;
739   }
740 }
741
742 static void
743 gst_base_sink_set_property (GObject * object, guint prop_id,
744     const GValue * value, GParamSpec * pspec)
745 {
746   GstBaseSink *sink = GST_BASE_SINK (object);
747
748   switch (prop_id) {
749     case PROP_PREROLL_QUEUE_LEN:
750       /* preroll lock necessary to serialize with finish_preroll */
751       GST_PAD_PREROLL_LOCK (sink->sinkpad);
752       sink->preroll_queue_max_len = g_value_get_uint (value);
753       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
754       break;
755     case PROP_SYNC:
756       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
757       break;
758     case PROP_MAX_LATENESS:
759       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
760       break;
761     case PROP_QOS:
762       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
763       break;
764     default:
765       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
766       break;
767   }
768 }
769
770 static void
771 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
772     GParamSpec * pspec)
773 {
774   GstBaseSink *sink = GST_BASE_SINK (object);
775
776   switch (prop_id) {
777     case PROP_PREROLL_QUEUE_LEN:
778       GST_PAD_PREROLL_LOCK (sink->sinkpad);
779       g_value_set_uint (value, sink->preroll_queue_max_len);
780       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
781       break;
782     case PROP_SYNC:
783       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
784       break;
785     case PROP_MAX_LATENESS:
786       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
787       break;
788     case PROP_QOS:
789       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
790       break;
791     default:
792       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
793       break;
794   }
795 }
796
797
798 static GstCaps *
799 gst_base_sink_get_caps (GstBaseSink * sink)
800 {
801   return NULL;
802 }
803
804 static gboolean
805 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
806 {
807   return TRUE;
808 }
809
810 static GstFlowReturn
811 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
812     GstCaps * caps, GstBuffer ** buf)
813 {
814   *buf = NULL;
815   return GST_FLOW_OK;
816 }
817
818 /* with PREROLL_LOCK, STREAM_LOCK */
819 static void
820 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
821 {
822   GstMiniObject *obj;
823
824   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
825   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
826     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
827     gst_mini_object_unref (obj);
828   }
829   /* we can't have EOS anymore now */
830   basesink->eos = FALSE;
831   basesink->eos_queued = FALSE;
832   basesink->preroll_queued = 0;
833   basesink->buffers_queued = 0;
834   basesink->events_queued = 0;
835   basesink->have_preroll = FALSE;
836   /* and signal any waiters now */
837   GST_PAD_PREROLL_SIGNAL (pad);
838 }
839
840 /* with STREAM_LOCK, configures given segment with the event information. */
841 static void
842 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
843     GstEvent * event, GstSegment * segment)
844 {
845   gboolean update;
846   gdouble rate, arate;
847   GstFormat format;
848   gint64 start;
849   gint64 stop;
850   gint64 time;
851
852   /* the newsegment event is needed to bring the buffer timestamps to the
853    * stream time and to drop samples outside of the playback segment. */
854   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
855       &start, &stop, &time);
856
857   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
858    * We protect with the OBJECT_LOCK so that we can use the values to
859    * safely answer a POSITION query. */
860   GST_OBJECT_LOCK (basesink);
861   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
862       stop, time);
863
864   if (format == GST_FORMAT_TIME) {
865     GST_DEBUG_OBJECT (basesink,
866         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
867         "format GST_FORMAT_TIME, "
868         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
869         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
870         update, rate, arate, GST_TIME_ARGS (segment->start),
871         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
872         GST_TIME_ARGS (segment->accum));
873   } else {
874     GST_DEBUG_OBJECT (basesink,
875         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
876         "format %d, "
877         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
878         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
879         segment->format, segment->start, segment->stop, segment->time,
880         segment->accum);
881   }
882   GST_OBJECT_UNLOCK (basesink);
883 }
884
885 /* with PREROLL_LOCK, STREAM_LOCK */
886 static gboolean
887 gst_base_sink_commit_state (GstBaseSink * basesink)
888 {
889   /* commit state and proceed to next pending state */
890   GstState current, next, pending, post_pending;
891   GstMessage *message;
892   gboolean post_paused = FALSE;
893   gboolean post_playing = FALSE;
894
895   /* we are certainly not playing async anymore now */
896   basesink->playing_async = FALSE;
897
898   GST_OBJECT_LOCK (basesink);
899   current = GST_STATE (basesink);
900   next = GST_STATE_NEXT (basesink);
901   pending = GST_STATE_PENDING (basesink);
902   post_pending = pending;
903
904   switch (pending) {
905     case GST_STATE_PLAYING:
906     {
907       GstBaseSinkClass *bclass;
908       GstStateChangeReturn ret;
909
910       bclass = GST_BASE_SINK_GET_CLASS (basesink);
911
912       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
913
914       basesink->need_preroll = FALSE;
915       post_playing = TRUE;
916       /* post PAUSED too when we were READY */
917       if (current == GST_STATE_READY) {
918         post_paused = TRUE;
919       }
920
921       /* make sure we notify the subclass of async playing */
922       if (bclass->async_play) {
923         ret = bclass->async_play (basesink);
924         if (ret == GST_STATE_CHANGE_FAILURE)
925           goto async_failed;
926       }
927       break;
928     }
929     case GST_STATE_PAUSED:
930       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
931       post_paused = TRUE;
932       post_pending = GST_STATE_VOID_PENDING;
933       break;
934     case GST_STATE_READY:
935     case GST_STATE_NULL:
936       goto stopping;
937     case GST_STATE_VOID_PENDING:
938       goto nothing_pending;
939     default:
940       break;
941   }
942
943   GST_STATE (basesink) = pending;
944   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
945   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
946   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
947   GST_OBJECT_UNLOCK (basesink);
948
949   if (post_paused) {
950     message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
951         current, next, post_pending);
952     gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
953   }
954   if (post_playing) {
955     message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
956         next, pending, GST_STATE_VOID_PENDING);
957     gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
958   }
959   /* and mark dirty */
960   if (post_paused || post_playing) {
961     gst_element_post_message (GST_ELEMENT_CAST (basesink),
962         gst_message_new_state_dirty (GST_OBJECT_CAST (basesink)));
963   }
964
965   GST_STATE_BROADCAST (basesink);
966
967   return TRUE;
968
969 nothing_pending:
970   {
971     /* Depending on the state, set our vars. We get in this situation when the
972      * state change function got a change to update the state vars before the
973      * streaming thread did. This is fine but we need to make sure that we
974      * update the need_preroll var since it was TRUE when we got here and might
975      * become FALSE if we got to PLAYING. */
976     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
977         gst_element_state_get_name (current));
978     switch (current) {
979       case GST_STATE_PLAYING:
980         basesink->need_preroll = FALSE;
981         break;
982       case GST_STATE_PAUSED:
983         basesink->need_preroll = TRUE;
984         break;
985       default:
986         basesink->need_preroll = FALSE;
987         basesink->flushing = TRUE;
988         break;
989     }
990     GST_OBJECT_UNLOCK (basesink);
991     return TRUE;
992   }
993 stopping:
994   {
995     /* app is going to READY */
996     GST_DEBUG_OBJECT (basesink, "stopping");
997     basesink->need_preroll = FALSE;
998     basesink->flushing = TRUE;
999     GST_OBJECT_UNLOCK (basesink);
1000     return FALSE;
1001   }
1002 async_failed:
1003   {
1004     GST_DEBUG_OBJECT (basesink, "async commit failed");
1005     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1006     GST_OBJECT_UNLOCK (basesink);
1007     return FALSE;
1008   }
1009 }
1010
1011
1012 /* with STREAM_LOCK, PREROLL_LOCK
1013  *
1014  * Returns TRUE if the object needs synchronisation and takes therefore
1015  * part in prerolling.
1016  *
1017  * rsstart/rsstop contain the start/stop in stream time.
1018  * rrstart/rrstop contain the start/stop in running time.
1019  */
1020 static gboolean
1021 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1022     GstClockTime * rsstart, GstClockTime * rsstop,
1023     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync)
1024 {
1025   GstBaseSinkClass *bclass;
1026   GstBuffer *buffer;
1027   GstClockTime start, stop;     /* raw start/stop timestamps */
1028   gint64 cstart, cstop;         /* clipped raw timestamps */
1029   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1030   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1031   GstFormat format;
1032   GstSegment *segment;
1033
1034   /* start with nothing */
1035   start = stop = sstart = sstop = rstart = rstop = -1;
1036
1037   if (G_UNLIKELY (GST_IS_EVENT (obj))) {
1038     GstEvent *event = GST_EVENT_CAST (obj);
1039
1040     switch (GST_EVENT_TYPE (event)) {
1041         /* EOS event needs syncing */
1042       case GST_EVENT_EOS:
1043         sstart = sstop = basesink->priv->current_sstop;
1044         rstart = rstop = basesink->priv->eos_rtime;
1045         *do_sync = rstart != -1;
1046         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1047             GST_TIME_ARGS (rstart));
1048         goto done;
1049         /* other events do not need syncing */
1050         /* FIXME, maybe NEWSEGMENT might need synchronisation
1051          * since the POSITION query depends on accumulated times and
1052          * we cannot accumulate the current segment before the previous
1053          * one completed.
1054          */
1055       default:
1056         return FALSE;
1057     }
1058   }
1059
1060   /* else do buffer sync code */
1061   buffer = GST_BUFFER_CAST (obj);
1062
1063   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1064
1065   /* just get the times to see if we need syncing */
1066   if (bclass->get_times)
1067     bclass->get_times (basesink, buffer, &start, &stop);
1068
1069   if (start == -1) {
1070     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1071     *do_sync = FALSE;
1072   } else {
1073     *do_sync = TRUE;
1074   }
1075
1076   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1077       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1078       GST_TIME_ARGS (stop), *do_sync);
1079
1080   /* collect segment and format for code clarity */
1081   segment = &basesink->segment;
1082   format = segment->format;
1083
1084   /* no timestamp clipping if we did not * get a TIME segment format */
1085   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
1086     cstart = start;
1087     cstop = stop;
1088     goto do_times;
1089   }
1090
1091   /* clip */
1092   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
1093               (gint64) start, (gint64) stop, &cstart, &cstop)))
1094     goto out_of_segment;
1095
1096   if (G_UNLIKELY (start != cstart || stop != cstop)) {
1097     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
1098         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
1099         GST_TIME_ARGS (cstop));
1100   }
1101
1102   /* set last stop position */
1103   gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
1104
1105 do_times:
1106   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
1107    * upstream is behaving very badly */
1108   sstart = gst_segment_to_stream_time (segment, format, cstart);
1109   sstop = gst_segment_to_stream_time (segment, format, cstop);
1110   rstart = gst_segment_to_running_time (segment, format, cstart);
1111   rstop = gst_segment_to_running_time (segment, format, cstop);
1112
1113 done:
1114   /* save times */
1115   *rsstart = sstart;
1116   *rsstop = sstop;
1117   *rrstart = rstart;
1118   *rrstop = rstop;
1119
1120   /* buffers and EOS always need syncing and preroll */
1121   return TRUE;
1122
1123   /* special cases */
1124 out_of_segment:
1125   {
1126     /* should not happen since we clip them in the chain function already, 
1127      * we return FALSE so that we don't try to sync on it. */
1128     GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
1129         (NULL), ("unexpected buffer out of segment found."));
1130     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
1131     return FALSE;
1132   }
1133 }
1134
1135 /* with STREAM_LOCK, PREROLL_LOCK
1136  *
1137  * Waits for the clock to reach @time. If @time is not valid, no
1138  * synchronisation is done and BADTIME is returned. 
1139  * If synchronisation is disabled in the element or there is no
1140  * clock, no synchronisation is done and BADTIME is returned.
1141  *
1142  * Else a blocking wait is performed on the clock. We save the ClockID
1143  * so we can unlock the entry at any time. While we are blocking, we 
1144  * release the PREROLL_LOCK so that other threads can interrupt the entry.
1145  *
1146  * @time is expressed in running time.
1147  */
1148 static GstClockReturn
1149 gst_base_sink_wait_clock (GstBaseSink * basesink, GstClockTime time,
1150     GstClockTimeDiff * jitter)
1151 {
1152   GstClockID id;
1153   GstClockReturn ret;
1154   GstClock *clock;
1155
1156   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1157     goto invalid_time;
1158
1159   GST_OBJECT_LOCK (basesink);
1160   if (G_UNLIKELY (!basesink->sync))
1161     goto no_sync;
1162
1163   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
1164     goto no_clock;
1165
1166   /* add base time and latency */
1167   time += GST_ELEMENT_CAST (basesink)->base_time;
1168   time += basesink->priv->latency;
1169
1170   id = gst_clock_new_single_shot_id (clock, time);
1171   GST_OBJECT_UNLOCK (basesink);
1172
1173   basesink->clock_id = id;
1174   /* release the preroll lock while waiting */
1175   GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
1176
1177   ret = gst_clock_id_wait (id, jitter);
1178
1179   GST_PAD_PREROLL_LOCK (basesink->sinkpad);
1180   gst_clock_id_unref (id);
1181   basesink->clock_id = NULL;
1182
1183   return ret;
1184
1185   /* no syncing needed */
1186 invalid_time:
1187   {
1188     GST_DEBUG_OBJECT (basesink, "time not valid, no sync needed");
1189     return GST_CLOCK_BADTIME;
1190   }
1191 no_sync:
1192   {
1193     GST_DEBUG_OBJECT (basesink, "sync disabled");
1194     GST_OBJECT_UNLOCK (basesink);
1195     return GST_CLOCK_BADTIME;
1196   }
1197 no_clock:
1198   {
1199     GST_DEBUG_OBJECT (basesink, "no clock, can't sync");
1200     GST_OBJECT_UNLOCK (basesink);
1201     return GST_CLOCK_BADTIME;
1202   }
1203 }
1204
1205 /**
1206  * gst_base_sink_wait_preroll:
1207  * @sink: the sink
1208  *
1209  * If the #GstBaseSinkClass::render method performs its own synchronisation against
1210  * the clock it must unblock when going from PLAYING to the PAUSED state and call
1211  * this method before continuing to render the remaining data.
1212  *
1213  * This function will block until a state change to PLAYING happens (in which
1214  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
1215  * to a state change to READY or a FLUSH event (in which case this function
1216  * returns #GST_FLOW_WRONG_STATE).
1217  *
1218  * Since: 0.10.11
1219  *
1220  * Returns: #GST_FLOW_OK if the preroll completed and processing can
1221  * continue. Any other return value should be returned from the render vmethod.
1222  */
1223 GstFlowReturn
1224 gst_base_sink_wait_preroll (GstBaseSink * sink)
1225 {
1226   /* block until the state changes, or we get a flush, or something */
1227   GST_DEBUG_OBJECT (sink, "wait for preroll...");
1228   sink->have_preroll = TRUE;
1229   GST_PAD_PREROLL_WAIT (sink->sinkpad);
1230   sink->have_preroll = FALSE;
1231   GST_DEBUG_OBJECT (sink, "preroll done");
1232   if (G_UNLIKELY (sink->flushing))
1233     goto stopping;
1234   GST_DEBUG_OBJECT (sink, "continue after preroll");
1235
1236   return GST_FLOW_OK;
1237
1238   /* ERRORS */
1239 stopping:
1240   {
1241     GST_DEBUG_OBJECT (sink, "preroll interrupted");
1242     return GST_FLOW_WRONG_STATE;
1243   }
1244 }
1245
1246 /* with STREAM_LOCK, PREROLL_LOCK
1247  *
1248  * Make sure we are in PLAYING and synchronize an object to the clock.
1249  *
1250  * If we need preroll, we are not in PLAYING. We try to commit the state
1251  * if needed and then block if we still are not PLAYING.
1252  *
1253  * We start waiting on the clock in PLAYING. If we got interrupted, we
1254  * immediatly try to re-preroll.
1255  *
1256  * Some objects do not need synchronisation (most events) and so this function
1257  * immediatly returns GST_FLOW_OK.
1258  *
1259  * for objects that arrive later than max-lateness to be synchronized to the 
1260  * clock have the @late boolean set to TRUE.
1261  *
1262  * This function keeps a running average of the jitter (the diff between the
1263  * clock time and the requested sync time). The jitter is negative for
1264  * objects that arrive in time and positive for late buffers.
1265  *
1266  * does not take ownership of obj.
1267  */
1268 static GstFlowReturn
1269 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
1270     GstMiniObject * obj, gboolean * late)
1271 {
1272   GstClockTimeDiff jitter;
1273   gboolean syncable;
1274   GstClockReturn status = GST_CLOCK_OK;
1275   GstClockTime sstart, sstop, rstart, rstop;
1276   gboolean do_sync;
1277
1278   sstart = sstop = rstart = rstop = -1;
1279   do_sync = TRUE;
1280
1281   basesink->priv->current_rstart = -1;
1282
1283   /* update timing information for this object */
1284   syncable = gst_base_sink_get_sync_times (basesink, obj,
1285       &sstart, &sstop, &rstart, &rstop, &do_sync);
1286
1287   /* a syncable object needs to participate in preroll and
1288    * clocking. All buffers and EOS are syncable. */
1289   if (G_UNLIKELY (!syncable))
1290     goto not_syncable;
1291
1292   /* store timing info for current object */
1293   basesink->priv->current_rstart = rstart;
1294   basesink->priv->current_rstop = (rstop != -1 ? rstop : rstart);
1295   /* save sync time for eos when the previous object needed sync */
1296   basesink->priv->eos_rtime = (do_sync ? basesink->priv->current_rstop : -1);
1297
1298   /* lock because we read this when answering the POSITION 
1299    * query. */
1300   GST_OBJECT_LOCK (basesink);
1301   basesink->priv->current_sstart = sstart;
1302   basesink->priv->current_sstop = (sstop != -1 ? sstop : sstart);
1303   GST_OBJECT_UNLOCK (basesink);
1304
1305 again:
1306   /* first do preroll, this makes sure we commit our state
1307    * to PAUSED and can continue to PLAYING. We cannot perform
1308    * any clock sync in PAUSED because there is no clock. 
1309    */
1310   while (G_UNLIKELY (basesink->need_preroll)) {
1311     GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
1312
1313     if (G_LIKELY (basesink->playing_async)) {
1314       /* commit state */
1315       if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
1316         goto stopping;
1317     }
1318
1319     /* need to recheck here because the commit state could have
1320      * made us not need the preroll anymore */
1321     if (G_LIKELY (basesink->need_preroll)) {
1322       /* block until the state changes, or we get a flush, or something */
1323       if (gst_base_sink_wait_preroll (basesink) != GST_FLOW_OK)
1324         goto flushing;
1325     }
1326   }
1327
1328   if (!do_sync)
1329     goto done;
1330
1331   /* preroll done, we can sync since we are in PLAYING now. */
1332   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
1333       GST_TIME_FORMAT, GST_TIME_ARGS (rstart));
1334
1335   /* this function will return immediatly if start == -1, no clock
1336    * or sync is disabled with GST_CLOCK_BADTIME. */
1337   status = gst_base_sink_wait_clock (basesink, rstart, &jitter);
1338
1339   GST_DEBUG_OBJECT (basesink, "clock returned %d", status);
1340
1341   /* invalid time, no clock or sync disabled, just render */
1342   if (status == GST_CLOCK_BADTIME)
1343     goto done;
1344
1345   /* waiting could have been interrupted and we can be flushing now */
1346   if (G_UNLIKELY (basesink->flushing))
1347     goto flushing;
1348
1349   /* check for unlocked by a state change, we are not flushing so
1350    * we can try to preroll on the current buffer. */
1351   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
1352     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
1353     goto again;
1354   }
1355
1356   /* successful syncing done, record observation */
1357   basesink->priv->current_jitter = jitter;
1358
1359   /* check if the object should be dropped */
1360   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
1361       status, jitter);
1362
1363 done:
1364   return GST_FLOW_OK;
1365
1366   /* ERRORS */
1367 not_syncable:
1368   {
1369     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
1370     return GST_FLOW_OK;
1371   }
1372 flushing:
1373   {
1374     GST_DEBUG_OBJECT (basesink, "we are flushing");
1375     return GST_FLOW_WRONG_STATE;
1376   }
1377 stopping:
1378   {
1379     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
1380     return GST_FLOW_WRONG_STATE;
1381   }
1382 }
1383
1384 static gboolean
1385 gst_base_sink_send_qos (GstBaseSink * basesink,
1386     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
1387 {
1388   GstEvent *event;
1389   gboolean res;
1390
1391   /* generate Quality-of-Service event */
1392   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
1393       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1394       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (time));
1395
1396   event = gst_event_new_qos (proportion, diff, time);
1397
1398   /* send upstream */
1399   res = gst_pad_push_event (basesink->sinkpad, event);
1400
1401   return res;
1402 }
1403
1404 static void
1405 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
1406 {
1407   GstBaseSinkPrivate *priv;
1408   GstClockTime start, stop;
1409   GstClockTimeDiff jitter;
1410   GstClockTime pt, entered, left;
1411   GstClockTime duration;
1412   gdouble rate;
1413
1414   priv = sink->priv;
1415
1416   start = priv->current_rstart;
1417
1418   /* if Quality-of-Service disabled, do nothing */
1419   if (!g_atomic_int_get (&priv->qos_enabled) || start == -1)
1420     return;
1421
1422   stop = priv->current_rstop;
1423   jitter = priv->current_jitter;
1424
1425   /* this is the time the buffer entered the sink */
1426   entered = start + jitter;
1427   /* this is the time the buffer left the sink */
1428   left = start + (jitter < 0 ? 0 : jitter);
1429
1430   /* calculate duration of the buffer */
1431   if (stop != -1)
1432     duration = stop - start;
1433   else
1434     duration = -1;
1435
1436   /* if we have the time when the last buffer left us, calculate
1437    * processing time */
1438   if (priv->last_left != -1) {
1439     if (entered > priv->last_left) {
1440       pt = entered - priv->last_left;
1441     } else {
1442       pt = 0;
1443     }
1444   } else {
1445     pt = priv->avg_pt;
1446   }
1447
1448   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
1449       ", entered %" GST_TIME_FORMAT ", left %" GST_TIME_FORMAT ", pt: %"
1450       GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT ",jitter %"
1451       G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (entered),
1452       GST_TIME_ARGS (left), GST_TIME_ARGS (pt), GST_TIME_ARGS (duration),
1453       jitter);
1454
1455   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
1456       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
1457       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
1458       priv->avg_rate);
1459
1460   /* collect running averages. for first observations, we copy the
1461    * values */
1462   if (priv->avg_duration == -1)
1463     priv->avg_duration = duration;
1464   else
1465     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
1466
1467   if (priv->avg_pt == -1)
1468     priv->avg_pt = pt;
1469   else
1470     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
1471
1472   if (priv->avg_duration != 0)
1473     rate =
1474         gst_guint64_to_gdouble (priv->avg_pt) /
1475         gst_guint64_to_gdouble (priv->avg_duration);
1476   else
1477     rate = 0.0;
1478
1479   if (priv->last_left != -1) {
1480     if (dropped || priv->avg_rate < 0.0) {
1481       priv->avg_rate = rate;
1482     } else {
1483       if (rate > 1.0)
1484         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
1485       else
1486         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
1487     }
1488   }
1489
1490   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
1491       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
1492       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
1493       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
1494
1495
1496   /* if we have a valid rate, start sending QoS messages */
1497   if (priv->avg_rate >= 0.0) {
1498     gst_base_sink_send_qos (sink, priv->avg_rate, priv->current_rstart,
1499         priv->current_jitter);
1500   }
1501
1502   /* record when this buffer will leave us */
1503   priv->last_left = left;
1504 }
1505
1506 /* reset all qos measuring */
1507 static void
1508 gst_base_sink_reset_qos (GstBaseSink * sink)
1509 {
1510   GstBaseSinkPrivate *priv;
1511
1512   priv = sink->priv;
1513
1514   priv->last_in_time = -1;
1515   priv->last_left = -1;
1516   priv->avg_duration = -1;
1517   priv->avg_pt = -1;
1518   priv->avg_rate = -1.0;
1519   priv->avg_render = -1;
1520   priv->rendered = 0;
1521   priv->dropped = 0;
1522 }
1523
1524 /* Checks if the object was scheduled too late.
1525  *
1526  * start/stop contain the raw timestamp start and stop values
1527  * of the object.
1528  *
1529  * status and jitter contain the return values from the clock wait.
1530  *
1531  * returns TRUE if the buffer was too late.
1532  */
1533 static gboolean
1534 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
1535     GstClockTime start, GstClockTime stop,
1536     GstClockReturn status, GstClockTimeDiff jitter)
1537 {
1538   gboolean late;
1539   gint64 max_lateness;
1540   GstBaseSinkPrivate *priv;
1541
1542   priv = basesink->priv;
1543
1544   late = FALSE;
1545
1546   /* only for objects that were too late */
1547   if (G_LIKELY (status != GST_CLOCK_EARLY))
1548     goto in_time;
1549
1550   max_lateness = basesink->abidata.ABI.max_lateness;
1551
1552   /* check if frame dropping is enabled */
1553   if (max_lateness == -1)
1554     goto no_drop;
1555
1556   /* only check for buffers */
1557   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
1558     goto not_buffer;
1559
1560   /* can't do check if we don't have a timestamp */
1561   if (G_UNLIKELY (start == -1))
1562     goto no_timestamp;
1563
1564   /* we can add a valid stop time */
1565   if (stop != -1)
1566     max_lateness += stop;
1567   else
1568     max_lateness += start;
1569
1570   /* if the jitter bigger than duration and lateness we are too late */
1571   if ((late = start + jitter > max_lateness)) {
1572     GST_DEBUG_OBJECT (basesink, "buffer is too late %" GST_TIME_FORMAT
1573         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (start + jitter),
1574         GST_TIME_ARGS (max_lateness));
1575     /* !!emergency!!, if we did not receive anything valid for more than a 
1576      * second, render it anyway so the user sees something */
1577     if (priv->last_in_time && start - priv->last_in_time > GST_SECOND) {
1578       late = FALSE;
1579       GST_DEBUG_OBJECT (basesink,
1580           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
1581           GST_TIME_ARGS (priv->last_in_time));
1582     }
1583   }
1584
1585 done:
1586   if (!late) {
1587     priv->last_in_time = start;
1588   }
1589   return late;
1590
1591   /* all is fine */
1592 in_time:
1593   {
1594     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
1595     goto done;
1596   }
1597 no_drop:
1598   {
1599     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
1600     goto done;
1601   }
1602 not_buffer:
1603   {
1604     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
1605     return FALSE;
1606   }
1607 no_timestamp:
1608   {
1609     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
1610     return FALSE;
1611   }
1612 }
1613
1614 static void
1615 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
1616 {
1617   GstBaseSinkPrivate *priv;
1618
1619   priv = basesink->priv;
1620
1621   if (start) {
1622     g_get_current_time (&priv->start);
1623   } else {
1624     GstClockTime elapsed;
1625
1626     g_get_current_time (&priv->stop);
1627
1628     elapsed =
1629         GST_TIMEVAL_TO_TIME (priv->stop) - GST_TIMEVAL_TO_TIME (priv->start);
1630
1631     if (priv->avg_render == -1)
1632       priv->avg_render = elapsed;
1633     else
1634       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
1635
1636     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
1637         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
1638   }
1639 }
1640
1641 /* with STREAM_LOCK, PREROLL_LOCK,
1642  *
1643  * Synchronize the object on the clock and then render it.
1644  *
1645  * takes ownership of obj.
1646  */
1647 static GstFlowReturn
1648 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
1649     GstMiniObject * obj)
1650 {
1651   GstFlowReturn ret = GST_FLOW_OK;
1652   GstBaseSinkClass *bclass;
1653   gboolean late = FALSE;
1654   GstBaseSinkPrivate *priv;
1655
1656   priv = basesink->priv;
1657
1658   /* synchronize this object, non syncable objects return OK
1659    * immediatly. */
1660   ret = gst_base_sink_do_sync (basesink, pad, obj, &late);
1661   if (G_UNLIKELY (ret != GST_FLOW_OK))
1662     goto sync_failed;
1663
1664   /* and now render, event or buffer. */
1665   if (G_LIKELY (GST_IS_BUFFER (obj))) {
1666     /* drop late buffers unconditionally, let's hope it's unlikely */
1667     if (G_UNLIKELY (late))
1668       goto dropped;
1669
1670     bclass = GST_BASE_SINK_GET_CLASS (basesink);
1671
1672     if (G_LIKELY (bclass->render)) {
1673       gint do_qos;
1674
1675       /* read once, to get same value before and after */
1676       do_qos = g_atomic_int_get (&priv->qos_enabled);
1677
1678       GST_DEBUG_OBJECT (basesink, "rendering buffer %p", obj);
1679
1680       /* record rendering time for QoS and stats */
1681       if (do_qos)
1682         gst_base_sink_do_render_stats (basesink, TRUE);
1683
1684       ret = bclass->render (basesink, GST_BUFFER_CAST (obj));
1685
1686       priv->rendered++;
1687
1688       if (do_qos)
1689         gst_base_sink_do_render_stats (basesink, FALSE);
1690     }
1691   } else {
1692     GstEvent *event = GST_EVENT_CAST (obj);
1693     gboolean event_res = TRUE;
1694     GstEventType type;
1695
1696     bclass = GST_BASE_SINK_GET_CLASS (basesink);
1697
1698     type = GST_EVENT_TYPE (event);
1699
1700     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
1701         gst_event_type_get_name (type));
1702
1703     if (bclass->event)
1704       event_res = bclass->event (basesink, event);
1705
1706     if (G_LIKELY (event_res)) {
1707       switch (type) {
1708         case GST_EVENT_EOS:
1709           /* the EOS event is completely handled so we mark
1710            * ourselves as being in the EOS state. eos is also 
1711            * protected by the object lock so we can read it when 
1712            * answering the POSITION query. */
1713           GST_OBJECT_LOCK (basesink);
1714           basesink->eos = TRUE;
1715           GST_OBJECT_UNLOCK (basesink);
1716           /* ok, now we can post the message */
1717           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
1718           gst_element_post_message (GST_ELEMENT_CAST (basesink),
1719               gst_message_new_eos (GST_OBJECT_CAST (basesink)));
1720           break;
1721         case GST_EVENT_NEWSEGMENT:
1722           /* configure the segment */
1723           gst_base_sink_configure_segment (basesink, pad, event,
1724               &basesink->segment);
1725         default:
1726           break;
1727       }
1728     }
1729   }
1730
1731 done:
1732   gst_base_sink_perform_qos (basesink, late);
1733
1734   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
1735   gst_mini_object_unref (obj);
1736
1737   return ret;
1738
1739   /* ERRORS */
1740 sync_failed:
1741   {
1742     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
1743     goto done;
1744   }
1745 dropped:
1746   {
1747     priv->dropped++;
1748     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
1749     goto done;
1750   }
1751 }
1752
1753 /* with STREAM_LOCK, PREROLL_LOCK
1754  *
1755  * Perform preroll on the given object. For buffers this means 
1756  * calling the preroll subclass method. 
1757  * If that succeeds, the state will be commited.
1758  *
1759  * function does not take ownership of obj.
1760  */
1761 static GstFlowReturn
1762 gst_base_sink_preroll_object (GstBaseSink * basesink, GstPad * pad,
1763     GstMiniObject * obj)
1764 {
1765   GstFlowReturn ret;
1766
1767   GST_DEBUG_OBJECT (basesink, "do preroll %p", obj);
1768
1769   /* if it's a buffer, we need to call the preroll method */
1770   if (G_LIKELY (GST_IS_BUFFER (obj))) {
1771     GstBaseSinkClass *bclass;
1772     GstBuffer *buf = GST_BUFFER_CAST (obj);
1773
1774     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
1775         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
1776
1777     bclass = GST_BASE_SINK_GET_CLASS (basesink);
1778     if (bclass->preroll)
1779       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
1780         goto preroll_failed;
1781   }
1782
1783   /* commit state */
1784   if (G_LIKELY (basesink->playing_async)) {
1785     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
1786       goto stopping;
1787   }
1788
1789   return GST_FLOW_OK;
1790
1791   /* ERRORS */
1792 preroll_failed:
1793   {
1794     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
1795     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
1796     return ret;
1797   }
1798 stopping:
1799   {
1800     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
1801     return GST_FLOW_WRONG_STATE;
1802   }
1803 }
1804
1805 /* with STREAM_LOCK, PREROLL_LOCK 
1806  *
1807  * Queue an object for rendering.
1808  * The first prerollable object queued will complete the preroll. If the
1809  * preroll queue if filled, we render all the objects in the queue.
1810  *
1811  * This function takes ownership of the object.
1812  */
1813 static GstFlowReturn
1814 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
1815     GstMiniObject * obj, gboolean prerollable)
1816 {
1817   GstFlowReturn ret = GST_FLOW_OK;
1818   gint length;
1819   GQueue *q;
1820
1821   if (G_UNLIKELY (basesink->need_preroll)) {
1822     if (G_LIKELY (prerollable))
1823       basesink->preroll_queued++;
1824
1825     length = basesink->preroll_queued;
1826
1827     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
1828
1829     /* first prerollable item needs to finish the preroll */
1830     if (length == 1) {
1831       ret = gst_base_sink_preroll_object (basesink, pad, obj);
1832       if (G_UNLIKELY (ret != GST_FLOW_OK))
1833         goto preroll_failed;
1834     }
1835     /* need to recheck if we need preroll, commmit state during preroll 
1836      * could have made us not need more preroll. */
1837     if (G_UNLIKELY (basesink->need_preroll)) {
1838       /* see if we can render now. */
1839       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
1840         goto more_preroll;
1841     }
1842   }
1843
1844   /* we can start rendering (or blocking) the queued object
1845    * if any. */
1846   q = basesink->preroll_queue;
1847   while (G_UNLIKELY (!g_queue_is_empty (q))) {
1848     GstMiniObject *o;
1849
1850     o = g_queue_pop_head (q);
1851     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
1852
1853     /* do something with the return value */
1854     ret = gst_base_sink_render_object (basesink, pad, o);
1855     if (ret != GST_FLOW_OK)
1856       goto dequeue_failed;
1857   }
1858
1859   /* now render the object */
1860   ret = gst_base_sink_render_object (basesink, pad, obj);
1861   basesink->preroll_queued = 0;
1862
1863   return ret;
1864
1865   /* special cases */
1866 preroll_failed:
1867   {
1868     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
1869         gst_flow_get_name (ret));
1870     gst_mini_object_unref (obj);
1871     return ret;
1872   }
1873 more_preroll:
1874   {
1875     /* add object to the queue and return */
1876     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
1877         length, basesink->preroll_queue_max_len);
1878     g_queue_push_tail (basesink->preroll_queue, obj);
1879     return GST_FLOW_OK;
1880   }
1881 dequeue_failed:
1882   {
1883     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
1884         gst_flow_get_name (ret));
1885     gst_mini_object_unref (obj);
1886     return ret;
1887   }
1888 }
1889
1890 /* with STREAM_LOCK
1891  *
1892  * This function grabs the PREROLL_LOCK and adds the object to
1893  * the queue.
1894  *
1895  * This function takes ownership of obj.
1896  */
1897 static GstFlowReturn
1898 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
1899     GstMiniObject * obj, gboolean prerollable)
1900 {
1901   GstFlowReturn ret;
1902
1903   GST_PAD_PREROLL_LOCK (pad);
1904   if (G_UNLIKELY (basesink->flushing))
1905     goto flushing;
1906
1907   ret = gst_base_sink_queue_object_unlocked (basesink, pad, obj, prerollable);
1908   GST_PAD_PREROLL_UNLOCK (pad);
1909
1910   return ret;
1911
1912   /* ERRORS */
1913 flushing:
1914   {
1915     GST_DEBUG_OBJECT (basesink, "sink is flushing");
1916     GST_PAD_PREROLL_UNLOCK (pad);
1917     gst_mini_object_unref (obj);
1918     return GST_FLOW_WRONG_STATE;
1919   }
1920 }
1921
1922 static gboolean
1923 gst_base_sink_event (GstPad * pad, GstEvent * event)
1924 {
1925   GstBaseSink *basesink;
1926   gboolean result = TRUE;
1927   GstBaseSinkClass *bclass;
1928
1929   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
1930
1931   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1932
1933   GST_DEBUG_OBJECT (basesink, "event %p (%s)", event,
1934       GST_EVENT_TYPE_NAME (event));
1935
1936   switch (GST_EVENT_TYPE (event)) {
1937     case GST_EVENT_EOS:
1938     {
1939       GstFlowReturn ret;
1940
1941       /* EOS is a prerollable object */
1942       ret =
1943           gst_base_sink_queue_object (basesink, pad,
1944           GST_MINI_OBJECT_CAST (event), TRUE);
1945
1946       if (G_UNLIKELY (ret != GST_FLOW_OK))
1947         result = FALSE;
1948       break;
1949     }
1950     case GST_EVENT_NEWSEGMENT:
1951     {
1952       GstFlowReturn ret;
1953
1954       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
1955
1956       basesink->have_newsegment = TRUE;
1957
1958       /* the new segment is a non prerollable item and does not block anything,
1959        * we need to configure the current clipping segment and insert the event 
1960        * in the queue to serialize it with the buffers for rendering. */
1961       gst_base_sink_configure_segment (basesink, pad, event,
1962           basesink->abidata.ABI.clip_segment);
1963
1964       ret =
1965           gst_base_sink_queue_object (basesink, pad,
1966           GST_MINI_OBJECT_CAST (event), FALSE);
1967       if (G_UNLIKELY (ret != GST_FLOW_OK))
1968         result = FALSE;
1969       break;
1970     }
1971     case GST_EVENT_FLUSH_START:
1972       if (bclass->event)
1973         bclass->event (basesink, event);
1974
1975       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
1976
1977       /* make sure we are not blocked on the clock also clear any pending
1978        * eos state. */
1979       gst_base_sink_set_flushing (basesink, pad, TRUE);
1980
1981       /* we grab the stream lock but that is not needed since setting the
1982        * sink to flushing would make sure no state commit is being done
1983        * anymore */
1984       GST_PAD_STREAM_LOCK (pad);
1985       gst_base_sink_reset_qos (basesink);
1986       /* and we need to commit our state again on the next
1987        * prerolled buffer */
1988       basesink->playing_async = TRUE;
1989       gst_element_lost_state (GST_ELEMENT_CAST (basesink));
1990       GST_PAD_STREAM_UNLOCK (pad);
1991
1992       gst_event_unref (event);
1993       break;
1994     case GST_EVENT_FLUSH_STOP:
1995       if (bclass->event)
1996         bclass->event (basesink, event);
1997
1998       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
1999
2000       /* unset flushing so we can accept new data */
2001       gst_base_sink_set_flushing (basesink, pad, FALSE);
2002
2003       /* we need new segment info after the flush. */
2004       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
2005       gst_segment_init (basesink->abidata.ABI.clip_segment,
2006           GST_FORMAT_UNDEFINED);
2007       basesink->have_newsegment = FALSE;
2008
2009       gst_event_unref (event);
2010       break;
2011     default:
2012       /* other events are sent to queue or subclass depending on if they
2013        * are serialized. */
2014       if (GST_EVENT_IS_SERIALIZED (event)) {
2015         gst_base_sink_queue_object (basesink, pad,
2016             GST_MINI_OBJECT_CAST (event), FALSE);
2017       } else {
2018         if (bclass->event)
2019           bclass->event (basesink, event);
2020         gst_event_unref (event);
2021       }
2022       break;
2023   }
2024   gst_object_unref (basesink);
2025
2026   return result;
2027 }
2028
2029 /* default implementation to calculate the start and end
2030  * timestamps on a buffer, subclasses can override
2031  */
2032 static void
2033 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
2034     GstClockTime * start, GstClockTime * end)
2035 {
2036   GstClockTime timestamp, duration;
2037
2038   timestamp = GST_BUFFER_TIMESTAMP (buffer);
2039   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
2040
2041     /* get duration to calculate end time */
2042     duration = GST_BUFFER_DURATION (buffer);
2043     if (GST_CLOCK_TIME_IS_VALID (duration)) {
2044       *end = timestamp + duration;
2045     }
2046     *start = timestamp;
2047   }
2048 }
2049
2050 /* must be called with PREROLL_LOCK */
2051 static gboolean
2052 gst_base_sink_is_prerolled (GstBaseSink * basesink)
2053 {
2054   gboolean res;
2055
2056   res = basesink->have_preroll || basesink->eos;
2057   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => prerolled: %d",
2058       basesink->have_preroll, basesink->eos, res);
2059   return res;
2060 }
2061
2062 /* with STREAM_LOCK, PREROLL_LOCK 
2063  *
2064  * Takes a buffer and compare the timestamps with the last segment.
2065  * If the buffer falls outside of the segment boundaries, drop it.
2066  * Else queue the buffer for preroll and rendering.
2067  *
2068  * This function takes ownership of the buffer.
2069  */
2070 static GstFlowReturn
2071 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
2072     GstBuffer * buf)
2073 {
2074   GstFlowReturn result;
2075   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
2076   GstSegment *clip_segment;
2077
2078   if (G_UNLIKELY (basesink->flushing))
2079     goto flushing;
2080
2081   /* for code clarity */
2082   clip_segment = basesink->abidata.ABI.clip_segment;
2083
2084   if (G_UNLIKELY (!basesink->have_newsegment)) {
2085     gboolean sync;
2086
2087     sync = gst_base_sink_get_sync (basesink);
2088     if (sync) {
2089       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
2090           (_("Internal data flow problem.")),
2091           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
2092     }
2093
2094     basesink->have_newsegment = TRUE;
2095     /* this means this sink will assume timestamps start from 0 */
2096     clip_segment->start = 0;
2097     clip_segment->stop = -1;
2098     basesink->segment.start = 0;
2099     basesink->segment.stop = -1;
2100   }
2101
2102   /* check if the buffer needs to be dropped */
2103   /* we don't use the subclassed method as it may not return
2104    * valid values for our purpose here */
2105   gst_base_sink_get_times (basesink, buf, &start, &end);
2106
2107   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
2108       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
2109
2110   /* a dropped buffer does not participate in anything */
2111   if (GST_CLOCK_TIME_IS_VALID (start) &&
2112       (clip_segment->format == GST_FORMAT_TIME)) {
2113     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
2114                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
2115       goto out_of_segment;
2116   }
2117
2118   /* now we can process the buffer in the queue, this function takes ownership
2119    * of the buffer */
2120   result = gst_base_sink_queue_object_unlocked (basesink, pad,
2121       GST_MINI_OBJECT_CAST (buf), TRUE);
2122
2123   return result;
2124
2125   /* ERRORS */
2126 flushing:
2127   {
2128     GST_DEBUG_OBJECT (basesink, "sink is flushing");
2129     gst_buffer_unref (buf);
2130     return GST_FLOW_WRONG_STATE;
2131   }
2132 out_of_segment:
2133   {
2134     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
2135     gst_buffer_unref (buf);
2136     return GST_FLOW_OK;
2137   }
2138 }
2139
2140 /* with STREAM_LOCK
2141  */
2142 static GstFlowReturn
2143 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
2144 {
2145   GstBaseSink *basesink;
2146   GstFlowReturn result;
2147
2148   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2149
2150   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
2151     goto wrong_mode;
2152
2153   GST_PAD_PREROLL_LOCK (pad);
2154   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
2155   GST_PAD_PREROLL_UNLOCK (pad);
2156
2157 done:
2158   gst_object_unref (basesink);
2159
2160   return result;
2161
2162   /* ERRORS */
2163 wrong_mode:
2164   {
2165     GST_OBJECT_LOCK (pad);
2166     GST_WARNING_OBJECT (basesink,
2167         "Push on pad %s:%s, but it was not activated in push mode",
2168         GST_DEBUG_PAD_NAME (pad));
2169     GST_OBJECT_UNLOCK (pad);
2170     gst_buffer_unref (buf);
2171     /* we don't post an error message this will signal to the peer
2172      * pushing that EOS is reached. */
2173     result = GST_FLOW_UNEXPECTED;
2174     goto done;
2175   }
2176 }
2177
2178 /* with STREAM_LOCK
2179  */
2180 static void
2181 gst_base_sink_loop (GstPad * pad)
2182 {
2183   GstBaseSink *basesink;
2184   GstBuffer *buf = NULL;
2185   GstFlowReturn result;
2186
2187   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2188
2189   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
2190
2191   result = gst_pad_pull_range (pad, basesink->offset, DEFAULT_SIZE, &buf);
2192   if (G_UNLIKELY (result != GST_FLOW_OK))
2193     goto paused;
2194
2195   if (G_UNLIKELY (buf == NULL))
2196     goto no_buffer;
2197
2198   basesink->offset += GST_BUFFER_SIZE (buf);
2199
2200   GST_PAD_PREROLL_LOCK (pad);
2201   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
2202   GST_PAD_PREROLL_UNLOCK (pad);
2203   if (G_UNLIKELY (result != GST_FLOW_OK))
2204     goto paused;
2205
2206   gst_object_unref (basesink);
2207
2208   return;
2209
2210   /* ERRORS */
2211 paused:
2212   {
2213     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
2214         gst_flow_get_name (result));
2215     gst_pad_pause_task (pad);
2216     /* fatal errors and NOT_LINKED cause EOS */
2217     if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) {
2218       gst_base_sink_event (pad, gst_event_new_eos ());
2219       /* EOS does not cause an ERROR message */
2220       if (result != GST_FLOW_UNEXPECTED) {
2221         GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
2222             (_("Internal data stream error.")),
2223             ("stream stopped, reason %s", gst_flow_get_name (result)));
2224       }
2225     }
2226     gst_object_unref (basesink);
2227     return;
2228   }
2229 no_buffer:
2230   {
2231     GST_LOG_OBJECT (basesink, "no buffer, pausing");
2232     result = GST_FLOW_ERROR;
2233     goto paused;
2234   }
2235 }
2236
2237 static gboolean
2238 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
2239     gboolean flushing)
2240 {
2241
2242   if (flushing) {
2243     GstBaseSinkClass *bclass;
2244
2245     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2246
2247     /* unlock any subclasses, we need to do this before grabbing the
2248      * PREROLL_LOCK since we hold this lock before going into ::render. */
2249     if (bclass->unlock)
2250       bclass->unlock (basesink);
2251   }
2252
2253   GST_PAD_PREROLL_LOCK (pad);
2254   basesink->flushing = flushing;
2255   if (flushing) {
2256     /* step 1, unblock clock sync (if any) or any other blocking thing */
2257     basesink->need_preroll = TRUE;
2258     if (basesink->clock_id) {
2259       gst_clock_id_unschedule (basesink->clock_id);
2260     }
2261
2262     /* flush out the data thread if it's locked in finish_preroll */
2263     GST_DEBUG_OBJECT (basesink,
2264         "flushing out data thread, need preroll to TRUE");
2265     gst_base_sink_preroll_queue_flush (basesink, pad);
2266   }
2267   GST_PAD_PREROLL_UNLOCK (pad);
2268
2269   return TRUE;
2270 }
2271
2272 static gboolean
2273 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
2274 {
2275   gboolean result;
2276
2277   if (active) {
2278     /* start task */
2279     result = gst_pad_start_task (basesink->sinkpad,
2280         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
2281   } else {
2282     /* step 2, make sure streaming finishes */
2283     result = gst_pad_stop_task (basesink->sinkpad);
2284   }
2285
2286   return result;
2287 }
2288
2289 static gboolean
2290 gst_base_sink_pad_activate (GstPad * pad)
2291 {
2292   gboolean result = FALSE;
2293   GstBaseSink *basesink;
2294
2295   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2296
2297   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
2298
2299   gst_base_sink_set_flushing (basesink, pad, FALSE);
2300
2301   if (basesink->can_activate_pull && gst_pad_check_pull_range (pad)
2302       && gst_pad_activate_pull (pad, TRUE)) {
2303     GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
2304     result = TRUE;
2305   } else {
2306     GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
2307     if (gst_pad_activate_push (pad, TRUE)) {
2308       GST_DEBUG_OBJECT (basesink, "Success activating push mode");
2309       result = TRUE;
2310     }
2311   }
2312
2313   if (!result) {
2314     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
2315     gst_base_sink_set_flushing (basesink, pad, TRUE);
2316   }
2317
2318   gst_object_unref (basesink);
2319
2320   return result;
2321 }
2322
2323 static gboolean
2324 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
2325 {
2326   gboolean result;
2327   GstBaseSink *basesink;
2328
2329   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2330
2331   if (active) {
2332     if (!basesink->can_activate_push) {
2333       result = FALSE;
2334       basesink->pad_mode = GST_ACTIVATE_NONE;
2335     } else {
2336       result = TRUE;
2337       basesink->pad_mode = GST_ACTIVATE_PUSH;
2338     }
2339   } else {
2340     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
2341       g_warning ("Internal GStreamer activation error!!!");
2342       result = FALSE;
2343     } else {
2344       gst_base_sink_set_flushing (basesink, pad, TRUE);
2345       result = TRUE;
2346       basesink->pad_mode = GST_ACTIVATE_NONE;
2347     }
2348   }
2349
2350   gst_object_unref (basesink);
2351
2352   return result;
2353 }
2354
2355 static gboolean
2356 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
2357 {
2358   GstCaps *caps;
2359   GstPad *pad;
2360
2361   GST_OBJECT_LOCK (basesink);
2362   pad = basesink->sinkpad;
2363   gst_object_ref (pad);
2364   GST_OBJECT_UNLOCK (basesink);
2365
2366   caps = gst_pad_get_allowed_caps (pad);
2367   if (gst_caps_is_empty (caps))
2368     goto no_caps_possible;
2369
2370   caps = gst_caps_make_writable (caps);
2371   gst_caps_truncate (caps);
2372   gst_pad_fixate_caps (pad, caps);
2373
2374   if (gst_caps_is_any (caps)) {
2375     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
2376         "allowing pull()");
2377     /* neither side has template caps in this case, so they are prepared for
2378        pull() without setcaps() */
2379   } else {
2380     if (!gst_pad_set_caps (pad, caps))
2381       goto could_not_set_caps;
2382   }
2383
2384   gst_caps_unref (caps);
2385   gst_object_unref (pad);
2386
2387   return TRUE;
2388
2389 no_caps_possible:
2390   {
2391     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
2392     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
2393     gst_object_unref (pad);
2394     return FALSE;
2395   }
2396 could_not_set_caps:
2397   {
2398     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
2399     gst_caps_unref (caps);
2400     gst_object_unref (pad);
2401     return FALSE;
2402   }
2403 }
2404
2405 /* this won't get called until we implement an activate function */
2406 static gboolean
2407 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
2408 {
2409   gboolean result = FALSE;
2410   GstBaseSink *basesink;
2411   GstBaseSinkClass *bclass;
2412
2413   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2414   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2415
2416   if (active) {
2417     if (!basesink->can_activate_pull) {
2418       result = FALSE;
2419       basesink->pad_mode = GST_ACTIVATE_NONE;
2420     } else {
2421       GstPad *peer = gst_pad_get_peer (pad);
2422
2423       if (G_UNLIKELY (peer == NULL)) {
2424         g_warning ("Trying to activate pad in pull mode, but no peer");
2425         result = FALSE;
2426         basesink->pad_mode = GST_ACTIVATE_NONE;
2427       } else {
2428         if (gst_pad_activate_pull (peer, TRUE)) {
2429           /* we mark we have a newsegment here because pull based
2430            * mode works just fine without having a newsegment before the
2431            * first buffer */
2432           gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
2433           gst_segment_init (basesink->abidata.ABI.clip_segment,
2434               GST_FORMAT_UNDEFINED);
2435           basesink->have_newsegment = TRUE;
2436
2437           /* set the pad mode before starting the task so that it's in the
2438              correct state for the new thread. also the sink set_caps function
2439              checks this */
2440           basesink->pad_mode = GST_ACTIVATE_PULL;
2441           if ((result = gst_base_sink_negotiate_pull (basesink))) {
2442             if (bclass->activate_pull)
2443               result = bclass->activate_pull (basesink, TRUE);
2444             else
2445               result = FALSE;
2446           }
2447           /* but if starting the thread fails, set it back */
2448           if (!result)
2449             basesink->pad_mode = GST_ACTIVATE_NONE;
2450         } else {
2451           GST_DEBUG_OBJECT (pad, "Failed to activate peer in pull mode");
2452           result = FALSE;
2453           basesink->pad_mode = GST_ACTIVATE_NONE;
2454         }
2455         gst_object_unref (peer);
2456       }
2457     }
2458   } else {
2459     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
2460       g_warning ("Internal GStreamer activation error!!!");
2461       result = FALSE;
2462     } else {
2463       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
2464       if (bclass->activate_pull)
2465         result &= bclass->activate_pull (basesink, FALSE);
2466       basesink->pad_mode = GST_ACTIVATE_NONE;
2467     }
2468   }
2469
2470   gst_object_unref (basesink);
2471
2472   return result;
2473 }
2474
2475 /* send an event to our sinkpad peer. */
2476 static gboolean
2477 gst_base_sink_send_event (GstElement * element, GstEvent * event)
2478 {
2479   GstPad *pad;
2480   GstBaseSink *basesink = GST_BASE_SINK (element);
2481   gboolean forward = TRUE, result = TRUE;
2482
2483   switch (GST_EVENT_TYPE (event)) {
2484     case GST_EVENT_LATENCY:
2485     {
2486       GstClockTime latency;
2487
2488       gst_event_parse_latency (event, &latency);
2489
2490       GST_OBJECT_LOCK (element);
2491       basesink->priv->latency = latency;
2492       GST_OBJECT_UNLOCK (element);
2493       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
2494           GST_TIME_ARGS (latency));
2495
2496       /* don't forward, yet */
2497       forward = FALSE;
2498       gst_event_unref (event);
2499       break;
2500     }
2501     default:
2502       break;
2503   }
2504
2505   if (forward) {
2506     GST_OBJECT_LOCK (element);
2507     pad = gst_object_ref (basesink->sinkpad);
2508     GST_OBJECT_UNLOCK (element);
2509
2510     result = gst_pad_push_event (pad, event);
2511
2512     gst_object_unref (pad);
2513   }
2514   return result;
2515 }
2516
2517 static gboolean
2518 gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query)
2519 {
2520   GstPad *peer;
2521   gboolean res = FALSE;
2522
2523   if ((peer = gst_pad_get_peer (sink->sinkpad))) {
2524     res = gst_pad_query (peer, query);
2525     gst_object_unref (peer);
2526   }
2527   return res;
2528 }
2529
2530 /* get the end position of the last seen object, this is used
2531  * for EOS and for making sure that we don't report a position we
2532  * have not reached yet. */
2533 static gboolean
2534 gst_base_sink_get_position_last (GstBaseSink * basesink, gint64 * cur)
2535 {
2536   /* return last observed stream time */
2537   *cur = basesink->priv->current_sstop;
2538   return TRUE;
2539 }
2540
2541 /* get the position when we are PAUSED */
2542 /* FIXME, not entirely correct if we have preroll_queue_len > 1 and
2543  * there are multiple segments in the queue since we calculate on the
2544  * total segments, not the first one. */
2545 static gboolean
2546 gst_base_sink_get_position_paused (GstBaseSink * basesink, gint64 * cur)
2547 {
2548   *cur = basesink->priv->current_sstart;
2549
2550   if (*cur != -1)
2551     *cur = MAX (*cur, basesink->abidata.ABI.clip_segment->time);
2552   else
2553     *cur = basesink->abidata.ABI.clip_segment->time;
2554
2555   return TRUE;
2556 }
2557
2558 static gboolean
2559 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
2560     gint64 * cur)
2561 {
2562   GstClock *clock;
2563   gboolean res = FALSE;
2564
2565   switch (format) {
2566       /* we can answer time format */
2567     case GST_FORMAT_TIME:
2568     {
2569       GstClockTime now, base;
2570       gint64 time, accum, duration;
2571       gdouble rate;
2572       gint64 last;
2573
2574       GST_OBJECT_LOCK (basesink);
2575
2576       /* can only give answer based on the clock if not EOS */
2577       if (G_UNLIKELY (basesink->eos))
2578         goto in_eos;
2579
2580       /* in PAUSE we cannot read from the clock so we
2581        * report time based on the last seen timestamp. */
2582       if (GST_STATE (basesink) == GST_STATE_PAUSED)
2583         goto in_pause;
2584
2585       /* We get position from clock only in PLAYING, we checked
2586        * the PAUSED case above, so this is check is to test 
2587        * READY and NULL, where the position is always 0 */
2588       if (GST_STATE (basesink) != GST_STATE_PLAYING)
2589         goto wrong_state;
2590
2591       /* we need to sync on the clock. */
2592       if (basesink->sync == FALSE)
2593         goto no_sync;
2594
2595       /* and we need a clock */
2596       if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
2597         goto no_sync;
2598
2599       /* collect all data we need holding the lock */
2600       if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time))
2601         time = basesink->segment.time;
2602       else
2603         time = 0;
2604
2605       if (GST_CLOCK_TIME_IS_VALID (basesink->segment.stop))
2606         duration = basesink->segment.stop - basesink->segment.start;
2607       else
2608         duration = 0;
2609
2610       base = GST_ELEMENT_CAST (basesink)->base_time;
2611       accum = basesink->segment.accum;
2612       rate = basesink->segment.rate * basesink->segment.applied_rate;
2613       gst_base_sink_get_position_last (basesink, &last);
2614
2615       gst_object_ref (clock);
2616       /* need to release the object lock before we can get the time, 
2617        * a clock might take the LOCK of the provider, which could be
2618        * a basesink subclass. */
2619       GST_OBJECT_UNLOCK (basesink);
2620
2621       now = gst_clock_get_time (clock);
2622       /* subtract base time and accumulated time from the clock time. 
2623        * Make sure we don't go negative. This is the current time in
2624        * the segment which we need to scale with the combined 
2625        * rate and applied rate. */
2626       base += accum;
2627       base = MIN (now, base);
2628
2629       /* for negative rates we need to count back from from the segment
2630        * duration. */
2631       if (rate < 0.0)
2632         time += duration;
2633       *cur = time + gst_guint64_to_gdouble (now - base) * rate;
2634
2635       /* never report more than last seen position */
2636       if (last != -1)
2637         *cur = MIN (last, *cur);
2638
2639       gst_object_unref (clock);
2640
2641       res = TRUE;
2642
2643       GST_DEBUG_OBJECT (basesink,
2644           "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
2645           GST_TIME_FORMAT " + time %" GST_TIME_FORMAT,
2646           GST_TIME_ARGS (now), GST_TIME_ARGS (base),
2647           GST_TIME_ARGS (accum), GST_TIME_ARGS (time));
2648     }
2649     default:
2650       /* cannot answer other than TIME, we return FALSE, which will
2651        * send the query upstream. */
2652       break;
2653   }
2654
2655 done:
2656   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
2657       res, GST_TIME_ARGS (*cur));
2658   return res;
2659
2660   /* special cases */
2661 in_eos:
2662   {
2663     res = gst_base_sink_get_position_last (basesink, cur);
2664     GST_OBJECT_UNLOCK (basesink);
2665     goto done;
2666   }
2667 in_pause:
2668   {
2669     res = gst_base_sink_get_position_paused (basesink, cur);
2670     GST_OBJECT_UNLOCK (basesink);
2671     goto done;
2672   }
2673 wrong_state:
2674   {
2675     /* in NULL or READY we always return 0 */
2676     res = TRUE;
2677     *cur = 0;
2678     GST_OBJECT_UNLOCK (basesink);
2679     goto done;
2680   }
2681 no_sync:
2682   {
2683     /* report last seen timestamp if any, else return FALSE so
2684      * that upstream can answer */
2685     if ((*cur = basesink->priv->current_sstart) != -1)
2686       res = TRUE;
2687     GST_OBJECT_UNLOCK (basesink);
2688     return res;
2689   }
2690 }
2691
2692 static gboolean
2693 gst_base_sink_query (GstElement * element, GstQuery * query)
2694 {
2695   gboolean res = FALSE;
2696
2697   GstBaseSink *basesink = GST_BASE_SINK (element);
2698
2699   switch (GST_QUERY_TYPE (query)) {
2700     case GST_QUERY_POSITION:
2701     {
2702       gint64 cur = 0;
2703       GstFormat format;
2704
2705       gst_query_parse_position (query, &format, NULL);
2706
2707       GST_DEBUG_OBJECT (basesink, "position format %d", format);
2708
2709       /* first try to get the position based on the clock */
2710       if ((res = gst_base_sink_get_position (basesink, format, &cur))) {
2711         gst_query_set_position (query, format, cur);
2712       } else {
2713         /* fallback to peer query */
2714         res = gst_base_sink_peer_query (basesink, query);
2715       }
2716       break;
2717     }
2718     case GST_QUERY_DURATION:
2719       GST_DEBUG_OBJECT (basesink, "duration query");
2720       res = gst_base_sink_peer_query (basesink, query);
2721       break;
2722     case GST_QUERY_LATENCY:
2723     {
2724       gboolean live, us_live;
2725       GstClockTime min, max;
2726
2727       if ((res =
2728               gst_base_sink_query_latency (basesink, &live, &us_live, &min,
2729                   &max))) {
2730         /* if we or the upstream elements are not live, we don't need latency
2731          * compensation */
2732         if (!live || !us_live) {
2733           GST_DEBUG_OBJECT (basesink,
2734               "no latency compensation, we or upstream are not live");
2735           min = 0;
2736           max = -1;
2737         }
2738         gst_query_set_latency (query, live, min, max);
2739       }
2740       break;
2741     }
2742     case GST_QUERY_JITTER:
2743       break;
2744     case GST_QUERY_RATE:
2745       //gst_query_set_rate (query, basesink->segment_rate);
2746       res = TRUE;
2747       break;
2748     case GST_QUERY_SEGMENT:
2749     {
2750       /* FIXME, bring start/stop to stream time */
2751       gst_query_set_segment (query, basesink->segment.rate,
2752           GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
2753       break;
2754     }
2755     case GST_QUERY_SEEKING:
2756     case GST_QUERY_CONVERT:
2757     case GST_QUERY_FORMATS:
2758     default:
2759       res = gst_base_sink_peer_query (basesink, query);
2760       break;
2761   }
2762   return res;
2763 }
2764
2765 static GstStateChangeReturn
2766 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
2767 {
2768   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
2769   GstBaseSink *basesink = GST_BASE_SINK (element);
2770   GstBaseSinkClass *bclass;
2771
2772   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2773
2774   switch (transition) {
2775     case GST_STATE_CHANGE_NULL_TO_READY:
2776       if (bclass->start)
2777         if (!bclass->start (basesink))
2778           goto start_failed;
2779       break;
2780     case GST_STATE_CHANGE_READY_TO_PAUSED:
2781       /* need to complete preroll before this state change completes, there
2782        * is no data flow in READY so we can safely assume we need to preroll. */
2783       GST_DEBUG_OBJECT (basesink, "READY to PAUSED, need preroll");
2784       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
2785       gst_segment_init (basesink->abidata.ABI.clip_segment,
2786           GST_FORMAT_UNDEFINED);
2787       basesink->have_newsegment = FALSE;
2788       basesink->offset = 0;
2789       basesink->have_preroll = FALSE;
2790       basesink->need_preroll = TRUE;
2791       basesink->playing_async = TRUE;
2792       basesink->priv->current_sstart = 0;
2793       basesink->priv->current_sstop = 0;
2794       basesink->priv->eos_rtime = -1;
2795       basesink->priv->latency = 0;
2796       basesink->eos = FALSE;
2797       gst_base_sink_reset_qos (basesink);
2798       ret = GST_STATE_CHANGE_ASYNC;
2799       break;
2800     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
2801       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
2802       if (gst_base_sink_is_prerolled (basesink)) {
2803         /* no preroll needed anymore now. */
2804         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
2805         basesink->playing_async = FALSE;
2806         basesink->need_preroll = FALSE;
2807         if (basesink->eos) {
2808           /* need to post EOS message here */
2809           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
2810           gst_element_post_message (GST_ELEMENT_CAST (basesink),
2811               gst_message_new_eos (GST_OBJECT_CAST (basesink)));
2812         } else {
2813           GST_DEBUG_OBJECT (basesink, "signal preroll");
2814           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
2815         }
2816       } else {
2817         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, need preroll");
2818         basesink->need_preroll = TRUE;
2819         basesink->playing_async = TRUE;
2820         ret = GST_STATE_CHANGE_ASYNC;
2821       }
2822       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
2823       break;
2824     default:
2825       break;
2826   }
2827
2828   {
2829     GstStateChangeReturn bret;
2830
2831     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2832     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
2833       goto activate_failed;
2834   }
2835
2836   switch (transition) {
2837     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2838       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
2839       /* FIXME, make sure we cannot enter _render first */
2840
2841       /* we need to call ::unlock before locking PREROLL_LOCK
2842        * since we lock it before going into ::render */
2843       if (bclass->unlock)
2844         bclass->unlock (basesink);
2845
2846       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
2847       basesink->need_preroll = TRUE;
2848       if (basesink->clock_id) {
2849         gst_clock_id_unschedule (basesink->clock_id);
2850       }
2851
2852       /* if we don't have a preroll buffer we need to wait for a preroll and
2853        * return ASYNC. */
2854       if (gst_base_sink_is_prerolled (basesink)) {
2855         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
2856         basesink->playing_async = FALSE;
2857       } else {
2858         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, need preroll");
2859         basesink->playing_async = TRUE;
2860         ret = GST_STATE_CHANGE_ASYNC;
2861       }
2862       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
2863           ", dropped: %" G_GUINT64_FORMAT, basesink->priv->rendered,
2864           basesink->priv->dropped);
2865
2866       gst_base_sink_reset_qos (basesink);
2867       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
2868       break;
2869     case GST_STATE_CHANGE_PAUSED_TO_READY:
2870       basesink->priv->current_sstart = 0;
2871       basesink->priv->current_sstop = 0;
2872       break;
2873     case GST_STATE_CHANGE_READY_TO_NULL:
2874       if (bclass->stop)
2875         if (!bclass->stop (basesink)) {
2876           GST_WARNING_OBJECT (basesink, "failed to stop");
2877         }
2878       break;
2879     default:
2880       break;
2881   }
2882
2883   return ret;
2884
2885   /* ERRORS */
2886 start_failed:
2887   {
2888     GST_DEBUG_OBJECT (basesink, "failed to start");
2889     return GST_STATE_CHANGE_FAILURE;
2890   }
2891 activate_failed:
2892   {
2893     GST_DEBUG_OBJECT (basesink,
2894         "element failed to change states -- activation problem?");
2895     return GST_STATE_CHANGE_FAILURE;
2896   }
2897 }