libs/gst/base/gstbasesink.c: Implement more seeking in pull mode.
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSource
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * <programlisting>
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *   
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * </programlisting>
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSink::preroll vmethod with this preroll buffer and will then commit
59  * the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from ::get_times. If this function returns
63  * #GST_CLOCK_TIME_NONE for the start time, no synchronisation will be done.
64  * Synchronisation can be disabled entirely by setting the object "sync"
65  * property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSink::render will be called.
68  * Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the ::render method
71  * are supported as well. These classes typically receive a buffer in the render
72  * method and can then potentially block on the clock while rendering. A typical
73  * example is an audiosink. Since 0.10.11 these subclasses can use
74  * gst_base_sink_wait_preroll() to perform the blocking wait.
75  *
76  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
77  * for the clock to reach the time indicated by the stop time of the last
78  * ::get_times call before posting an EOS message. When the element receives
79  * EOS in PAUSED, preroll completes, the event is queued and an EOS message is
80  * posted when going to PLAYING.
81  *
82  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
83  * synchronisation and clipping of buffers. Buffers that fall completely outside
84  * of the current segment are dropped. Buffers that fall partially in the
85  * segment are rendered (and prerolled). Subclasses should do any subbuffer
86  * clipping themselves when needed.
87  *
88  * #GstBaseSink will by default report the current playback position in
89  * #GST_FORMAT_TIME based on the current clock time and segment information.
90  * If no clock has been set on the element, the query will be forwarded
91  * upstream.
92  *
93  * The ::set_caps function will be called when the subclass should configure
94  * itself to process a specific media type.
95  *
96  * The ::start and ::stop virtual methods will be called when resources should
97  * be allocated. Any ::preroll, ::render  and ::set_caps function will be
98  * called between the ::start and ::stop calls.
99  *
100  * The ::event virtual method will be called when an event is received by
101  * #GstBaseSink. Normally this method should only be overriden by very specific
102  * elements (such as file sinks) which need to handle the newsegment event
103  * specially.
104  *
105  * #GstBaseSink provides an overridable ::buffer_alloc function that can be
106  * used by sinks that want to do reverse negotiation or to provide
107  * custom buffers (hardware buffers for example) to upstream elements.
108  *
109  * The ::unlock method is called when the elements should unblock any blocking
110  * operations they perform in the ::render method. This is mostly useful when
111  * the ::render method performs a blocking write on a file descriptor, for
112  * example.
113  *
114  * The max-lateness property affects how the sink deals with buffers that
115  * arrive too late in the sink. A buffer arrives too late in the sink when
116  * the presentation time (as a combination of the last segment, buffer
117  * timestamp and element base_time) plus the duration is before the current
118  * time of the clock.
119  * If the frame is later than max-lateness, the sink will drop the buffer
120  * without calling the render method.
121  * This feature is disabled if sync is disabled, the ::get-times method does
122  * not return a valid start time or max-lateness is set to -1 (the default).
123  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
124  * max-lateness value.
125  *
126  * The qos property will enable the quality-of-service features of the basesink
127  * which gather statistics about the real-time performance of the clock
128  * synchronisation. For each buffer received in the sink, statistics are
129  * gathered and a QOS event is sent upstream with these numbers. This
130  * information can then be used by upstream elements to reduce their processing
131  * rate, for example.
132  *
133  * Since 0.10.15 the async property can be used to instruct the sink to never
134  * perform an ASYNC state change. This feature is mostly usable when dealing
135  * with non-synchronized streams or sparse streams.
136  *
137  * Last reviewed on 2007-08-29 (0.10.15)
138  */
139
140 #ifdef HAVE_CONFIG_H
141 #  include "config.h"
142 #endif
143
144 #include "gstbasesink.h"
145 #include <gst/gstmarshal.h>
146 #include <gst/gst_private.h>
147 #include <gst/gst-i18n-lib.h>
148
149 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
150 #define GST_CAT_DEFAULT gst_base_sink_debug
151
152 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
153    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
154
155 /* FIXME, some stuff in ABI.data and other in Private...
156  * Make up your mind please.
157  */
158 struct _GstBaseSinkPrivate
159 {
160   gint qos_enabled;             /* ATOMIC */
161   gboolean async_enabled;
162   GstClockTimeDiff ts_offset;
163   GstClockTime render_delay;
164
165   /* start, stop of current buffer, stream time, used to report position */
166   GstClockTime current_sstart;
167   GstClockTime current_sstop;
168
169   /* start, stop and jitter of current buffer, running time */
170   GstClockTime current_rstart;
171   GstClockTime current_rstop;
172   GstClockTimeDiff current_jitter;
173
174   /* EOS sync time in running time */
175   GstClockTime eos_rtime;
176
177   /* last buffer that arrived in time, running time */
178   GstClockTime last_in_time;
179   /* when the last buffer left the sink, running time */
180   GstClockTime last_left;
181
182   /* running averages go here these are done on running time */
183   GstClockTime avg_pt;
184   GstClockTime avg_duration;
185   gdouble avg_rate;
186
187   /* these are done on system time. avg_jitter and avg_render are
188    * compared to eachother to see if the rendering time takes a
189    * huge amount of the processing, If so we are flooded with
190    * buffers. */
191   GstClockTime last_left_systime;
192   GstClockTime avg_jitter;
193   GstClockTime start, stop;
194   GstClockTime avg_render;
195
196   /* number of rendered and dropped frames */
197   guint64 rendered;
198   guint64 dropped;
199
200   /* latency stuff */
201   GstClockTime latency;
202
203   /* if we already commited the state */
204   gboolean commited;
205
206   /* when we received EOS */
207   gboolean received_eos;
208
209   /* when we are prerolled and able to report latency */
210   gboolean have_latency;
211
212   /* the last buffer we prerolled or rendered. Useful for making snapshots */
213   GstBuffer *last_buffer;
214
215   /* caps for pull based scheduling */
216   GstCaps *pull_caps;
217
218   guint blocksize;
219
220   gboolean discont;
221 };
222
223 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
224
225 /* generic running average, this has a neutral window size */
226 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
227
228 /* the windows for these running averages are experimentally obtained.
229  * possitive values get averaged more while negative values use a small
230  * window so we can react faster to badness. */
231 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
232 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
233
234 /* BaseSink properties */
235
236 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
237 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
238
239 #define DEFAULT_PREROLL_QUEUE_LEN       0
240 #define DEFAULT_SYNC                    TRUE
241 #define DEFAULT_MAX_LATENESS            -1
242 #define DEFAULT_QOS                     FALSE
243 #define DEFAULT_ASYNC                   TRUE
244 #define DEFAULT_TS_OFFSET               0
245 #define DEFAULT_BLOCKSIZE               4096
246
247 enum
248 {
249   PROP_0,
250   PROP_PREROLL_QUEUE_LEN,
251   PROP_SYNC,
252   PROP_MAX_LATENESS,
253   PROP_QOS,
254   PROP_ASYNC,
255   PROP_TS_OFFSET,
256   PROP_LAST_BUFFER,
257   PROP_BLOCKSIZE,
258   PROP_LAST
259 };
260
261 static GstElementClass *parent_class = NULL;
262
263 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
264 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
265 static void gst_base_sink_finalize (GObject * object);
266
267 GType
268 gst_base_sink_get_type (void)
269 {
270   static GType base_sink_type = 0;
271
272   if (G_UNLIKELY (base_sink_type == 0)) {
273     static const GTypeInfo base_sink_info = {
274       sizeof (GstBaseSinkClass),
275       NULL,
276       NULL,
277       (GClassInitFunc) gst_base_sink_class_init,
278       NULL,
279       NULL,
280       sizeof (GstBaseSink),
281       0,
282       (GInstanceInitFunc) gst_base_sink_init,
283     };
284
285     base_sink_type = g_type_register_static (GST_TYPE_ELEMENT,
286         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
287   }
288   return base_sink_type;
289 }
290
291 static void gst_base_sink_set_property (GObject * object, guint prop_id,
292     const GValue * value, GParamSpec * pspec);
293 static void gst_base_sink_get_property (GObject * object, guint prop_id,
294     GValue * value, GParamSpec * pspec);
295
296 static gboolean gst_base_sink_send_event (GstElement * element,
297     GstEvent * event);
298 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
299
300 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
301 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
302 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
303     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
304 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
305     GstClockTime * start, GstClockTime * end);
306 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
307     GstPad * pad, gboolean flushing);
308 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
309     gboolean active);
310 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
311     GstSegment * segment);
312 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
313     GstEvent * event, GstSegment * segment);
314
315 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
316     GstStateChange transition);
317
318 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
319 static void gst_base_sink_loop (GstPad * pad);
320 static gboolean gst_base_sink_pad_activate (GstPad * pad);
321 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
322 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
323 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
324 static gboolean gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query);
325
326 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
327
328 /* check if an object was too late */
329 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
330     GstMiniObject * obj, GstClockTime start, GstClockTime stop,
331     GstClockReturn status, GstClockTimeDiff jitter);
332
333 static void
334 gst_base_sink_class_init (GstBaseSinkClass * klass)
335 {
336   GObjectClass *gobject_class;
337   GstElementClass *gstelement_class;
338
339   gobject_class = G_OBJECT_CLASS (klass);
340   gstelement_class = GST_ELEMENT_CLASS (klass);
341
342   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
343       "basesink element");
344
345   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
346
347   parent_class = g_type_class_peek_parent (klass);
348
349   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_sink_finalize);
350   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_sink_set_property);
351   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_sink_get_property);
352
353   /* FIXME, this next value should be configured using an event from the
354    * upstream element, ie, the BUFFER_SIZE event. */
355   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
356       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
357           "Number of buffers to queue during preroll", 0, G_MAXUINT,
358           DEFAULT_PREROLL_QUEUE_LEN,
359           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
360
361   g_object_class_install_property (gobject_class, PROP_SYNC,
362       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
363           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
364
365   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
366       g_param_spec_int64 ("max-lateness", "Max Lateness",
367           "Maximum number of nanoseconds that a buffer can be late before it "
368           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
369           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
370
371   g_object_class_install_property (gobject_class, PROP_QOS,
372       g_param_spec_boolean ("qos", "Qos",
373           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
374           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375   /**
376    * GstBaseSink:async
377    *
378    * If set to #TRUE, the basesink will perform asynchronous state changes.
379    * When set to #FALSE, the sink will not signal the parent when it prerolls.
380    * Use this option when dealing with sparse streams or when synchronisation is
381    * not required.
382    *
383    * Since: 0.10.15
384    */
385   g_object_class_install_property (gobject_class, PROP_ASYNC,
386       g_param_spec_boolean ("async", "Async",
387           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
388           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
389   /**
390    * GstBaseSink:ts-offset
391    *
392    * Controls the final synchronisation, a negative value will render the buffer
393    * earlier while a positive value delays playback. This property can be 
394    * used to fix synchronisation in bad files.
395    *
396    * Since: 0.10.15
397    */
398   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
399       g_param_spec_int64 ("ts-offset", "TS Offset",
400           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
401           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
402
403   /**
404    * GstBaseSink:last-buffer
405    *
406    * The last buffer that arrived in the sink and was used for preroll or for
407    * rendering. This property can be used to generate thumbnails. This property
408    * can be NULL when the sink has not yet received a bufer.
409    *
410    * Since: 0.10.15
411    */
412   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
413       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
414           "The last buffer received in the sink", GST_TYPE_BUFFER,
415           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
416
417   /**
418    * GstBaseSink:blocksize
419    *
420    * The amount of bytes to pull when operating in pull mode.
421    *
422    * Since: 0.10.22
423    */
424   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
425       g_param_spec_uint ("blocksize", "Block size",
426           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
427           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428
429   gstelement_class->change_state =
430       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
431   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
432   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
433
434   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
435   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
436   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
437   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
438   klass->activate_pull =
439       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
440 }
441
442 static GstCaps *
443 gst_base_sink_pad_getcaps (GstPad * pad)
444 {
445   GstBaseSinkClass *bclass;
446   GstBaseSink *bsink;
447   GstCaps *caps = NULL;
448
449   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
450   bclass = GST_BASE_SINK_GET_CLASS (bsink);
451
452   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
453     /* if we are operating in pull mode we only accept the negotiated caps */
454     GST_OBJECT_LOCK (pad);
455     if ((caps = GST_PAD_CAPS (pad)))
456       gst_caps_ref (caps);
457     GST_OBJECT_UNLOCK (pad);
458   }
459   if (caps == NULL) {
460     if (bclass->get_caps)
461       caps = bclass->get_caps (bsink);
462
463     if (caps == NULL) {
464       GstPadTemplate *pad_template;
465
466       pad_template =
467           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
468           "sink");
469       if (pad_template != NULL) {
470         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
471       }
472     }
473   }
474   gst_object_unref (bsink);
475
476   return caps;
477 }
478
479 static gboolean
480 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
481 {
482   GstBaseSinkClass *bclass;
483   GstBaseSink *bsink;
484   gboolean res = TRUE;
485
486   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
487   bclass = GST_BASE_SINK_GET_CLASS (bsink);
488
489   if (res && bclass->set_caps)
490     res = bclass->set_caps (bsink, caps);
491
492   gst_object_unref (bsink);
493
494   return res;
495 }
496
497 static void
498 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
499 {
500   GstBaseSinkClass *bclass;
501   GstBaseSink *bsink;
502
503   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
504   bclass = GST_BASE_SINK_GET_CLASS (bsink);
505
506   if (bclass->fixate)
507     bclass->fixate (bsink, caps);
508
509   gst_object_unref (bsink);
510 }
511
512 static GstFlowReturn
513 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
514     GstCaps * caps, GstBuffer ** buf)
515 {
516   GstBaseSinkClass *bclass;
517   GstBaseSink *bsink;
518   GstFlowReturn result = GST_FLOW_OK;
519
520   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
521   bclass = GST_BASE_SINK_GET_CLASS (bsink);
522
523   if (bclass->buffer_alloc)
524     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
525   else
526     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
527
528   gst_object_unref (bsink);
529
530   return result;
531 }
532
533 static void
534 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
535 {
536   GstPadTemplate *pad_template;
537   GstBaseSinkPrivate *priv;
538
539   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
540
541   pad_template =
542       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
543   g_return_if_fail (pad_template != NULL);
544
545   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
546
547   gst_pad_set_getcaps_function (basesink->sinkpad,
548       GST_DEBUG_FUNCPTR (gst_base_sink_pad_getcaps));
549   gst_pad_set_setcaps_function (basesink->sinkpad,
550       GST_DEBUG_FUNCPTR (gst_base_sink_pad_setcaps));
551   gst_pad_set_fixatecaps_function (basesink->sinkpad,
552       GST_DEBUG_FUNCPTR (gst_base_sink_pad_fixate));
553   gst_pad_set_bufferalloc_function (basesink->sinkpad,
554       GST_DEBUG_FUNCPTR (gst_base_sink_pad_buffer_alloc));
555   gst_pad_set_activate_function (basesink->sinkpad,
556       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate));
557   gst_pad_set_activatepush_function (basesink->sinkpad,
558       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_push));
559   gst_pad_set_activatepull_function (basesink->sinkpad,
560       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_pull));
561   gst_pad_set_event_function (basesink->sinkpad,
562       GST_DEBUG_FUNCPTR (gst_base_sink_event));
563   gst_pad_set_chain_function (basesink->sinkpad,
564       GST_DEBUG_FUNCPTR (gst_base_sink_chain));
565   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
566
567   basesink->pad_mode = GST_ACTIVATE_NONE;
568   basesink->preroll_queue = g_queue_new ();
569   basesink->abidata.ABI.clip_segment = gst_segment_new ();
570   priv->have_latency = FALSE;
571
572   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
573   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
574
575   basesink->sync = DEFAULT_SYNC;
576   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
577   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
578   priv->async_enabled = DEFAULT_ASYNC;
579   priv->ts_offset = DEFAULT_TS_OFFSET;
580   priv->render_delay = 0;
581   priv->blocksize = DEFAULT_BLOCKSIZE;
582
583   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
584 }
585
586 static void
587 gst_base_sink_finalize (GObject * object)
588 {
589   GstBaseSink *basesink;
590
591   basesink = GST_BASE_SINK (object);
592
593   g_queue_free (basesink->preroll_queue);
594   gst_segment_free (basesink->abidata.ABI.clip_segment);
595
596   G_OBJECT_CLASS (parent_class)->finalize (object);
597 }
598
599 /**
600  * gst_base_sink_set_sync:
601  * @sink: the sink
602  * @sync: the new sync value.
603  *
604  * Configures @sink to synchronize on the clock or not. When
605  * @sync is FALSE, incomming samples will be played as fast as
606  * possible. If @sync is TRUE, the timestamps of the incomming
607  * buffers will be used to schedule the exact render time of its
608  * contents.
609  *
610  * Since: 0.10.4
611  */
612 void
613 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
614 {
615   g_return_if_fail (GST_IS_BASE_SINK (sink));
616
617   GST_OBJECT_LOCK (sink);
618   sink->sync = sync;
619   GST_OBJECT_UNLOCK (sink);
620 }
621
622 /**
623  * gst_base_sink_get_sync:
624  * @sink: the sink
625  *
626  * Checks if @sink is currently configured to synchronize against the
627  * clock.
628  *
629  * Returns: TRUE if the sink is configured to synchronize against the clock.
630  *
631  * Since: 0.10.4
632  */
633 gboolean
634 gst_base_sink_get_sync (GstBaseSink * sink)
635 {
636   gboolean res;
637
638   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
639
640   GST_OBJECT_LOCK (sink);
641   res = sink->sync;
642   GST_OBJECT_UNLOCK (sink);
643
644   return res;
645 }
646
647 /**
648  * gst_base_sink_set_max_lateness:
649  * @sink: the sink
650  * @max_lateness: the new max lateness value.
651  *
652  * Sets the new max lateness value to @max_lateness. This value is
653  * used to decide if a buffer should be dropped or not based on the
654  * buffer timestamp and the current clock time. A value of -1 means
655  * an unlimited time.
656  *
657  * Since: 0.10.4
658  */
659 void
660 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
661 {
662   g_return_if_fail (GST_IS_BASE_SINK (sink));
663
664   GST_OBJECT_LOCK (sink);
665   sink->abidata.ABI.max_lateness = max_lateness;
666   GST_OBJECT_UNLOCK (sink);
667 }
668
669 /**
670  * gst_base_sink_get_max_lateness:
671  * @sink: the sink
672  *
673  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
674  * more details.
675  *
676  * Returns: The maximum time in nanoseconds that a buffer can be late
677  * before it is dropped and not rendered. A value of -1 means an
678  * unlimited time.
679  *
680  * Since: 0.10.4
681  */
682 gint64
683 gst_base_sink_get_max_lateness (GstBaseSink * sink)
684 {
685   gint64 res;
686
687   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
688
689   GST_OBJECT_LOCK (sink);
690   res = sink->abidata.ABI.max_lateness;
691   GST_OBJECT_UNLOCK (sink);
692
693   return res;
694 }
695
696 /**
697  * gst_base_sink_set_qos_enabled:
698  * @sink: the sink
699  * @enabled: the new qos value.
700  *
701  * Configures @sink to send Quality-of-Service events upstream.
702  *
703  * Since: 0.10.5
704  */
705 void
706 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
707 {
708   g_return_if_fail (GST_IS_BASE_SINK (sink));
709
710   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
711 }
712
713 /**
714  * gst_base_sink_is_qos_enabled:
715  * @sink: the sink
716  *
717  * Checks if @sink is currently configured to send Quality-of-Service events
718  * upstream.
719  *
720  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
721  *
722  * Since: 0.10.5
723  */
724 gboolean
725 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
726 {
727   gboolean res;
728
729   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
730
731   res = g_atomic_int_get (&sink->priv->qos_enabled);
732
733   return res;
734 }
735
736 /**
737  * gst_base_sink_set_async_enabled:
738  * @sink: the sink
739  * @enabled: the new async value.
740  *
741  * Configures @sink to perform all state changes asynchronusly. When async is
742  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
743  * preroll buffer. This feature is usefull if the sink does not synchronize
744  * against the clock or when it is dealing with sparse streams.
745  *
746  * Since: 0.10.15
747  */
748 void
749 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
750 {
751   g_return_if_fail (GST_IS_BASE_SINK (sink));
752
753   GST_PAD_PREROLL_LOCK (sink->sinkpad);
754   sink->priv->async_enabled = enabled;
755   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
756   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
757 }
758
759 /**
760  * gst_base_sink_is_async_enabled:
761  * @sink: the sink
762  *
763  * Checks if @sink is currently configured to perform asynchronous state
764  * changes to PAUSED.
765  *
766  * Returns: TRUE if the sink is configured to perform asynchronous state
767  * changes.
768  *
769  * Since: 0.10.15
770  */
771 gboolean
772 gst_base_sink_is_async_enabled (GstBaseSink * sink)
773 {
774   gboolean res;
775
776   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
777
778   GST_PAD_PREROLL_LOCK (sink->sinkpad);
779   res = sink->priv->async_enabled;
780   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
781
782   return res;
783 }
784
785 /**
786  * gst_base_sink_set_ts_offset:
787  * @sink: the sink
788  * @offset: the new offset
789  *
790  * Adjust the synchronisation of @sink with @offset. A negative value will
791  * render buffers earlier than their timestamp. A positive value will delay
792  * rendering. This function can be used to fix playback of badly timestamped
793  * buffers.
794  *
795  * Since: 0.10.15
796  */
797 void
798 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
799 {
800   g_return_if_fail (GST_IS_BASE_SINK (sink));
801
802   GST_OBJECT_LOCK (sink);
803   sink->priv->ts_offset = offset;
804   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
805   GST_OBJECT_UNLOCK (sink);
806 }
807
808 /**
809  * gst_base_sink_get_ts_offset:
810  * @sink: the sink
811  *
812  * Get the synchronisation offset of @sink.
813  *
814  * Returns: The synchronisation offset.
815  *
816  * Since: 0.10.15
817  */
818 GstClockTimeDiff
819 gst_base_sink_get_ts_offset (GstBaseSink * sink)
820 {
821   GstClockTimeDiff res;
822
823   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
824
825   GST_OBJECT_LOCK (sink);
826   res = sink->priv->ts_offset;
827   GST_OBJECT_UNLOCK (sink);
828
829   return res;
830 }
831
832 /**
833  * gst_base_sink_get_last_buffer:
834  * @sink: the sink
835  *
836  * Get the last buffer that arrived in the sink and was used for preroll or for
837  * rendering. This property can be used to generate thumbnails.
838  *
839  * The #GstCaps on the buffer can be used to determine the type of the buffer.
840  * 
841  * Returns: a #GstBuffer. gst_buffer_unref() after usage. This function returns
842  * NULL when no buffer has arrived in the sink yet or when the sink is not in
843  * PAUSED or PLAYING.
844  *
845  * Since: 0.10.15
846  */
847 GstBuffer *
848 gst_base_sink_get_last_buffer (GstBaseSink * sink)
849 {
850   GstBuffer *res;
851
852   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
853
854   GST_OBJECT_LOCK (sink);
855   if ((res = sink->priv->last_buffer))
856     gst_buffer_ref (res);
857   GST_OBJECT_UNLOCK (sink);
858
859   return res;
860 }
861
862 static void
863 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
864 {
865   GstBuffer *old;
866
867   if (buffer)
868     gst_buffer_ref (buffer);
869
870   GST_OBJECT_LOCK (sink);
871   old = sink->priv->last_buffer;
872   sink->priv->last_buffer = buffer;
873   GST_OBJECT_UNLOCK (sink);
874
875   if (old)
876     gst_buffer_unref (old);
877 }
878
879 /**
880  * gst_base_sink_get_latency:
881  * @sink: the sink
882  *
883  * Get the currently configured latency.
884  *
885  * Returns: The configured latency.
886  *
887  * Since: 0.10.12
888  */
889 GstClockTime
890 gst_base_sink_get_latency (GstBaseSink * sink)
891 {
892   GstClockTime res;
893
894   GST_OBJECT_LOCK (sink);
895   res = sink->priv->latency;
896   GST_OBJECT_UNLOCK (sink);
897
898   return res;
899 }
900
901 /**
902  * gst_base_sink_query_latency:
903  * @sink: the sink
904  * @live: if the sink is live
905  * @upstream_live: if an upstream element is live
906  * @min_latency: the min latency of the upstream elements
907  * @max_latency: the max latency of the upstream elements
908  *
909  * Query the sink for the latency parameters. The latency will be queried from
910  * the upstream elements. @live will be TRUE if @sink is configured to
911  * synchronize against the clock. @upstream_live will be TRUE if an upstream
912  * element is live. 
913  *
914  * If both @live and @upstream_live are TRUE, the sink will want to compensate
915  * for the latency introduced by the upstream elements by setting the
916  * @min_latency to a strictly possitive value.
917  *
918  * This function is mostly used by subclasses. 
919  *
920  * Returns: TRUE if the query succeeded.
921  *
922  * Since: 0.10.12
923  */
924 gboolean
925 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
926     gboolean * upstream_live, GstClockTime * min_latency,
927     GstClockTime * max_latency)
928 {
929   gboolean l, us_live, res, have_latency;
930   GstClockTime min, max, render_delay;
931   GstQuery *query;
932   GstClockTime us_min, us_max;
933
934   /* we are live when we sync to the clock */
935   GST_OBJECT_LOCK (sink);
936   l = sink->sync;
937   have_latency = sink->priv->have_latency;
938   render_delay = sink->priv->render_delay;
939   GST_OBJECT_UNLOCK (sink);
940
941   /* assume no latency */
942   min = 0;
943   max = -1;
944   us_live = FALSE;
945
946   if (have_latency) {
947     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
948     /* we are ready for a latency query this is when we preroll or when we are
949      * not async. */
950     query = gst_query_new_latency ();
951
952     /* ask the peer for the latency */
953     if ((res = gst_base_sink_peer_query (sink, query))) {
954       /* get upstream min and max latency */
955       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
956
957       if (us_live) {
958         /* upstream live, use its latency, subclasses should use these
959          * values to create the complete latency. */
960         min = us_min;
961         max = us_max;
962       }
963       if (l) {
964         /* we need to add the render delay if we are live */
965         if (min != -1)
966           min += render_delay;
967         if (max != -1)
968           max += render_delay;
969       }
970     }
971     gst_query_unref (query);
972   } else {
973     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
974     res = FALSE;
975   }
976
977   /* not live, we tried to do the query, if it failed we return TRUE anyway */
978   if (!res) {
979     if (!l) {
980       res = TRUE;
981       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
982     } else {
983       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
984     }
985   }
986
987   if (res) {
988     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
989         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
990         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
991
992     if (live)
993       *live = l;
994     if (upstream_live)
995       *upstream_live = us_live;
996     if (min_latency)
997       *min_latency = min;
998     if (max_latency)
999       *max_latency = max;
1000   }
1001   return res;
1002 }
1003
1004 /**
1005  * gst_base_sink_set_render_delay:
1006  * @sink: a #GstBaseSink
1007  * @delay: the new delay
1008  *
1009  * Set the render delay in @sink to @delay. The render delay is the time 
1010  * between actual rendering of a buffer and its synchronisation time. Some
1011  * devices might delay media rendering which can be compensated for with this
1012  * function. 
1013  *
1014  * After calling this function, this sink will report additional latency and
1015  * other sinks will adjust their latency to delay the rendering of their media.
1016  *
1017  * This function is usually called by subclasses.
1018  *
1019  * Since: 0.10.21
1020  */
1021 void
1022 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1023 {
1024   g_return_if_fail (GST_IS_BASE_SINK (sink));
1025
1026   GST_OBJECT_LOCK (sink);
1027   sink->priv->render_delay = delay;
1028   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1029       GST_TIME_ARGS (delay));
1030   GST_OBJECT_UNLOCK (sink);
1031 }
1032
1033 /**
1034  * gst_base_sink_get_render_delay:
1035  * @sink: a #GstBaseSink
1036  *
1037  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1038  * information about the render delay.
1039  *
1040  * Returns: the render delay of @sink.
1041  *
1042  * Since: 0.10.21
1043  */
1044 GstClockTime
1045 gst_base_sink_get_render_delay (GstBaseSink * sink)
1046 {
1047   GstClockTimeDiff res;
1048
1049   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1050
1051   GST_OBJECT_LOCK (sink);
1052   res = sink->priv->render_delay;
1053   GST_OBJECT_UNLOCK (sink);
1054
1055   return res;
1056 }
1057
1058 /**
1059  * gst_base_sink_set_blocksize:
1060  * @sink: a #GstBaseSink
1061  * @blocksize: the blocksize in bytes
1062  *
1063  * Set the number of bytes that the sink will pull when it is operating in pull
1064  * mode.
1065  *
1066  * Since: 0.10.22
1067  */
1068 void
1069 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1070 {
1071   g_return_if_fail (GST_IS_BASE_SINK (sink));
1072
1073   GST_OBJECT_LOCK (sink);
1074   sink->priv->blocksize = blocksize;
1075   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1076   GST_OBJECT_UNLOCK (sink);
1077 }
1078
1079 /**
1080  * gst_base_sink_get_blocksize:
1081  * @sink: a #GstBaseSink
1082  *
1083  * Get the number of bytes that the sink will pull when it is operating in pull
1084  * mode.
1085  *
1086  * Returns: the number of bytes @sink will pull in pull mode.
1087  *
1088  * Since: 0.10.22
1089  */
1090 guint
1091 gst_base_sink_get_blocksize (GstBaseSink * sink)
1092 {
1093   guint res;
1094
1095   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1096
1097   GST_OBJECT_LOCK (sink);
1098   res = sink->priv->blocksize;
1099   GST_OBJECT_UNLOCK (sink);
1100
1101   return res;
1102 }
1103
1104 static void
1105 gst_base_sink_set_property (GObject * object, guint prop_id,
1106     const GValue * value, GParamSpec * pspec)
1107 {
1108   GstBaseSink *sink = GST_BASE_SINK (object);
1109
1110   switch (prop_id) {
1111     case PROP_PREROLL_QUEUE_LEN:
1112       /* preroll lock necessary to serialize with finish_preroll */
1113       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1114       sink->preroll_queue_max_len = g_value_get_uint (value);
1115       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1116       break;
1117     case PROP_SYNC:
1118       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1119       break;
1120     case PROP_MAX_LATENESS:
1121       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1122       break;
1123     case PROP_QOS:
1124       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1125       break;
1126     case PROP_ASYNC:
1127       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1128       break;
1129     case PROP_TS_OFFSET:
1130       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1131       break;
1132     case PROP_BLOCKSIZE:
1133       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1134       break;
1135     default:
1136       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1137       break;
1138   }
1139 }
1140
1141 static void
1142 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1143     GParamSpec * pspec)
1144 {
1145   GstBaseSink *sink = GST_BASE_SINK (object);
1146
1147   switch (prop_id) {
1148     case PROP_PREROLL_QUEUE_LEN:
1149       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1150       g_value_set_uint (value, sink->preroll_queue_max_len);
1151       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1152       break;
1153     case PROP_SYNC:
1154       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1155       break;
1156     case PROP_MAX_LATENESS:
1157       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1158       break;
1159     case PROP_QOS:
1160       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1161       break;
1162     case PROP_ASYNC:
1163       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1164       break;
1165     case PROP_TS_OFFSET:
1166       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1167       break;
1168     case PROP_LAST_BUFFER:
1169       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1170       break;
1171     case PROP_BLOCKSIZE:
1172       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1173       break;
1174     default:
1175       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1176       break;
1177   }
1178 }
1179
1180
1181 static GstCaps *
1182 gst_base_sink_get_caps (GstBaseSink * sink)
1183 {
1184   return NULL;
1185 }
1186
1187 static gboolean
1188 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1189 {
1190   return TRUE;
1191 }
1192
1193 static GstFlowReturn
1194 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1195     GstCaps * caps, GstBuffer ** buf)
1196 {
1197   *buf = NULL;
1198   return GST_FLOW_OK;
1199 }
1200
1201 /* with PREROLL_LOCK, STREAM_LOCK */
1202 static void
1203 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1204 {
1205   GstMiniObject *obj;
1206
1207   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1208   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1209     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1210     gst_mini_object_unref (obj);
1211   }
1212   /* we can't have EOS anymore now */
1213   basesink->eos = FALSE;
1214   basesink->priv->received_eos = FALSE;
1215   basesink->have_preroll = FALSE;
1216   basesink->eos_queued = FALSE;
1217   basesink->preroll_queued = 0;
1218   basesink->buffers_queued = 0;
1219   basesink->events_queued = 0;
1220   /* can't report latency anymore until we preroll again */
1221   if (basesink->priv->async_enabled) {
1222     GST_OBJECT_LOCK (basesink);
1223     basesink->priv->have_latency = FALSE;
1224     GST_OBJECT_UNLOCK (basesink);
1225   }
1226   /* and signal any waiters now */
1227   GST_PAD_PREROLL_SIGNAL (pad);
1228 }
1229
1230 /* with STREAM_LOCK, configures given segment with the event information. */
1231 static void
1232 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1233     GstEvent * event, GstSegment * segment)
1234 {
1235   gboolean update;
1236   gdouble rate, arate;
1237   GstFormat format;
1238   gint64 start;
1239   gint64 stop;
1240   gint64 time;
1241
1242   /* the newsegment event is needed to bring the buffer timestamps to the
1243    * stream time and to drop samples outside of the playback segment. */
1244   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1245       &start, &stop, &time);
1246
1247   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1248    * We protect with the OBJECT_LOCK so that we can use the values to
1249    * safely answer a POSITION query. */
1250   GST_OBJECT_LOCK (basesink);
1251   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1252       stop, time);
1253
1254   if (format == GST_FORMAT_TIME) {
1255     GST_DEBUG_OBJECT (basesink,
1256         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1257         "format GST_FORMAT_TIME, "
1258         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1259         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1260         update, rate, arate, GST_TIME_ARGS (segment->start),
1261         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1262         GST_TIME_ARGS (segment->accum));
1263   } else {
1264     GST_DEBUG_OBJECT (basesink,
1265         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1266         "format %d, "
1267         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1268         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1269         segment->format, segment->start, segment->stop, segment->time,
1270         segment->accum);
1271   }
1272   GST_OBJECT_UNLOCK (basesink);
1273 }
1274
1275 /* with PREROLL_LOCK, STREAM_LOCK */
1276 static gboolean
1277 gst_base_sink_commit_state (GstBaseSink * basesink)
1278 {
1279   /* commit state and proceed to next pending state */
1280   GstState current, next, pending, post_pending;
1281   gboolean post_paused = FALSE;
1282   gboolean post_async_done = FALSE;
1283   gboolean post_playing = FALSE;
1284   gboolean sync;
1285
1286   /* we are certainly not playing async anymore now */
1287   basesink->playing_async = FALSE;
1288
1289   GST_OBJECT_LOCK (basesink);
1290   current = GST_STATE (basesink);
1291   next = GST_STATE_NEXT (basesink);
1292   pending = GST_STATE_PENDING (basesink);
1293   post_pending = pending;
1294   sync = basesink->sync;
1295
1296   switch (pending) {
1297     case GST_STATE_PLAYING:
1298     {
1299       GstBaseSinkClass *bclass;
1300       GstStateChangeReturn ret;
1301
1302       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1303
1304       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1305
1306       basesink->need_preroll = FALSE;
1307       post_async_done = TRUE;
1308       basesink->priv->commited = TRUE;
1309       post_playing = TRUE;
1310       /* post PAUSED too when we were READY */
1311       if (current == GST_STATE_READY) {
1312         post_paused = TRUE;
1313       }
1314
1315       /* make sure we notify the subclass of async playing */
1316       if (bclass->async_play) {
1317         ret = bclass->async_play (basesink);
1318         if (ret == GST_STATE_CHANGE_FAILURE)
1319           goto async_failed;
1320       }
1321       break;
1322     }
1323     case GST_STATE_PAUSED:
1324       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1325       post_paused = TRUE;
1326       post_async_done = TRUE;
1327       basesink->priv->commited = TRUE;
1328       post_pending = GST_STATE_VOID_PENDING;
1329       break;
1330     case GST_STATE_READY:
1331     case GST_STATE_NULL:
1332       goto stopping;
1333     case GST_STATE_VOID_PENDING:
1334       goto nothing_pending;
1335     default:
1336       break;
1337   }
1338
1339   /* we can report latency queries now */
1340   basesink->priv->have_latency = TRUE;
1341
1342   GST_STATE (basesink) = pending;
1343   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1344   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1345   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1346   GST_OBJECT_UNLOCK (basesink);
1347
1348   if (post_paused) {
1349     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1350     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1351         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1352             current, next, post_pending));
1353   }
1354   if (post_async_done) {
1355     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1356     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1357         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1358   }
1359   if (post_playing) {
1360     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1361     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1362         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1363             next, pending, GST_STATE_VOID_PENDING));
1364   }
1365
1366   GST_STATE_BROADCAST (basesink);
1367
1368   return TRUE;
1369
1370 nothing_pending:
1371   {
1372     /* Depending on the state, set our vars. We get in this situation when the
1373      * state change function got a change to update the state vars before the
1374      * streaming thread did. This is fine but we need to make sure that we
1375      * update the need_preroll var since it was TRUE when we got here and might
1376      * become FALSE if we got to PLAYING. */
1377     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1378         gst_element_state_get_name (current));
1379     switch (current) {
1380       case GST_STATE_PLAYING:
1381         basesink->need_preroll = FALSE;
1382         break;
1383       case GST_STATE_PAUSED:
1384         basesink->need_preroll = TRUE;
1385         break;
1386       default:
1387         basesink->need_preroll = FALSE;
1388         basesink->flushing = TRUE;
1389         break;
1390     }
1391     /* we can report latency queries now */
1392     basesink->priv->have_latency = TRUE;
1393     GST_OBJECT_UNLOCK (basesink);
1394     return TRUE;
1395   }
1396 stopping:
1397   {
1398     /* app is going to READY */
1399     GST_DEBUG_OBJECT (basesink, "stopping");
1400     basesink->need_preroll = FALSE;
1401     basesink->flushing = TRUE;
1402     GST_OBJECT_UNLOCK (basesink);
1403     return FALSE;
1404   }
1405 async_failed:
1406   {
1407     GST_DEBUG_OBJECT (basesink, "async commit failed");
1408     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1409     GST_OBJECT_UNLOCK (basesink);
1410     return FALSE;
1411   }
1412 }
1413
1414
1415 /* with STREAM_LOCK, PREROLL_LOCK
1416  *
1417  * Returns TRUE if the object needs synchronisation and takes therefore
1418  * part in prerolling.
1419  *
1420  * rsstart/rsstop contain the start/stop in stream time.
1421  * rrstart/rrstop contain the start/stop in running time.
1422  */
1423 static gboolean
1424 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1425     GstClockTime * rsstart, GstClockTime * rsstop,
1426     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1427     GstSegment * segment)
1428 {
1429   GstBaseSinkClass *bclass;
1430   GstBuffer *buffer;
1431   GstClockTime start, stop;     /* raw start/stop timestamps */
1432   gint64 cstart, cstop;         /* clipped raw timestamps */
1433   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1434   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1435   GstFormat format;
1436   GstBaseSinkPrivate *priv;
1437
1438   priv = basesink->priv;
1439
1440   /* start with nothing */
1441   start = stop = sstart = sstop = rstart = rstop = -1;
1442
1443   if (G_UNLIKELY (GST_IS_EVENT (obj))) {
1444     GstEvent *event = GST_EVENT_CAST (obj);
1445
1446     switch (GST_EVENT_TYPE (event)) {
1447         /* EOS event needs syncing */
1448       case GST_EVENT_EOS:
1449       {
1450         if (basesink->segment.rate >= 0.0) {
1451           sstart = sstop = priv->current_sstop;
1452           if (sstart == -1) {
1453             /* we have not seen a buffer yet, use the segment values */
1454             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1455                 basesink->segment.format, basesink->segment.stop);
1456           }
1457         } else {
1458           sstart = sstop = priv->current_sstart;
1459           if (sstart == -1) {
1460             /* we have not seen a buffer yet, use the segment values */
1461             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1462                 basesink->segment.format, basesink->segment.start);
1463           }
1464         }
1465
1466         rstart = rstop = priv->eos_rtime;
1467         *do_sync = rstart != -1;
1468         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1469             GST_TIME_ARGS (rstart));
1470         goto done;
1471       }
1472       default:
1473         /* other events do not need syncing */
1474         /* FIXME, maybe NEWSEGMENT might need synchronisation
1475          * since the POSITION query depends on accumulated times and
1476          * we cannot accumulate the current segment before the previous
1477          * one completed.
1478          */
1479         return FALSE;
1480     }
1481   }
1482
1483   /* else do buffer sync code */
1484   buffer = GST_BUFFER_CAST (obj);
1485
1486   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1487
1488   /* just get the times to see if we need syncing */
1489   if (bclass->get_times)
1490     bclass->get_times (basesink, buffer, &start, &stop);
1491
1492   if (start == -1) {
1493     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1494     *do_sync = FALSE;
1495   } else {
1496     *do_sync = TRUE;
1497   }
1498
1499   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1500       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1501       GST_TIME_ARGS (stop), *do_sync);
1502
1503   /* collect segment and format for code clarity */
1504   format = segment->format;
1505
1506   /* no timestamp clipping if we did not * get a TIME segment format */
1507   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
1508     cstart = start;
1509     cstop = stop;
1510     /* do running and stream time in TIME format */
1511     format = GST_FORMAT_TIME;
1512     goto do_times;
1513   }
1514
1515   /* clip */
1516   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
1517               (gint64) start, (gint64) stop, &cstart, &cstop)))
1518     goto out_of_segment;
1519
1520   if (G_UNLIKELY (start != cstart || stop != cstop)) {
1521     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
1522         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
1523         GST_TIME_ARGS (cstop));
1524   }
1525
1526   /* set last stop position */
1527   if (G_LIKELY (cstop != GST_CLOCK_TIME_NONE))
1528     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
1529   else
1530     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
1531
1532 do_times:
1533   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
1534    * upstream is behaving very badly */
1535   sstart = gst_segment_to_stream_time (segment, format, cstart);
1536   sstop = gst_segment_to_stream_time (segment, format, cstop);
1537   rstart = gst_segment_to_running_time (segment, format, cstart);
1538   rstop = gst_segment_to_running_time (segment, format, cstop);
1539
1540 done:
1541   /* save times */
1542   *rsstart = sstart;
1543   *rsstop = sstop;
1544   *rrstart = rstart;
1545   *rrstop = rstop;
1546
1547   /* buffers and EOS always need syncing and preroll */
1548   return TRUE;
1549
1550   /* special cases */
1551 out_of_segment:
1552   {
1553     /* should not happen since we clip them in the chain function already, 
1554      * we return FALSE so that we don't try to sync on it. */
1555     GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
1556         (NULL), ("unexpected buffer out of segment found."));
1557     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
1558     return FALSE;
1559   }
1560 }
1561
1562 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
1563  * adjust a timestamp with the latency and timestamp offset */
1564 static GstClockTime
1565 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
1566 {
1567   GstClockTimeDiff ts_offset;
1568
1569   /* don't do anything funny with invalid timestamps */
1570   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1571     return time;
1572
1573   time += basesink->priv->latency;
1574
1575   /* apply offset, be carefull for underflows */
1576   ts_offset = basesink->priv->ts_offset;
1577   if (ts_offset < 0) {
1578     ts_offset = -ts_offset;
1579     if (ts_offset < time)
1580       time -= ts_offset;
1581     else
1582       time = 0;
1583   } else
1584     time += ts_offset;
1585
1586   return time;
1587 }
1588
1589 /* gst_base_sink_wait_clock:
1590  * @sink: the sink
1591  * @time: the running_time to be reached
1592  * @jitter: the jitter to be filled with time diff (can be NULL)
1593  *
1594  * This function will block until @time is reached. It is usually called by
1595  * subclasses that use their own internal synchronisation.
1596  *
1597  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
1598  * returned. Likewise, if synchronisation is disabled in the element or there
1599  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
1600  *
1601  * This function should only be called with the PREROLL_LOCK held, like when
1602  * receiving an EOS event in the ::event vmethod or when receiving a buffer in
1603  * the ::render vmethod.
1604  *
1605  * The @time argument should be the running_time of when this method should
1606  * return and is not adjusted with any latency or offset configured in the
1607  * sink.
1608  *
1609  * Since 0.10.20
1610  *
1611  * Returns: #GstClockReturn
1612  */
1613 GstClockReturn
1614 gst_base_sink_wait_clock (GstBaseSink * basesink, GstClockTime time,
1615     GstClockTimeDiff * jitter)
1616 {
1617   GstClockID id;
1618   GstClockReturn ret;
1619   GstClock *clock;
1620
1621   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1622     goto invalid_time;
1623
1624   GST_OBJECT_LOCK (basesink);
1625   if (G_UNLIKELY (!basesink->sync))
1626     goto no_sync;
1627
1628   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
1629     goto no_clock;
1630
1631   /* add base_time to running_time to get the time against the clock */
1632   time += GST_ELEMENT_CAST (basesink)->base_time;
1633
1634   id = gst_clock_new_single_shot_id (clock, time);
1635   GST_OBJECT_UNLOCK (basesink);
1636
1637   /* A blocking wait is performed on the clock. We save the ClockID
1638    * so we can unlock the entry at any time. While we are blocking, we 
1639    * release the PREROLL_LOCK so that other threads can interrupt the
1640    * entry. */
1641   basesink->clock_id = id;
1642   /* release the preroll lock while waiting */
1643   GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
1644
1645   ret = gst_clock_id_wait (id, jitter);
1646
1647   GST_PAD_PREROLL_LOCK (basesink->sinkpad);
1648   gst_clock_id_unref (id);
1649   basesink->clock_id = NULL;
1650
1651   return ret;
1652
1653   /* no syncing needed */
1654 invalid_time:
1655   {
1656     GST_DEBUG_OBJECT (basesink, "time not valid, no sync needed");
1657     return GST_CLOCK_BADTIME;
1658   }
1659 no_sync:
1660   {
1661     GST_DEBUG_OBJECT (basesink, "sync disabled");
1662     GST_OBJECT_UNLOCK (basesink);
1663     return GST_CLOCK_BADTIME;
1664   }
1665 no_clock:
1666   {
1667     GST_DEBUG_OBJECT (basesink, "no clock, can't sync");
1668     GST_OBJECT_UNLOCK (basesink);
1669     return GST_CLOCK_BADTIME;
1670   }
1671 }
1672
1673 /**
1674  * gst_base_sink_wait_preroll:
1675  * @sink: the sink
1676  *
1677  * If the #GstBaseSinkClass::render method performs its own synchronisation against
1678  * the clock it must unblock when going from PLAYING to the PAUSED state and call
1679  * this method before continuing to render the remaining data.
1680  *
1681  * This function will block until a state change to PLAYING happens (in which
1682  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
1683  * to a state change to READY or a FLUSH event (in which case this function
1684  * returns #GST_FLOW_WRONG_STATE).
1685  *
1686  * This function should only be called with the PREROLL_LOCK held, like in the
1687  * render function.
1688  *
1689  * Since: 0.10.11
1690  *
1691  * Returns: #GST_FLOW_OK if the preroll completed and processing can
1692  * continue. Any other return value should be returned from the render vmethod.
1693  */
1694 GstFlowReturn
1695 gst_base_sink_wait_preroll (GstBaseSink * sink)
1696 {
1697   sink->have_preroll = TRUE;
1698   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
1699   /* block until the state changes, or we get a flush, or something */
1700   GST_PAD_PREROLL_WAIT (sink->sinkpad);
1701   sink->have_preroll = FALSE;
1702   if (G_UNLIKELY (sink->flushing))
1703     goto stopping;
1704   GST_DEBUG_OBJECT (sink, "continue after preroll");
1705
1706   return GST_FLOW_OK;
1707
1708   /* ERRORS */
1709 stopping:
1710   {
1711     GST_DEBUG_OBJECT (sink, "preroll interrupted");
1712     return GST_FLOW_WRONG_STATE;
1713   }
1714 }
1715
1716 /**
1717  * gst_base_sink_do_preroll:
1718  * @sink: the sink
1719  * @obj: the object that caused the preroll
1720  *
1721  * If the @sink spawns its own thread for pulling buffers from upstream it
1722  * should call this method after it has pulled a buffer. If the element needed
1723  * to preroll, this function will perform the preroll and will then block
1724  * until the element state is changed.
1725  *
1726  * This function should be called with the PREROLL_LOCK held.
1727  *
1728  * Since 0.10.22
1729  *
1730  * Returns: #GST_FLOW_OK if the preroll completed and processing can
1731  * continue. Any other return value should be returned from the render vmethod.
1732  */
1733 GstFlowReturn
1734 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
1735 {
1736   GstFlowReturn ret;
1737
1738   while (G_UNLIKELY (sink->need_preroll)) {
1739     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
1740
1741     if (G_LIKELY (sink->playing_async)) {
1742       /* commit state */
1743       if (G_UNLIKELY (!gst_base_sink_commit_state (sink)))
1744         goto stopping;
1745     }
1746
1747     /* need to recheck here because the commit state could have
1748      * made us not need the preroll anymore */
1749     if (G_LIKELY (sink->need_preroll)) {
1750       /* block until the state changes, or we get a flush, or something */
1751       ret = gst_base_sink_wait_preroll (sink);
1752       if (ret != GST_FLOW_OK)
1753         goto flushing;
1754     }
1755   }
1756   return GST_FLOW_OK;
1757
1758   /* ERRORS */
1759 flushing:
1760   {
1761     GST_DEBUG_OBJECT (sink, "we are flushing");
1762     return ret;
1763   }
1764 stopping:
1765   {
1766     GST_DEBUG_OBJECT (sink, "stopping while commiting state");
1767     return GST_FLOW_WRONG_STATE;
1768   }
1769 }
1770
1771 /**
1772  * gst_base_sink_wait_eos:
1773  * @sink: the sink
1774  * @time: the running_time to be reached
1775  * @jitter: the jitter to be filled with time diff (can be NULL)
1776  *
1777  * This function will block until @time is reached. It is usually called by
1778  * subclasses that use their own internal synchronisation but want to let the
1779  * EOS be handled by the base class.
1780  *
1781  * This function should only be called with the PREROLL_LOCK held, like when
1782  * receiving an EOS event in the ::event vmethod.
1783  *
1784  * The @time argument should be the running_time of when the EOS should happen
1785  * and will be adjusted with any latency and offset configured in the sink.
1786  *
1787  * Since 0.10.15
1788  *
1789  * Returns: #GstFlowReturn
1790  */
1791 GstFlowReturn
1792 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
1793     GstClockTimeDiff * jitter)
1794 {
1795   GstClockReturn status;
1796   GstFlowReturn ret;
1797
1798   do {
1799     GstClockTime stime;
1800
1801     GST_DEBUG_OBJECT (sink, "checking preroll");
1802
1803     /* first wait for the playing state before we can continue */
1804     if (G_UNLIKELY (sink->need_preroll)) {
1805       ret = gst_base_sink_wait_preroll (sink);
1806       if (ret != GST_FLOW_OK)
1807         goto flushing;
1808     }
1809
1810     /* preroll done, we can sync since we are in PLAYING now. */
1811     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
1812         GST_TIME_FORMAT, GST_TIME_ARGS (time));
1813
1814     /* compensate for latency and ts_offset. We don't adjust for render delay
1815      * because we don't interact with the device on EOS normally. */
1816     stime = gst_base_sink_adjust_time (sink, time);
1817
1818     /* wait for the clock, this can be interrupted because we got shut down or 
1819      * we PAUSED. */
1820     status = gst_base_sink_wait_clock (sink, stime, jitter);
1821
1822     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
1823
1824     /* invalid time, no clock or sync disabled, just continue then */
1825     if (status == GST_CLOCK_BADTIME)
1826       break;
1827
1828     /* waiting could have been interrupted and we can be flushing now */
1829     if (G_UNLIKELY (sink->flushing))
1830       goto flushing;
1831
1832     /* retry if we got unscheduled, which means we did not reach the timeout
1833      * yet. if some other error occures, we continue. */
1834   } while (status == GST_CLOCK_UNSCHEDULED);
1835
1836   GST_DEBUG_OBJECT (sink, "end of stream");
1837
1838   return GST_FLOW_OK;
1839
1840   /* ERRORS */
1841 flushing:
1842   {
1843     GST_DEBUG_OBJECT (sink, "we are flushing");
1844     return GST_FLOW_WRONG_STATE;
1845   }
1846 }
1847
1848 /* with STREAM_LOCK, PREROLL_LOCK
1849  *
1850  * Make sure we are in PLAYING and synchronize an object to the clock.
1851  *
1852  * If we need preroll, we are not in PLAYING. We try to commit the state
1853  * if needed and then block if we still are not PLAYING.
1854  *
1855  * We start waiting on the clock in PLAYING. If we got interrupted, we
1856  * immediatly try to re-preroll.
1857  *
1858  * Some objects do not need synchronisation (most events) and so this function
1859  * immediatly returns GST_FLOW_OK.
1860  *
1861  * for objects that arrive later than max-lateness to be synchronized to the 
1862  * clock have the @late boolean set to TRUE.
1863  *
1864  * This function keeps a running average of the jitter (the diff between the
1865  * clock time and the requested sync time). The jitter is negative for
1866  * objects that arrive in time and positive for late buffers.
1867  *
1868  * does not take ownership of obj.
1869  */
1870 static GstFlowReturn
1871 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
1872     GstMiniObject * obj, gboolean * late)
1873 {
1874   GstClockTimeDiff jitter;
1875   gboolean syncable;
1876   GstClockReturn status = GST_CLOCK_OK;
1877   GstClockTime rstart, rstop, sstart, sstop, stime;
1878   gboolean do_sync;
1879   GstBaseSinkPrivate *priv;
1880
1881   priv = basesink->priv;
1882
1883   sstart = sstop = rstart = rstop = -1;
1884   do_sync = TRUE;
1885
1886   priv->current_rstart = -1;
1887
1888   /* get timing information for this object against the render segment */
1889   syncable = gst_base_sink_get_sync_times (basesink, obj,
1890       &sstart, &sstop, &rstart, &rstop, &do_sync, &basesink->segment);
1891
1892   /* a syncable object needs to participate in preroll and
1893    * clocking. All buffers and EOS are syncable. */
1894   if (G_UNLIKELY (!syncable))
1895     goto not_syncable;
1896
1897   /* store timing info for current object */
1898   priv->current_rstart = rstart;
1899   priv->current_rstop = (rstop != -1 ? rstop : rstart);
1900   /* save sync time for eos when the previous object needed sync */
1901   priv->eos_rtime = (do_sync ? priv->current_rstop : -1);
1902
1903 again:
1904   /* first do preroll, this makes sure we commit our state
1905    * to PAUSED and can continue to PLAYING. We cannot perform
1906    * any clock sync in PAUSED because there is no clock. 
1907    */
1908   while (G_UNLIKELY (basesink->need_preroll)) {
1909     GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
1910
1911     if (G_LIKELY (basesink->playing_async)) {
1912       /* commit state */
1913       if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
1914         goto stopping;
1915     }
1916
1917     /* need to recheck here because the commit state could have
1918      * made us not need the preroll anymore */
1919     if (G_LIKELY (basesink->need_preroll)) {
1920       /* block until the state changes, or we get a flush, or something */
1921       if (gst_base_sink_wait_preroll (basesink) != GST_FLOW_OK)
1922         goto flushing;
1923     }
1924   }
1925
1926   /* After rendering we store the position of the last buffer so that we can use
1927    * it to report the position. We need to take the lock here. */
1928   GST_OBJECT_LOCK (basesink);
1929   priv->current_sstart = sstart;
1930   priv->current_sstop = (sstop != -1 ? sstop : sstart);
1931   GST_OBJECT_UNLOCK (basesink);
1932
1933   if (!do_sync)
1934     goto done;
1935
1936   /* adjust for latency */
1937   stime = gst_base_sink_adjust_time (basesink, rstart);
1938
1939   /* adjust for render-delay, avoid underflows */
1940   if (stime != -1) {
1941     if (stime > priv->render_delay)
1942       stime -= priv->render_delay;
1943     else
1944       stime = 0;
1945   }
1946
1947   /* preroll done, we can sync since we are in PLAYING now. */
1948   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
1949       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
1950       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
1951
1952   /* This function will return immediatly if start == -1, no clock
1953    * or sync is disabled with GST_CLOCK_BADTIME. */
1954   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
1955
1956   GST_DEBUG_OBJECT (basesink, "clock returned %d", status);
1957
1958   /* invalid time, no clock or sync disabled, just render */
1959   if (status == GST_CLOCK_BADTIME)
1960     goto done;
1961
1962   /* waiting could have been interrupted and we can be flushing now */
1963   if (G_UNLIKELY (basesink->flushing))
1964     goto flushing;
1965
1966   /* check for unlocked by a state change, we are not flushing so
1967    * we can try to preroll on the current buffer. */
1968   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
1969     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
1970     goto again;
1971   }
1972
1973   /* successful syncing done, record observation */
1974   priv->current_jitter = jitter;
1975
1976   /* check if the object should be dropped */
1977   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
1978       status, jitter);
1979
1980 done:
1981   return GST_FLOW_OK;
1982
1983   /* ERRORS */
1984 not_syncable:
1985   {
1986     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
1987     return GST_FLOW_OK;
1988   }
1989 flushing:
1990   {
1991     GST_DEBUG_OBJECT (basesink, "we are flushing");
1992     return GST_FLOW_WRONG_STATE;
1993   }
1994 stopping:
1995   {
1996     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
1997     return GST_FLOW_WRONG_STATE;
1998   }
1999 }
2000
2001 static gboolean
2002 gst_base_sink_send_qos (GstBaseSink * basesink,
2003     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2004 {
2005   GstEvent *event;
2006   gboolean res;
2007
2008   /* generate Quality-of-Service event */
2009   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2010       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2011       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (time));
2012
2013   event = gst_event_new_qos (proportion, diff, time);
2014
2015   /* send upstream */
2016   res = gst_pad_push_event (basesink->sinkpad, event);
2017
2018   return res;
2019 }
2020
2021 static void
2022 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2023 {
2024   GstBaseSinkPrivate *priv;
2025   GstClockTime start, stop;
2026   GstClockTimeDiff jitter;
2027   GstClockTime pt, entered, left;
2028   GstClockTime duration;
2029   gdouble rate;
2030
2031   priv = sink->priv;
2032
2033   start = priv->current_rstart;
2034
2035   /* if Quality-of-Service disabled, do nothing */
2036   if (!g_atomic_int_get (&priv->qos_enabled) || start == -1)
2037     return;
2038
2039   stop = priv->current_rstop;
2040   jitter = priv->current_jitter;
2041
2042   if (jitter < 0) {
2043     /* this is the time the buffer entered the sink */
2044     if (start < -jitter)
2045       entered = 0;
2046     else
2047       entered = start + jitter;
2048     left = start;
2049   } else {
2050     /* this is the time the buffer entered the sink */
2051     entered = start + jitter;
2052     /* this is the time the buffer left the sink */
2053     left = start + jitter;
2054   }
2055
2056   /* calculate duration of the buffer */
2057   if (stop != -1)
2058     duration = stop - start;
2059   else
2060     duration = -1;
2061
2062   /* if we have the time when the last buffer left us, calculate
2063    * processing time */
2064   if (priv->last_left != -1) {
2065     if (entered > priv->last_left) {
2066       pt = entered - priv->last_left;
2067     } else {
2068       pt = 0;
2069     }
2070   } else {
2071     pt = priv->avg_pt;
2072   }
2073
2074   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2075       ", entered %" GST_TIME_FORMAT ", left %" GST_TIME_FORMAT ", pt: %"
2076       GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT ",jitter %"
2077       G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (entered),
2078       GST_TIME_ARGS (left), GST_TIME_ARGS (pt), GST_TIME_ARGS (duration),
2079       jitter);
2080
2081   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2082       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2083       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2084       priv->avg_rate);
2085
2086   /* collect running averages. for first observations, we copy the
2087    * values */
2088   if (priv->avg_duration == -1)
2089     priv->avg_duration = duration;
2090   else
2091     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2092
2093   if (priv->avg_pt == -1)
2094     priv->avg_pt = pt;
2095   else
2096     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2097
2098   if (priv->avg_duration != 0)
2099     rate =
2100         gst_guint64_to_gdouble (priv->avg_pt) /
2101         gst_guint64_to_gdouble (priv->avg_duration);
2102   else
2103     rate = 0.0;
2104
2105   if (priv->last_left != -1) {
2106     if (dropped || priv->avg_rate < 0.0) {
2107       priv->avg_rate = rate;
2108     } else {
2109       if (rate > 1.0)
2110         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2111       else
2112         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2113     }
2114   }
2115
2116   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2117       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2118       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2119       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2120
2121
2122   if (priv->avg_rate >= 0.0) {
2123     /* if we have a valid rate, start sending QoS messages */
2124     if (priv->current_jitter < 0) {
2125       /* make sure we never go below 0 when adding the jitter to the
2126        * timestamp. */
2127       if (priv->current_rstart < -priv->current_jitter)
2128         priv->current_jitter = -priv->current_rstart;
2129     }
2130     gst_base_sink_send_qos (sink, priv->avg_rate, priv->current_rstart,
2131         priv->current_jitter);
2132   }
2133
2134   /* record when this buffer will leave us */
2135   priv->last_left = left;
2136 }
2137
2138 /* reset all qos measuring */
2139 static void
2140 gst_base_sink_reset_qos (GstBaseSink * sink)
2141 {
2142   GstBaseSinkPrivate *priv;
2143
2144   priv = sink->priv;
2145
2146   priv->last_in_time = -1;
2147   priv->last_left = -1;
2148   priv->avg_duration = -1;
2149   priv->avg_pt = -1;
2150   priv->avg_rate = -1.0;
2151   priv->avg_render = -1;
2152   priv->rendered = 0;
2153   priv->dropped = 0;
2154
2155 }
2156
2157 /* Checks if the object was scheduled too late.
2158  *
2159  * start/stop contain the raw timestamp start and stop values
2160  * of the object.
2161  *
2162  * status and jitter contain the return values from the clock wait.
2163  *
2164  * returns TRUE if the buffer was too late.
2165  */
2166 static gboolean
2167 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2168     GstClockTime start, GstClockTime stop,
2169     GstClockReturn status, GstClockTimeDiff jitter)
2170 {
2171   gboolean late;
2172   gint64 max_lateness;
2173   GstBaseSinkPrivate *priv;
2174
2175   priv = basesink->priv;
2176
2177   late = FALSE;
2178
2179   /* only for objects that were too late */
2180   if (G_LIKELY (status != GST_CLOCK_EARLY))
2181     goto in_time;
2182
2183   max_lateness = basesink->abidata.ABI.max_lateness;
2184
2185   /* check if frame dropping is enabled */
2186   if (max_lateness == -1)
2187     goto no_drop;
2188
2189   /* only check for buffers */
2190   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2191     goto not_buffer;
2192
2193   /* can't do check if we don't have a timestamp */
2194   if (G_UNLIKELY (start == -1))
2195     goto no_timestamp;
2196
2197   /* we can add a valid stop time */
2198   if (stop != -1)
2199     max_lateness += stop;
2200   else
2201     max_lateness += start;
2202
2203   /* if the jitter bigger than duration and lateness we are too late */
2204   if ((late = start + jitter > max_lateness)) {
2205     GST_DEBUG_OBJECT (basesink, "buffer is too late %" GST_TIME_FORMAT
2206         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (start + jitter),
2207         GST_TIME_ARGS (max_lateness));
2208     /* !!emergency!!, if we did not receive anything valid for more than a 
2209      * second, render it anyway so the user sees something */
2210     if (priv->last_in_time && start - priv->last_in_time > GST_SECOND) {
2211       late = FALSE;
2212       GST_DEBUG_OBJECT (basesink,
2213           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2214           GST_TIME_ARGS (priv->last_in_time));
2215     }
2216   }
2217
2218 done:
2219   if (!late) {
2220     priv->last_in_time = start;
2221   }
2222   return late;
2223
2224   /* all is fine */
2225 in_time:
2226   {
2227     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2228     goto done;
2229   }
2230 no_drop:
2231   {
2232     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2233     goto done;
2234   }
2235 not_buffer:
2236   {
2237     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2238     return FALSE;
2239   }
2240 no_timestamp:
2241   {
2242     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2243     return FALSE;
2244   }
2245 }
2246
2247 /* called before and after calling the render vmethod. It keeps track of how
2248  * much time was spent in the render method and is used to check if we are
2249  * flooded */
2250 static void
2251 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2252 {
2253   GstBaseSinkPrivate *priv;
2254
2255   priv = basesink->priv;
2256
2257   if (start) {
2258     priv->start = gst_util_get_timestamp ();
2259   } else {
2260     GstClockTime elapsed;
2261
2262     priv->stop = gst_util_get_timestamp ();
2263
2264     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2265
2266     if (priv->avg_render == -1)
2267       priv->avg_render = elapsed;
2268     else
2269       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2270
2271     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2272         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2273   }
2274 }
2275
2276 /* with STREAM_LOCK, PREROLL_LOCK,
2277  *
2278  * Synchronize the object on the clock and then render it.
2279  *
2280  * takes ownership of obj.
2281  */
2282 static GstFlowReturn
2283 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2284     GstMiniObject * obj)
2285 {
2286   GstFlowReturn ret = GST_FLOW_OK;
2287   GstBaseSinkClass *bclass;
2288   gboolean late = FALSE;
2289   GstBaseSinkPrivate *priv;
2290
2291   priv = basesink->priv;
2292
2293   /* synchronize this object, non syncable objects return OK
2294    * immediatly. */
2295   ret = gst_base_sink_do_sync (basesink, pad, obj, &late);
2296   if (G_UNLIKELY (ret != GST_FLOW_OK))
2297     goto sync_failed;
2298
2299   /* and now render, event or buffer. */
2300   if (G_LIKELY (GST_IS_BUFFER (obj))) {
2301     GstBuffer *buf;
2302
2303     /* drop late buffers unconditionally, let's hope it's unlikely */
2304     if (G_UNLIKELY (late))
2305       goto dropped;
2306
2307     buf = GST_BUFFER_CAST (obj);
2308
2309     gst_base_sink_set_last_buffer (basesink, buf);
2310
2311     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2312
2313     if (G_LIKELY (bclass->render)) {
2314       gint do_qos;
2315
2316       /* read once, to get same value before and after */
2317       do_qos = g_atomic_int_get (&priv->qos_enabled);
2318
2319       GST_DEBUG_OBJECT (basesink, "rendering buffer %p", obj);
2320
2321       /* record rendering time for QoS and stats */
2322       if (do_qos)
2323         gst_base_sink_do_render_stats (basesink, TRUE);
2324
2325       ret = bclass->render (basesink, buf);
2326
2327       priv->rendered++;
2328
2329       if (do_qos)
2330         gst_base_sink_do_render_stats (basesink, FALSE);
2331     }
2332   } else {
2333     GstEvent *event = GST_EVENT_CAST (obj);
2334     gboolean event_res = TRUE;
2335     GstEventType type;
2336
2337     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2338
2339     type = GST_EVENT_TYPE (event);
2340
2341     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
2342         gst_event_type_get_name (type));
2343
2344     if (bclass->event)
2345       event_res = bclass->event (basesink, event);
2346
2347     /* when we get here we could be flushing again when the event handler calls
2348      * _wait_eos(). We have to ignore this object in that case. */
2349     if (G_UNLIKELY (basesink->flushing))
2350       goto flushing;
2351
2352     if (G_LIKELY (event_res)) {
2353       switch (type) {
2354         case GST_EVENT_EOS:
2355           /* the EOS event is completely handled so we mark
2356            * ourselves as being in the EOS state. eos is also 
2357            * protected by the object lock so we can read it when 
2358            * answering the POSITION query. */
2359           GST_OBJECT_LOCK (basesink);
2360           basesink->eos = TRUE;
2361           GST_OBJECT_UNLOCK (basesink);
2362           /* ok, now we can post the message */
2363           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
2364           gst_element_post_message (GST_ELEMENT_CAST (basesink),
2365               gst_message_new_eos (GST_OBJECT_CAST (basesink)));
2366           break;
2367         case GST_EVENT_NEWSEGMENT:
2368           /* configure the segment */
2369           gst_base_sink_configure_segment (basesink, pad, event,
2370               &basesink->segment);
2371           break;
2372         default:
2373           break;
2374       }
2375     }
2376   }
2377
2378 done:
2379   gst_base_sink_perform_qos (basesink, late);
2380
2381   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
2382   gst_mini_object_unref (obj);
2383
2384   return ret;
2385
2386   /* ERRORS */
2387 sync_failed:
2388   {
2389     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
2390     goto done;
2391   }
2392 dropped:
2393   {
2394     priv->dropped++;
2395     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
2396     goto done;
2397   }
2398 flushing:
2399   {
2400     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
2401     gst_mini_object_unref (obj);
2402     return GST_FLOW_WRONG_STATE;
2403   }
2404 }
2405
2406 /* with STREAM_LOCK, PREROLL_LOCK
2407  *
2408  * Perform preroll on the given object. For buffers this means 
2409  * calling the preroll subclass method. 
2410  * If that succeeds, the state will be commited.
2411  *
2412  * function does not take ownership of obj.
2413  */
2414 static GstFlowReturn
2415 gst_base_sink_preroll_object (GstBaseSink * basesink, GstPad * pad,
2416     GstMiniObject * obj)
2417 {
2418   GstFlowReturn ret;
2419
2420   GST_DEBUG_OBJECT (basesink, "do preroll %p", obj);
2421
2422   /* if it's a buffer, we need to call the preroll method */
2423   if (G_LIKELY (GST_IS_BUFFER (obj))) {
2424     GstBaseSinkClass *bclass;
2425     GstBuffer *buf;
2426     GstClockTime timestamp;
2427
2428     buf = GST_BUFFER_CAST (obj);
2429     timestamp = GST_BUFFER_TIMESTAMP (buf);
2430
2431     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
2432         GST_TIME_ARGS (timestamp));
2433
2434     gst_base_sink_set_last_buffer (basesink, buf);
2435
2436     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2437     if (bclass->preroll)
2438       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
2439         goto preroll_failed;
2440   }
2441
2442   /* commit state */
2443   if (G_LIKELY (basesink->playing_async)) {
2444     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
2445       goto stopping;
2446   }
2447
2448   return GST_FLOW_OK;
2449
2450   /* ERRORS */
2451 preroll_failed:
2452   {
2453     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
2454     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
2455     return ret;
2456   }
2457 stopping:
2458   {
2459     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
2460     return GST_FLOW_WRONG_STATE;
2461   }
2462 }
2463
2464 /* with STREAM_LOCK, PREROLL_LOCK 
2465  *
2466  * Queue an object for rendering.
2467  * The first prerollable object queued will complete the preroll. If the
2468  * preroll queue if filled, we render all the objects in the queue.
2469  *
2470  * This function takes ownership of the object.
2471  */
2472 static GstFlowReturn
2473 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
2474     GstMiniObject * obj, gboolean prerollable)
2475 {
2476   GstFlowReturn ret = GST_FLOW_OK;
2477   gint length;
2478   GQueue *q;
2479
2480   if (G_UNLIKELY (basesink->need_preroll)) {
2481     if (G_LIKELY (prerollable))
2482       basesink->preroll_queued++;
2483
2484     length = basesink->preroll_queued;
2485
2486     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
2487
2488     /* first prerollable item needs to finish the preroll */
2489     if (length == 1) {
2490       ret = gst_base_sink_preroll_object (basesink, pad, obj);
2491       if (G_UNLIKELY (ret != GST_FLOW_OK))
2492         goto preroll_failed;
2493     }
2494     /* need to recheck if we need preroll, commmit state during preroll 
2495      * could have made us not need more preroll. */
2496     if (G_UNLIKELY (basesink->need_preroll)) {
2497       /* see if we can render now, if we can't add the object to the preroll
2498        * queue. */
2499       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
2500         goto more_preroll;
2501     }
2502   }
2503
2504   /* we can start rendering (or blocking) the queued object
2505    * if any. */
2506   q = basesink->preroll_queue;
2507   while (G_UNLIKELY (!g_queue_is_empty (q))) {
2508     GstMiniObject *o;
2509
2510     o = g_queue_pop_head (q);
2511     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
2512
2513     /* do something with the return value */
2514     ret = gst_base_sink_render_object (basesink, pad, o);
2515     if (ret != GST_FLOW_OK)
2516       goto dequeue_failed;
2517   }
2518
2519   /* now render the object */
2520   ret = gst_base_sink_render_object (basesink, pad, obj);
2521   basesink->preroll_queued = 0;
2522
2523   return ret;
2524
2525   /* special cases */
2526 preroll_failed:
2527   {
2528     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
2529         gst_flow_get_name (ret));
2530     gst_mini_object_unref (obj);
2531     return ret;
2532   }
2533 more_preroll:
2534   {
2535     /* add object to the queue and return */
2536     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
2537         length, basesink->preroll_queue_max_len);
2538     g_queue_push_tail (basesink->preroll_queue, obj);
2539     return GST_FLOW_OK;
2540   }
2541 dequeue_failed:
2542   {
2543     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
2544         gst_flow_get_name (ret));
2545     gst_mini_object_unref (obj);
2546     return ret;
2547   }
2548 }
2549
2550 /* with STREAM_LOCK
2551  *
2552  * This function grabs the PREROLL_LOCK and adds the object to
2553  * the queue.
2554  *
2555  * This function takes ownership of obj.
2556  */
2557 static GstFlowReturn
2558 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
2559     GstMiniObject * obj, gboolean prerollable)
2560 {
2561   GstFlowReturn ret;
2562
2563   GST_PAD_PREROLL_LOCK (pad);
2564   if (G_UNLIKELY (basesink->flushing))
2565     goto flushing;
2566
2567   if (G_UNLIKELY (basesink->priv->received_eos))
2568     goto was_eos;
2569
2570   ret = gst_base_sink_queue_object_unlocked (basesink, pad, obj, prerollable);
2571   GST_PAD_PREROLL_UNLOCK (pad);
2572
2573   return ret;
2574
2575   /* ERRORS */
2576 flushing:
2577   {
2578     GST_DEBUG_OBJECT (basesink, "sink is flushing");
2579     GST_PAD_PREROLL_UNLOCK (pad);
2580     gst_mini_object_unref (obj);
2581     return GST_FLOW_WRONG_STATE;
2582   }
2583 was_eos:
2584   {
2585     GST_DEBUG_OBJECT (basesink,
2586         "we are EOS, dropping object, return UNEXPECTED");
2587     GST_PAD_PREROLL_UNLOCK (pad);
2588     gst_mini_object_unref (obj);
2589     return GST_FLOW_UNEXPECTED;
2590   }
2591 }
2592
2593 static void
2594 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
2595 {
2596   /* make sure we are not blocked on the clock also clear any pending
2597    * eos state. */
2598   gst_base_sink_set_flushing (basesink, pad, TRUE);
2599
2600   /* we grab the stream lock but that is not needed since setting the
2601    * sink to flushing would make sure no state commit is being done
2602    * anymore */
2603   GST_PAD_STREAM_LOCK (pad);
2604   gst_base_sink_reset_qos (basesink);
2605   if (basesink->priv->async_enabled) {
2606     /* and we need to commit our state again on the next
2607      * prerolled buffer */
2608     basesink->playing_async = TRUE;
2609     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
2610   } else {
2611     basesink->priv->have_latency = TRUE;
2612     basesink->need_preroll = FALSE;
2613   }
2614   gst_base_sink_set_last_buffer (basesink, NULL);
2615   GST_PAD_STREAM_UNLOCK (pad);
2616 }
2617
2618 static void
2619 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
2620 {
2621   /* unset flushing so we can accept new data, this also flushes out any EOS
2622    * event. */
2623   gst_base_sink_set_flushing (basesink, pad, FALSE);
2624
2625   /* for position reporting */
2626   GST_OBJECT_LOCK (basesink);
2627   basesink->priv->current_sstart = -1;
2628   basesink->priv->current_sstop = -1;
2629   basesink->priv->eos_rtime = -1;
2630   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
2631     /* we need new segment info after the flush. */
2632     basesink->have_newsegment = FALSE;
2633     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
2634     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
2635   }
2636   GST_OBJECT_UNLOCK (basesink);
2637 }
2638
2639 static gboolean
2640 gst_base_sink_event (GstPad * pad, GstEvent * event)
2641 {
2642   GstBaseSink *basesink;
2643   gboolean result = TRUE;
2644   GstBaseSinkClass *bclass;
2645
2646   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2647
2648   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2649
2650   GST_DEBUG_OBJECT (basesink, "event %p (%s)", event,
2651       GST_EVENT_TYPE_NAME (event));
2652
2653   switch (GST_EVENT_TYPE (event)) {
2654     case GST_EVENT_EOS:
2655     {
2656       GstFlowReturn ret;
2657
2658       GST_PAD_PREROLL_LOCK (pad);
2659       if (G_UNLIKELY (basesink->flushing))
2660         goto flushing;
2661
2662       if (G_UNLIKELY (basesink->priv->received_eos)) {
2663         /* we can't accept anything when we are EOS */
2664         result = FALSE;
2665         gst_event_unref (event);
2666       } else {
2667         /* we set the received EOS flag here so that we can use it when testing if
2668          * we are prerolled and to refure more buffers. */
2669         basesink->priv->received_eos = TRUE;
2670
2671         /* EOS is a prerollable object, we call the unlocked version because it
2672          * does not check the received_eos flag. */
2673         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
2674             GST_MINI_OBJECT_CAST (event), TRUE);
2675         if (G_UNLIKELY (ret != GST_FLOW_OK))
2676           result = FALSE;
2677       }
2678       GST_PAD_PREROLL_UNLOCK (pad);
2679       break;
2680     }
2681     case GST_EVENT_NEWSEGMENT:
2682     {
2683       GstFlowReturn ret;
2684
2685       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
2686
2687       GST_PAD_PREROLL_LOCK (pad);
2688       if (G_UNLIKELY (basesink->flushing))
2689         goto flushing;
2690
2691       if (G_UNLIKELY (basesink->priv->received_eos)) {
2692         /* we can't accept anything when we are EOS */
2693         result = FALSE;
2694         gst_event_unref (event);
2695       } else {
2696         /* the new segment is a non prerollable item and does not block anything,
2697          * we need to configure the current clipping segment and insert the event 
2698          * in the queue to serialize it with the buffers for rendering. */
2699         gst_base_sink_configure_segment (basesink, pad, event,
2700             basesink->abidata.ABI.clip_segment);
2701
2702         ret =
2703             gst_base_sink_queue_object_unlocked (basesink, pad,
2704             GST_MINI_OBJECT_CAST (event), FALSE);
2705         if (G_UNLIKELY (ret != GST_FLOW_OK))
2706           result = FALSE;
2707         else {
2708           GST_OBJECT_LOCK (basesink);
2709           basesink->have_newsegment = TRUE;
2710           GST_OBJECT_UNLOCK (basesink);
2711         }
2712       }
2713       GST_PAD_PREROLL_UNLOCK (pad);
2714       break;
2715     }
2716     case GST_EVENT_FLUSH_START:
2717       if (bclass->event)
2718         bclass->event (basesink, event);
2719
2720       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
2721
2722       gst_base_sink_flush_start (basesink, pad);
2723
2724       gst_event_unref (event);
2725       break;
2726     case GST_EVENT_FLUSH_STOP:
2727       if (bclass->event)
2728         bclass->event (basesink, event);
2729
2730       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
2731
2732       gst_base_sink_flush_stop (basesink, pad);
2733
2734       gst_event_unref (event);
2735       break;
2736     default:
2737       /* other events are sent to queue or subclass depending on if they
2738        * are serialized. */
2739       if (GST_EVENT_IS_SERIALIZED (event)) {
2740         gst_base_sink_queue_object (basesink, pad,
2741             GST_MINI_OBJECT_CAST (event), FALSE);
2742       } else {
2743         if (bclass->event)
2744           bclass->event (basesink, event);
2745         gst_event_unref (event);
2746       }
2747       break;
2748   }
2749 done:
2750   gst_object_unref (basesink);
2751
2752   return result;
2753
2754   /* ERRORS */
2755 flushing:
2756   {
2757     GST_DEBUG_OBJECT (basesink, "we are flushing");
2758     GST_PAD_PREROLL_UNLOCK (pad);
2759     result = FALSE;
2760     gst_event_unref (event);
2761     goto done;
2762   }
2763 }
2764
2765 /* default implementation to calculate the start and end
2766  * timestamps on a buffer, subclasses can override
2767  */
2768 static void
2769 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
2770     GstClockTime * start, GstClockTime * end)
2771 {
2772   GstClockTime timestamp, duration;
2773
2774   timestamp = GST_BUFFER_TIMESTAMP (buffer);
2775   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
2776
2777     /* get duration to calculate end time */
2778     duration = GST_BUFFER_DURATION (buffer);
2779     if (GST_CLOCK_TIME_IS_VALID (duration)) {
2780       *end = timestamp + duration;
2781     }
2782     *start = timestamp;
2783   }
2784 }
2785
2786 /* must be called with PREROLL_LOCK */
2787 static gboolean
2788 gst_base_sink_needs_preroll (GstBaseSink * basesink)
2789 {
2790   gboolean is_prerolled, res;
2791
2792   /* we have 2 cases where the PREROLL_LOCK is released:
2793    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
2794    *  2) we are syncing on the clock
2795    */
2796   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
2797   res = !is_prerolled;
2798
2799   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
2800       basesink->have_preroll, basesink->priv->received_eos, res);
2801
2802   return res;
2803 }
2804
2805 /* with STREAM_LOCK, PREROLL_LOCK 
2806  *
2807  * Takes a buffer and compare the timestamps with the last segment.
2808  * If the buffer falls outside of the segment boundaries, drop it.
2809  * Else queue the buffer for preroll and rendering.
2810  *
2811  * This function takes ownership of the buffer.
2812  */
2813 static GstFlowReturn
2814 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
2815     GstBuffer * buf)
2816 {
2817   GstBaseSinkClass *bclass;
2818   GstFlowReturn result;
2819   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
2820   GstSegment *clip_segment;
2821
2822   if (G_UNLIKELY (basesink->flushing))
2823     goto flushing;
2824
2825   if (G_UNLIKELY (basesink->priv->received_eos))
2826     goto was_eos;
2827
2828   /* for code clarity */
2829   clip_segment = basesink->abidata.ABI.clip_segment;
2830
2831   if (G_UNLIKELY (!basesink->have_newsegment)) {
2832     gboolean sync;
2833
2834     sync = gst_base_sink_get_sync (basesink);
2835     if (sync) {
2836       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
2837           (_("Internal data flow problem.")),
2838           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
2839     }
2840
2841     /* this means this sink will assume timestamps start from 0 */
2842     GST_OBJECT_LOCK (basesink);
2843     clip_segment->start = 0;
2844     clip_segment->stop = -1;
2845     basesink->segment.start = 0;
2846     basesink->segment.stop = -1;
2847     basesink->have_newsegment = TRUE;
2848     GST_OBJECT_UNLOCK (basesink);
2849   }
2850
2851   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2852
2853   /* check if the buffer needs to be dropped, we first ask the subclass for the
2854    * start and end */
2855   if (bclass->get_times)
2856     bclass->get_times (basesink, buf, &start, &end);
2857
2858   if (start == -1) {
2859     /* if the subclass does not want sync, we use our own values so that we at
2860      * least clip the buffer to the segment */
2861     gst_base_sink_get_times (basesink, buf, &start, &end);
2862   }
2863
2864   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
2865       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
2866
2867   /* a dropped buffer does not participate in anything */
2868   if (GST_CLOCK_TIME_IS_VALID (start) &&
2869       (clip_segment->format == GST_FORMAT_TIME)) {
2870     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
2871                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
2872       goto out_of_segment;
2873   }
2874
2875   /* now we can process the buffer in the queue, this function takes ownership
2876    * of the buffer */
2877   result = gst_base_sink_queue_object_unlocked (basesink, pad,
2878       GST_MINI_OBJECT_CAST (buf), TRUE);
2879
2880   return result;
2881
2882   /* ERRORS */
2883 flushing:
2884   {
2885     GST_DEBUG_OBJECT (basesink, "sink is flushing");
2886     gst_buffer_unref (buf);
2887     return GST_FLOW_WRONG_STATE;
2888   }
2889 was_eos:
2890   {
2891     GST_DEBUG_OBJECT (basesink,
2892         "we are EOS, dropping object, return UNEXPECTED");
2893     gst_buffer_unref (buf);
2894     return GST_FLOW_UNEXPECTED;
2895   }
2896 out_of_segment:
2897   {
2898     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
2899     gst_buffer_unref (buf);
2900     return GST_FLOW_OK;
2901   }
2902 }
2903
2904 /* with STREAM_LOCK
2905  */
2906 static GstFlowReturn
2907 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
2908 {
2909   GstBaseSink *basesink;
2910   GstFlowReturn result;
2911
2912   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
2913
2914   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
2915     goto wrong_mode;
2916
2917   GST_PAD_PREROLL_LOCK (pad);
2918   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
2919   GST_PAD_PREROLL_UNLOCK (pad);
2920
2921 done:
2922   return result;
2923
2924   /* ERRORS */
2925 wrong_mode:
2926   {
2927     GST_OBJECT_LOCK (pad);
2928     GST_WARNING_OBJECT (basesink,
2929         "Push on pad %s:%s, but it was not activated in push mode",
2930         GST_DEBUG_PAD_NAME (pad));
2931     GST_OBJECT_UNLOCK (pad);
2932     gst_buffer_unref (buf);
2933     /* we don't post an error message this will signal to the peer
2934      * pushing that EOS is reached. */
2935     result = GST_FLOW_UNEXPECTED;
2936     goto done;
2937   }
2938 }
2939
2940 static gboolean
2941 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
2942 {
2943   gboolean res = TRUE;
2944
2945   /* update our offset if the start/stop position was updated */
2946   if (segment->format == GST_FORMAT_BYTES) {
2947     segment->time = segment->start;
2948   } else if (segment->start == 0) {
2949     /* seek to start, we can implement a default for this. */
2950     segment->time = 0;
2951   } else {
2952     res = FALSE;
2953     GST_INFO_OBJECT (sink, "Can't do a default seek");
2954   }
2955
2956   return res;
2957 }
2958
2959 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
2960
2961 static gboolean
2962 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
2963     GstEvent * event, GstSegment * segment)
2964 {
2965   /* By default, we try one of 2 things:
2966    *   - For absolute seek positions, convert the requested position to our 
2967    *     configured processing format and place it in the output segment \
2968    *   - For relative seek positions, convert our current (input) values to the
2969    *     seek format, adjust by the relative seek offset and then convert back to
2970    *     the processing format
2971    */
2972   GstSeekType cur_type, stop_type;
2973   gint64 cur, stop;
2974   GstSeekFlags flags;
2975   GstFormat seek_format, dest_format;
2976   gdouble rate;
2977   gboolean update;
2978   gboolean res = TRUE;
2979
2980   gst_event_parse_seek (event, &rate, &seek_format, &flags,
2981       &cur_type, &cur, &stop_type, &stop);
2982   dest_format = segment->format;
2983
2984   if (seek_format == dest_format) {
2985     gst_segment_set_seek (segment, rate, seek_format, flags,
2986         cur_type, cur, stop_type, stop, &update);
2987     return TRUE;
2988   }
2989
2990   if (cur_type != GST_SEEK_TYPE_NONE) {
2991     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
2992     res =
2993         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
2994         &cur);
2995     cur_type = GST_SEEK_TYPE_SET;
2996   }
2997
2998   if (res && stop_type != GST_SEEK_TYPE_NONE) {
2999     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3000     res =
3001         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3002         &stop);
3003     stop_type = GST_SEEK_TYPE_SET;
3004   }
3005
3006   /* And finally, configure our output segment in the desired format */
3007   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3008       stop_type, stop, &update);
3009
3010   if (!res)
3011     goto no_format;
3012
3013   return res;
3014
3015 no_format:
3016   {
3017     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3018     return FALSE;
3019   }
3020 }
3021
3022 /* perform a seek, only executed in pull mode */
3023 static gboolean
3024 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3025 {
3026   gboolean flush;
3027   gdouble rate;
3028   GstFormat seek_format, dest_format;
3029   GstSeekFlags flags;
3030   GstSeekType cur_type, stop_type;
3031   gboolean seekseg_configured = FALSE;
3032   gint64 cur, stop;
3033   gboolean update, res = TRUE;
3034   GstSegment seeksegment;
3035
3036   dest_format = sink->segment.format;
3037
3038   if (event) {
3039     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3040     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3041         &cur_type, &cur, &stop_type, &stop);
3042
3043     flush = flags & GST_SEEK_FLAG_FLUSH;
3044   } else {
3045     GST_DEBUG_OBJECT (sink, "performing seek without event");
3046     flush = FALSE;
3047   }
3048
3049   if (flush) {
3050     GST_DEBUG_OBJECT (sink, "flushing upstream");
3051     gst_pad_push_event (pad, gst_event_new_flush_start ());
3052     gst_base_sink_flush_start (sink, pad);
3053   } else {
3054     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3055   }
3056
3057   GST_PAD_STREAM_LOCK (pad);
3058
3059   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3060    * copy the current segment info into the temp segment that we can actually
3061    * attempt the seek with. We only update the real segment if the seek suceeds. */
3062   if (!seekseg_configured) {
3063     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3064
3065     /* now configure the final seek segment */
3066     if (event) {
3067       if (sink->segment.format != seek_format) {
3068         /* OK, here's where we give the subclass a chance to convert the relative
3069          * seek into an absolute one in the processing format. We set up any
3070          * absolute seek above, before taking the stream lock. */
3071         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3072                 &seeksegment)) {
3073           GST_DEBUG_OBJECT (sink,
3074               "Preparing the seek failed after flushing. " "Aborting seek");
3075           res = FALSE;
3076         }
3077       } else {
3078         /* The seek format matches our processing format, no need to ask the
3079          * the subclass to configure the segment. */
3080         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3081             cur_type, cur, stop_type, stop, &update);
3082       }
3083     }
3084     /* Else, no seek event passed, so we're just (re)starting the 
3085        current segment. */
3086   }
3087
3088   if (res) {
3089     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3090         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3091         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3092
3093     /* do the seek, segment.last_stop contains the new position. */
3094     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3095   }
3096
3097
3098   if (flush) {
3099     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3100     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3101     gst_base_sink_flush_stop (sink, pad);
3102   } else if (res && sink->abidata.ABI.running) {
3103     /* we are running the current segment and doing a non-flushing seek, 
3104      * close the segment first based on the last_stop. */
3105     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3106         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3107   }
3108
3109   /* The subclass must have converted the segment to the processing format 
3110    * by now */
3111   if (res && seeksegment.format != dest_format) {
3112     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3113         "in the correct format. Aborting seek.");
3114     res = FALSE;
3115   }
3116
3117   /* if successfull seek, we update our real segment and push
3118    * out the new segment. */
3119   if (res) {
3120     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3121
3122     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3123       gst_element_post_message (GST_ELEMENT (sink),
3124           gst_message_new_segment_start (GST_OBJECT (sink),
3125               sink->segment.format, sink->segment.last_stop));
3126     }
3127   }
3128
3129   sink->priv->discont = TRUE;
3130   sink->abidata.ABI.running = TRUE;
3131
3132   GST_PAD_STREAM_UNLOCK (pad);
3133
3134   return res;
3135 }
3136
3137 /* with STREAM_LOCK
3138  */
3139 static void
3140 gst_base_sink_loop (GstPad * pad)
3141 {
3142   GstBaseSink *basesink;
3143   GstBuffer *buf = NULL;
3144   GstFlowReturn result;
3145   guint blocksize;
3146   guint64 offset;
3147
3148   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3149
3150   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
3151
3152   if ((blocksize = basesink->priv->blocksize) == 0)
3153     blocksize = -1;
3154
3155   offset = basesink->segment.last_stop;
3156
3157   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
3158       offset, blocksize);
3159
3160   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
3161   if (G_UNLIKELY (result != GST_FLOW_OK))
3162     goto paused;
3163
3164   if (G_UNLIKELY (buf == NULL))
3165     goto no_buffer;
3166
3167   offset += GST_BUFFER_SIZE (buf);
3168
3169   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
3170
3171   GST_PAD_PREROLL_LOCK (pad);
3172   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
3173   GST_PAD_PREROLL_UNLOCK (pad);
3174   if (G_UNLIKELY (result != GST_FLOW_OK))
3175     goto paused;
3176
3177   return;
3178
3179   /* ERRORS */
3180 paused:
3181   {
3182     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
3183         gst_flow_get_name (result));
3184     gst_pad_pause_task (pad);
3185     /* fatal errors and NOT_LINKED cause EOS */
3186     if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) {
3187       if (result == GST_FLOW_UNEXPECTED) {
3188         /* perform EOS logic */
3189         if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3190           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3191               gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
3192                   basesink->segment.format, basesink->segment.last_stop));
3193         } else {
3194           gst_base_sink_event (pad, gst_event_new_eos ());
3195         }
3196       } else {
3197         /* for fatal errors we post an error message, post the error
3198          * first so the app knows about the error first. */
3199         GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3200             (_("Internal data stream error.")),
3201             ("stream stopped, reason %s", gst_flow_get_name (result)));
3202         gst_base_sink_event (pad, gst_event_new_eos ());
3203       }
3204     }
3205     return;
3206   }
3207 no_buffer:
3208   {
3209     GST_LOG_OBJECT (basesink, "no buffer, pausing");
3210     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3211         (_("Internal data flow error.")), ("element returned NULL buffer"));
3212     result = GST_FLOW_ERROR;
3213     goto paused;
3214   }
3215 }
3216
3217 static gboolean
3218 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
3219     gboolean flushing)
3220 {
3221   GstBaseSinkClass *bclass;
3222
3223   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3224
3225   if (flushing) {
3226     /* unlock any subclasses, we need to do this before grabbing the
3227      * PREROLL_LOCK since we hold this lock before going into ::render. */
3228     if (bclass->unlock)
3229       bclass->unlock (basesink);
3230   }
3231
3232   GST_PAD_PREROLL_LOCK (pad);
3233   basesink->flushing = flushing;
3234   if (flushing) {
3235     /* step 1, now that we have the PREROLL lock, clear our unlock request */
3236     if (bclass->unlock_stop)
3237       bclass->unlock_stop (basesink);
3238
3239     /* set need_preroll before we unblock the clock. If the clock is unblocked
3240      * before timing out, we can reuse the buffer for preroll. */
3241     basesink->need_preroll = TRUE;
3242
3243     /* step 2, unblock clock sync (if any) or any other blocking thing */
3244     if (basesink->clock_id) {
3245       gst_clock_id_unschedule (basesink->clock_id);
3246     }
3247
3248     /* flush out the data thread if it's locked in finish_preroll, this will
3249      * also flush out the EOS state */
3250     GST_DEBUG_OBJECT (basesink,
3251         "flushing out data thread, need preroll to TRUE");
3252     gst_base_sink_preroll_queue_flush (basesink, pad);
3253   }
3254   GST_PAD_PREROLL_UNLOCK (pad);
3255
3256   return TRUE;
3257 }
3258
3259 static gboolean
3260 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
3261 {
3262   gboolean result;
3263
3264   if (active) {
3265     /* start task */
3266     result = gst_pad_start_task (basesink->sinkpad,
3267         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
3268   } else {
3269     /* step 2, make sure streaming finishes */
3270     result = gst_pad_stop_task (basesink->sinkpad);
3271   }
3272
3273   return result;
3274 }
3275
3276 static gboolean
3277 gst_base_sink_pad_activate (GstPad * pad)
3278 {
3279   gboolean result = FALSE;
3280   GstBaseSink *basesink;
3281
3282   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3283
3284   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
3285
3286   gst_base_sink_set_flushing (basesink, pad, FALSE);
3287
3288   /* we need to have the pull mode enabled */
3289   if (!basesink->can_activate_pull) {
3290     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
3291     goto fallback;
3292   }
3293
3294   /* check if downstreams supports pull mode at all */
3295   if (!gst_pad_check_pull_range (pad)) {
3296     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
3297     goto fallback;
3298   }
3299
3300   /* set the pad mode before starting the task so that it's in the
3301    * correct state for the new thread. also the sink set_caps and get_caps
3302    * function checks this */
3303   basesink->pad_mode = GST_ACTIVATE_PULL;
3304
3305   /* we first try to negotiate a format so that when we try to activate
3306    * downstream, it knows about our format */
3307   if (!gst_base_sink_negotiate_pull (basesink)) {
3308     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
3309     goto fallback;
3310   }
3311
3312   /* ok activate now */
3313   if (!gst_pad_activate_pull (pad, TRUE)) {
3314     /* clear any pending caps */
3315     GST_OBJECT_LOCK (basesink);
3316     gst_caps_replace (&basesink->priv->pull_caps, NULL);
3317     GST_OBJECT_UNLOCK (basesink);
3318     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
3319     goto fallback;
3320   }
3321
3322   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
3323   result = TRUE;
3324   goto done;
3325
3326   /* push mode fallback */
3327 fallback:
3328   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
3329   if ((result = gst_pad_activate_push (pad, TRUE))) {
3330     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
3331   }
3332
3333 done:
3334   if (!result) {
3335     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
3336     gst_base_sink_set_flushing (basesink, pad, TRUE);
3337   }
3338
3339   gst_object_unref (basesink);
3340
3341   return result;
3342 }
3343
3344 static gboolean
3345 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
3346 {
3347   gboolean result;
3348   GstBaseSink *basesink;
3349
3350   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3351
3352   if (active) {
3353     if (!basesink->can_activate_push) {
3354       result = FALSE;
3355       basesink->pad_mode = GST_ACTIVATE_NONE;
3356     } else {
3357       result = TRUE;
3358       basesink->pad_mode = GST_ACTIVATE_PUSH;
3359     }
3360   } else {
3361     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
3362       g_warning ("Internal GStreamer activation error!!!");
3363       result = FALSE;
3364     } else {
3365       gst_base_sink_set_flushing (basesink, pad, TRUE);
3366       result = TRUE;
3367       basesink->pad_mode = GST_ACTIVATE_NONE;
3368     }
3369   }
3370
3371   gst_object_unref (basesink);
3372
3373   return result;
3374 }
3375
3376 static gboolean
3377 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
3378 {
3379   GstCaps *caps;
3380   gboolean result;
3381
3382   result = FALSE;
3383
3384   /* this returns the intersection between our caps and the peer caps. If there
3385    * is no peer, it returns NULL and we can't operate in pull mode so we can
3386    * fail the negotiation. */
3387   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
3388   if (caps == NULL || gst_caps_is_empty (caps))
3389     goto no_caps_possible;
3390
3391   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
3392
3393   caps = gst_caps_make_writable (caps);
3394   /* get the first (prefered) format */
3395   gst_caps_truncate (caps);
3396   /* try to fixate */
3397   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
3398
3399   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
3400
3401   if (gst_caps_is_any (caps)) {
3402     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
3403         "allowing pull()");
3404     /* neither side has template caps in this case, so they are prepared for
3405        pull() without setcaps() */
3406     result = TRUE;
3407   } else if (gst_caps_is_fixed (caps)) {
3408     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
3409       goto could_not_set_caps;
3410
3411     GST_OBJECT_LOCK (basesink);
3412     gst_caps_replace (&basesink->priv->pull_caps, caps);
3413     GST_OBJECT_UNLOCK (basesink);
3414
3415     result = TRUE;
3416   }
3417
3418   gst_caps_unref (caps);
3419
3420   return result;
3421
3422 no_caps_possible:
3423   {
3424     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
3425     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
3426     if (caps)
3427       gst_caps_unref (caps);
3428     return FALSE;
3429   }
3430 could_not_set_caps:
3431   {
3432     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
3433     gst_caps_unref (caps);
3434     return FALSE;
3435   }
3436 }
3437
3438 /* this won't get called until we implement an activate function */
3439 static gboolean
3440 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
3441 {
3442   gboolean result = FALSE;
3443   GstBaseSink *basesink;
3444   GstBaseSinkClass *bclass;
3445
3446   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3447   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3448
3449   if (active) {
3450     GstFormat format;
3451     gint64 duration;
3452
3453     /* we mark we have a newsegment here because pull based
3454      * mode works just fine without having a newsegment before the
3455      * first buffer */
3456     format = GST_FORMAT_BYTES;
3457
3458     gst_segment_init (&basesink->segment, format);
3459     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
3460     GST_OBJECT_LOCK (basesink);
3461     basesink->have_newsegment = TRUE;
3462     GST_OBJECT_UNLOCK (basesink);
3463
3464     /* get the peer duration in bytes */
3465     result = gst_pad_query_peer_duration (pad, &format, &duration);
3466     if (result) {
3467       GST_DEBUG_OBJECT (basesink,
3468           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
3469       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
3470           duration);
3471       gst_segment_set_duration (&basesink->segment, format, duration);
3472     } else {
3473       GST_DEBUG_OBJECT (basesink, "unknown duration");
3474     }
3475
3476     if (bclass->activate_pull)
3477       result = bclass->activate_pull (basesink, TRUE);
3478     else
3479       result = FALSE;
3480
3481     if (!result)
3482       goto activate_failed;
3483
3484     /* but if starting the thread fails, set it back */
3485     if (!result)
3486       basesink->pad_mode = GST_ACTIVATE_NONE;
3487   } else {
3488     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
3489       g_warning ("Internal GStreamer activation error!!!");
3490       result = FALSE;
3491     } else {
3492       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
3493       if (bclass->activate_pull)
3494         result &= bclass->activate_pull (basesink, FALSE);
3495       basesink->pad_mode = GST_ACTIVATE_NONE;
3496       /* clear any pending caps */
3497       GST_OBJECT_LOCK (basesink);
3498       gst_caps_replace (&basesink->priv->pull_caps, NULL);
3499       GST_OBJECT_UNLOCK (basesink);
3500     }
3501   }
3502   gst_object_unref (basesink);
3503
3504   return result;
3505
3506   /* ERRORS */
3507 activate_failed:
3508   {
3509     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
3510     return FALSE;
3511   }
3512 }
3513
3514 /* send an event to our sinkpad peer. */
3515 static gboolean
3516 gst_base_sink_send_event (GstElement * element, GstEvent * event)
3517 {
3518   GstPad *pad;
3519   GstBaseSink *basesink = GST_BASE_SINK (element);
3520   gboolean forward, result = TRUE;
3521   GstActivateMode mode;
3522
3523   GST_OBJECT_LOCK (element);
3524   /* get the pad and the scheduling mode */
3525   pad = gst_object_ref (basesink->sinkpad);
3526   mode = basesink->pad_mode;
3527   GST_OBJECT_UNLOCK (element);
3528
3529   /* only push UPSTREAM events upstream and if we are in push mode */
3530   forward = GST_EVENT_IS_UPSTREAM (event) && (mode == GST_ACTIVATE_PUSH);
3531
3532   switch (GST_EVENT_TYPE (event)) {
3533     case GST_EVENT_LATENCY:
3534     {
3535       GstClockTime latency;
3536
3537       gst_event_parse_latency (event, &latency);
3538
3539       /* store the latency. We use this to adjust the running_time before syncing
3540        * it to the clock. */
3541       GST_OBJECT_LOCK (element);
3542       basesink->priv->latency = latency;
3543       GST_OBJECT_UNLOCK (element);
3544       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
3545           GST_TIME_ARGS (latency));
3546
3547       /* We forward this event so that all elements know about the global pipeline
3548        * latency. This is interesting for an element when it wants to figure out
3549        * when a particular piece of data will be rendered. */
3550       break;
3551     }
3552     case GST_EVENT_SEEK:
3553       /* in pull mode we will execute the seek */
3554       if (mode == GST_ACTIVATE_PULL)
3555         result = gst_base_sink_perform_seek (basesink, pad, event);
3556       break;
3557     default:
3558       break;
3559   }
3560
3561   if (forward) {
3562     result = gst_pad_push_event (pad, event);
3563   } else {
3564     /* not forwarded, unref the event */
3565     gst_event_unref (event);
3566   }
3567
3568   gst_object_unref (pad);
3569   return result;
3570 }
3571
3572 static gboolean
3573 gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query)
3574 {
3575   GstPad *peer;
3576   gboolean res = FALSE;
3577
3578   if ((peer = gst_pad_get_peer (sink->sinkpad))) {
3579     res = gst_pad_query (peer, query);
3580     gst_object_unref (peer);
3581   }
3582   return res;
3583 }
3584
3585 /* get the end position of the last seen object, this is used
3586  * for EOS and for making sure that we don't report a position we
3587  * have not reached yet. */
3588 static gboolean
3589 gst_base_sink_get_position_last (GstBaseSink * basesink, GstFormat format,
3590     gint64 * cur)
3591 {
3592   GstFormat oformat;
3593   GstSegment *segment;
3594   gboolean ret = TRUE;
3595
3596   segment = &basesink->segment;
3597   oformat = segment->format;
3598
3599   if (oformat == GST_FORMAT_TIME) {
3600     /* return last observed stream time, we keep the stream time around in the
3601      * time format. */
3602     *cur = basesink->priv->current_sstop;
3603   } else {
3604     /* convert last stop to stream time */
3605     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
3606   }
3607
3608   if (*cur != -1 && oformat != format) {
3609     /* convert to the target format if we need to */
3610     ret =
3611         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
3612   }
3613
3614   GST_DEBUG_OBJECT (basesink, "POSITION: %" GST_TIME_FORMAT,
3615       GST_TIME_ARGS (*cur));
3616
3617   return ret;
3618 }
3619
3620 /* get the position when we are PAUSED, this is the stream time of the buffer
3621  * that prerolled. If no buffer is prerolled (we are still flushing), this
3622  * value will be -1. */
3623 static gboolean
3624 gst_base_sink_get_position_paused (GstBaseSink * basesink, GstFormat format,
3625     gint64 * cur)
3626 {
3627   gboolean res;
3628   gint64 time;
3629   GstSegment *segment;
3630   GstFormat oformat;
3631
3632   /* we don't use the clip segment in pull mode, when seeking we update the
3633    * main segment directly with the new segment values without it having to be
3634    * activated by the rendering after preroll */
3635   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
3636     segment = basesink->abidata.ABI.clip_segment;
3637   else
3638     segment = &basesink->segment;
3639   oformat = segment->format;
3640
3641   if (oformat == GST_FORMAT_TIME) {
3642     *cur = basesink->priv->current_sstart;
3643   } else {
3644     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
3645   }
3646
3647   time = segment->time;
3648
3649   if (*cur != -1) {
3650     *cur = MAX (*cur, time);
3651     GST_DEBUG_OBJECT (basesink, "POSITION as max: %" GST_TIME_FORMAT
3652         ", time %" GST_TIME_FORMAT, GST_TIME_ARGS (*cur), GST_TIME_ARGS (time));
3653   } else {
3654     /* we have no buffer, use the segment times. */
3655     if (segment->rate >= 0.0) {
3656       /* forward, next position is always the time of the segment */
3657       *cur = time;
3658       GST_DEBUG_OBJECT (basesink, "POSITION as time: %" GST_TIME_FORMAT,
3659           GST_TIME_ARGS (*cur));
3660     } else {
3661       /* reverse, next expected timestamp is segment->stop. We use the function
3662        * to get things right for negative applied_rates. */
3663       *cur = gst_segment_to_stream_time (segment, oformat, segment->stop);
3664       GST_DEBUG_OBJECT (basesink, "reverse POSITION: %" GST_TIME_FORMAT,
3665           GST_TIME_ARGS (*cur));
3666     }
3667   }
3668   res = (*cur != -1);
3669   if (res && oformat != format) {
3670     res =
3671         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
3672   }
3673
3674   return res;
3675 }
3676
3677 static gboolean
3678 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
3679     gint64 * cur, gboolean * upstream)
3680 {
3681   GstClock *clock;
3682   gboolean res = FALSE;
3683   GstFormat oformat, tformat;
3684   GstClockTime now, base, latency;
3685   gint64 time, accum, duration;
3686   gdouble rate;
3687   gint64 last;
3688
3689   GST_OBJECT_LOCK (basesink);
3690   /* our intermediate time format */
3691   tformat = GST_FORMAT_TIME;
3692   /* get the format in the segment */
3693   oformat = basesink->segment.format;
3694
3695   /* can only give answer based on the clock if not EOS */
3696   if (G_UNLIKELY (basesink->eos))
3697     goto in_eos;
3698
3699   /* we can only get the segment when we are not NULL or READY */
3700   if (!basesink->have_newsegment)
3701     goto wrong_state;
3702
3703   /* when not in PLAYING or when we're busy with a state change, we
3704    * cannot read from the clock so we report time based on the
3705    * last seen timestamp. */
3706   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
3707       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING)
3708     goto in_pause;
3709
3710   /* we need to sync on the clock. */
3711   if (basesink->sync == FALSE)
3712     goto no_sync;
3713
3714   /* and we need a clock */
3715   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
3716     goto no_sync;
3717
3718   /* collect all data we need holding the lock */
3719   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time))
3720     time = basesink->segment.time;
3721   else
3722     time = 0;
3723
3724   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.stop))
3725     duration = basesink->segment.stop - basesink->segment.start;
3726   else
3727     duration = 0;
3728
3729   base = GST_ELEMENT_CAST (basesink)->base_time;
3730   accum = basesink->segment.accum;
3731   rate = basesink->segment.rate * basesink->segment.applied_rate;
3732   gst_base_sink_get_position_last (basesink, format, &last);
3733   latency = basesink->priv->latency;
3734
3735   gst_object_ref (clock);
3736   /* need to release the object lock before we can get the time, 
3737    * a clock might take the LOCK of the provider, which could be
3738    * a basesink subclass. */
3739   GST_OBJECT_UNLOCK (basesink);
3740
3741   now = gst_clock_get_time (clock);
3742
3743   if (oformat != tformat) {
3744     /* convert accum, time and duration to time */
3745     if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
3746             &accum))
3747       goto convert_failed;
3748     if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration, &tformat,
3749             &duration))
3750       goto convert_failed;
3751     if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
3752             &time))
3753       goto convert_failed;
3754   }
3755
3756   /* subtract base time and accumulated time from the clock time. 
3757    * Make sure we don't go negative. This is the current time in
3758    * the segment which we need to scale with the combined 
3759    * rate and applied rate. */
3760   base += accum;
3761   base += latency;
3762   base = MIN (now, base);
3763
3764   /* for negative rates we need to count back from from the segment
3765    * duration. */
3766   if (rate < 0.0)
3767     time += duration;
3768
3769   *cur = time + gst_guint64_to_gdouble (now - base) * rate;
3770
3771   /* never report more than last seen position */
3772   if (last != -1)
3773     *cur = MIN (last, *cur);
3774
3775   gst_object_unref (clock);
3776
3777   GST_DEBUG_OBJECT (basesink,
3778       "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
3779       GST_TIME_FORMAT " + time %" GST_TIME_FORMAT,
3780       GST_TIME_ARGS (now), GST_TIME_ARGS (base),
3781       GST_TIME_ARGS (accum), GST_TIME_ARGS (time));
3782
3783   if (oformat != format) {
3784     /* convert time to final format */
3785     if (!gst_pad_query_convert (basesink->sinkpad, tformat, *cur, &format, cur))
3786       goto convert_failed;
3787   }
3788
3789   res = TRUE;
3790
3791 done:
3792   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
3793       res, GST_TIME_ARGS (*cur));
3794   return res;
3795
3796   /* special cases */
3797 in_eos:
3798   {
3799     GST_DEBUG_OBJECT (basesink, "position in EOS");
3800     res = gst_base_sink_get_position_last (basesink, format, cur);
3801     GST_OBJECT_UNLOCK (basesink);
3802     goto done;
3803   }
3804 in_pause:
3805   {
3806     GST_DEBUG_OBJECT (basesink, "position in PAUSED");
3807     res = gst_base_sink_get_position_paused (basesink, format, cur);
3808     GST_OBJECT_UNLOCK (basesink);
3809     goto done;
3810   }
3811 wrong_state:
3812   {
3813     /* in NULL or READY we always return FALSE and -1 */
3814     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
3815     res = FALSE;
3816     *cur = -1;
3817     GST_OBJECT_UNLOCK (basesink);
3818     goto done;
3819   }
3820 no_sync:
3821   {
3822     /* report last seen timestamp if any, else ask upstream to answer */
3823     if ((*cur = basesink->priv->current_sstart) != -1)
3824       res = TRUE;
3825     else
3826       *upstream = TRUE;
3827
3828     GST_DEBUG_OBJECT (basesink, "no sync, res %d, POSITION %" GST_TIME_FORMAT,
3829         res, GST_TIME_ARGS (*cur));
3830     GST_OBJECT_UNLOCK (basesink);
3831     return res;
3832   }
3833 convert_failed:
3834   {
3835     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
3836     *upstream = TRUE;
3837     return FALSE;
3838   }
3839 }
3840
3841 static gboolean
3842 gst_base_sink_query (GstElement * element, GstQuery * query)
3843 {
3844   gboolean res = FALSE;
3845
3846   GstBaseSink *basesink = GST_BASE_SINK (element);
3847
3848   switch (GST_QUERY_TYPE (query)) {
3849     case GST_QUERY_POSITION:
3850     {
3851       gint64 cur = 0;
3852       GstFormat format;
3853       gboolean upstream = FALSE;
3854
3855       gst_query_parse_position (query, &format, NULL);
3856
3857       GST_DEBUG_OBJECT (basesink, "position format %d", format);
3858
3859       /* first try to get the position based on the clock */
3860       if ((res =
3861               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
3862         gst_query_set_position (query, format, cur);
3863       } else if (upstream) {
3864         /* fallback to peer query */
3865         res = gst_base_sink_peer_query (basesink, query);
3866       }
3867       break;
3868     }
3869     case GST_QUERY_DURATION:
3870     {
3871       GstFormat format, uformat;
3872       gint64 duration, uduration;
3873
3874       gst_query_parse_duration (query, &format, NULL);
3875
3876       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
3877           gst_format_get_name (format));
3878
3879       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
3880         uformat = GST_FORMAT_BYTES;
3881
3882         /* get the duration in bytes, in pull mode that's all we are sure to
3883          * know. We have to explicitly get this value from upstream instead of
3884          * using our cached value because it might change. Duration caching
3885          * should be done at a higher level. */
3886         res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
3887             &uduration);
3888         if (res) {
3889           gst_segment_set_duration (&basesink->segment, uformat, uduration);
3890           if (format != uformat) {
3891             /* convert to the requested format */
3892             res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
3893                 &format, &duration);
3894           } else {
3895             duration = uduration;
3896           }
3897           if (res) {
3898             /* set the result */
3899             gst_query_set_duration (query, format, duration);
3900           }
3901         }
3902       } else {
3903         /* in push mode we simply forward upstream */
3904         res = gst_base_sink_peer_query (basesink, query);
3905       }
3906       break;
3907     }
3908     case GST_QUERY_LATENCY:
3909     {
3910       gboolean live, us_live;
3911       GstClockTime min, max;
3912
3913       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
3914                   &max))) {
3915         gst_query_set_latency (query, live, min, max);
3916       }
3917       break;
3918     }
3919     case GST_QUERY_JITTER:
3920       break;
3921     case GST_QUERY_RATE:
3922       /* gst_query_set_rate (query, basesink->segment_rate); */
3923       res = TRUE;
3924       break;
3925     case GST_QUERY_SEGMENT:
3926     {
3927       /* FIXME, bring start/stop to stream time */
3928       gst_query_set_segment (query, basesink->segment.rate,
3929           GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
3930       break;
3931     }
3932     case GST_QUERY_SEEKING:
3933     case GST_QUERY_CONVERT:
3934     case GST_QUERY_FORMATS:
3935     default:
3936       res = gst_base_sink_peer_query (basesink, query);
3937       break;
3938   }
3939   return res;
3940 }
3941
3942 static GstStateChangeReturn
3943 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
3944 {
3945   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3946   GstBaseSink *basesink = GST_BASE_SINK (element);
3947   GstBaseSinkClass *bclass;
3948   GstBaseSinkPrivate *priv;
3949
3950   priv = basesink->priv;
3951
3952   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3953
3954   switch (transition) {
3955     case GST_STATE_CHANGE_NULL_TO_READY:
3956       if (bclass->start)
3957         if (!bclass->start (basesink))
3958           goto start_failed;
3959       break;
3960     case GST_STATE_CHANGE_READY_TO_PAUSED:
3961       /* need to complete preroll before this state change completes, there
3962        * is no data flow in READY so we can safely assume we need to preroll. */
3963       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
3964       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
3965       basesink->have_newsegment = FALSE;
3966       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3967       gst_segment_init (basesink->abidata.ABI.clip_segment,
3968           GST_FORMAT_UNDEFINED);
3969       basesink->offset = 0;
3970       basesink->have_preroll = FALSE;
3971       basesink->need_preroll = TRUE;
3972       basesink->playing_async = TRUE;
3973       priv->current_sstart = -1;
3974       priv->current_sstop = -1;
3975       priv->eos_rtime = -1;
3976       priv->latency = 0;
3977       basesink->eos = FALSE;
3978       priv->received_eos = FALSE;
3979       gst_base_sink_reset_qos (basesink);
3980       priv->commited = FALSE;
3981       if (priv->async_enabled) {
3982         GST_DEBUG_OBJECT (basesink, "doing async state change");
3983         /* when async enabled, post async-start message and return ASYNC from
3984          * the state change function */
3985         ret = GST_STATE_CHANGE_ASYNC;
3986         gst_element_post_message (GST_ELEMENT_CAST (basesink),
3987             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
3988       } else {
3989         priv->have_latency = TRUE;
3990       }
3991       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
3992       break;
3993     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3994       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
3995       if (!gst_base_sink_needs_preroll (basesink)) {
3996         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
3997         /* no preroll needed anymore now. */
3998         basesink->playing_async = FALSE;
3999         basesink->need_preroll = FALSE;
4000         if (basesink->eos) {
4001           /* need to post EOS message here */
4002           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4003           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4004               gst_message_new_eos (GST_OBJECT_CAST (basesink)));
4005         } else {
4006           GST_DEBUG_OBJECT (basesink, "signal preroll");
4007           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
4008         }
4009       } else {
4010         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
4011         basesink->need_preroll = TRUE;
4012         basesink->playing_async = TRUE;
4013         priv->commited = FALSE;
4014         if (priv->async_enabled) {
4015           GST_DEBUG_OBJECT (basesink, "doing async state change");
4016           ret = GST_STATE_CHANGE_ASYNC;
4017           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4018               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4019         }
4020       }
4021       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4022       break;
4023     default:
4024       break;
4025   }
4026
4027   {
4028     GstStateChangeReturn bret;
4029
4030     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4031     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
4032       goto activate_failed;
4033   }
4034
4035   switch (transition) {
4036     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4037       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
4038       /* FIXME, make sure we cannot enter _render first */
4039
4040       /* we need to call ::unlock before locking PREROLL_LOCK
4041        * since we lock it before going into ::render */
4042       if (bclass->unlock)
4043         bclass->unlock (basesink);
4044
4045       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4046       /* now that we have the PREROLL lock, clear our unlock request */
4047       if (bclass->unlock_stop)
4048         bclass->unlock_stop (basesink);
4049
4050       /* we need preroll again and we set the flag before unlocking the clockid
4051        * because if the clockid is unlocked before a current buffer expired, we
4052        * can use that buffer to preroll with */
4053       basesink->need_preroll = TRUE;
4054
4055       if (basesink->clock_id) {
4056         gst_clock_id_unschedule (basesink->clock_id);
4057       }
4058
4059       /* if we don't have a preroll buffer we need to wait for a preroll and
4060        * return ASYNC. */
4061       if (!gst_base_sink_needs_preroll (basesink)) {
4062         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
4063         basesink->playing_async = FALSE;
4064       } else {
4065         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
4066           ret = GST_STATE_CHANGE_SUCCESS;
4067         } else {
4068           GST_DEBUG_OBJECT (basesink,
4069               "PLAYING to PAUSED, we are not prerolled");
4070           basesink->playing_async = TRUE;
4071           priv->commited = FALSE;
4072           if (priv->async_enabled) {
4073             GST_DEBUG_OBJECT (basesink, "doing async state change");
4074             ret = GST_STATE_CHANGE_ASYNC;
4075             gst_element_post_message (GST_ELEMENT_CAST (basesink),
4076                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
4077                     FALSE));
4078           }
4079         }
4080       }
4081       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
4082           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
4083
4084       gst_base_sink_reset_qos (basesink);
4085       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4086       break;
4087     case GST_STATE_CHANGE_PAUSED_TO_READY:
4088       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4089       /* start by reseting our position state with the object lock so that the
4090        * position query gets the right idea. We do this before we post the
4091        * messages so that the message handlers pick this up. */
4092       GST_OBJECT_LOCK (basesink);
4093       basesink->have_newsegment = FALSE;
4094       priv->current_sstart = -1;
4095       priv->current_sstop = -1;
4096       priv->have_latency = FALSE;
4097       GST_OBJECT_UNLOCK (basesink);
4098
4099       gst_base_sink_set_last_buffer (basesink, NULL);
4100
4101       if (!priv->commited) {
4102         if (priv->async_enabled) {
4103           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
4104
4105           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4106               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
4107                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
4108
4109           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4110               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
4111         }
4112         priv->commited = TRUE;
4113       } else {
4114         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
4115       }
4116       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4117       break;
4118     case GST_STATE_CHANGE_READY_TO_NULL:
4119       if (bclass->stop) {
4120         if (!bclass->stop (basesink)) {
4121           GST_WARNING_OBJECT (basesink, "failed to stop");
4122         }
4123       }
4124       gst_base_sink_set_last_buffer (basesink, NULL);
4125       break;
4126     default:
4127       break;
4128   }
4129
4130   return ret;
4131
4132   /* ERRORS */
4133 start_failed:
4134   {
4135     GST_DEBUG_OBJECT (basesink, "failed to start");
4136     return GST_STATE_CHANGE_FAILURE;
4137   }
4138 activate_failed:
4139   {
4140     GST_DEBUG_OBJECT (basesink,
4141         "element failed to change states -- activation problem?");
4142     return GST_STATE_CHANGE_FAILURE;
4143   }
4144 }