libs/gst/base/gstbasesink.c: Query the total number of bytes when activating the...
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSource
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * <programlisting>
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *   
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * </programlisting>
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSink::preroll vmethod with this preroll buffer and will then commit
59  * the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from ::get_times. If this function returns
63  * #GST_CLOCK_TIME_NONE for the start time, no synchronisation will be done.
64  * Synchronisation can be disabled entirely by setting the object "sync"
65  * property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSink::render will be called.
68  * Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the ::render method
71  * are supported as well. These classes typically receive a buffer in the render
72  * method and can then potentially block on the clock while rendering. A typical
73  * example is an audiosink. Since 0.10.11 these subclasses can use
74  * gst_base_sink_wait_preroll() to perform the blocking wait.
75  *
76  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
77  * for the clock to reach the time indicated by the stop time of the last
78  * ::get_times call before posting an EOS message. When the element receives
79  * EOS in PAUSED, preroll completes, the event is queued and an EOS message is
80  * posted when going to PLAYING.
81  *
82  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
83  * synchronisation and clipping of buffers. Buffers that fall completely outside
84  * of the current segment are dropped. Buffers that fall partially in the
85  * segment are rendered (and prerolled). Subclasses should do any subbuffer
86  * clipping themselves when needed.
87  *
88  * #GstBaseSink will by default report the current playback position in
89  * #GST_FORMAT_TIME based on the current clock time and segment information.
90  * If no clock has been set on the element, the query will be forwarded
91  * upstream.
92  *
93  * The ::set_caps function will be called when the subclass should configure
94  * itself to process a specific media type.
95  *
96  * The ::start and ::stop virtual methods will be called when resources should
97  * be allocated. Any ::preroll, ::render  and ::set_caps function will be
98  * called between the ::start and ::stop calls.
99  *
100  * The ::event virtual method will be called when an event is received by
101  * #GstBaseSink. Normally this method should only be overriden by very specific
102  * elements (such as file sinks) which need to handle the newsegment event
103  * specially.
104  *
105  * #GstBaseSink provides an overridable ::buffer_alloc function that can be
106  * used by sinks that want to do reverse negotiation or to provide
107  * custom buffers (hardware buffers for example) to upstream elements.
108  *
109  * The ::unlock method is called when the elements should unblock any blocking
110  * operations they perform in the ::render method. This is mostly useful when
111  * the ::render method performs a blocking write on a file descriptor, for
112  * example.
113  *
114  * The max-lateness property affects how the sink deals with buffers that
115  * arrive too late in the sink. A buffer arrives too late in the sink when
116  * the presentation time (as a combination of the last segment, buffer
117  * timestamp and element base_time) plus the duration is before the current
118  * time of the clock.
119  * If the frame is later than max-lateness, the sink will drop the buffer
120  * without calling the render method.
121  * This feature is disabled if sync is disabled, the ::get-times method does
122  * not return a valid start time or max-lateness is set to -1 (the default).
123  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
124  * max-lateness value.
125  *
126  * The qos property will enable the quality-of-service features of the basesink
127  * which gather statistics about the real-time performance of the clock
128  * synchronisation. For each buffer received in the sink, statistics are
129  * gathered and a QOS event is sent upstream with these numbers. This
130  * information can then be used by upstream elements to reduce their processing
131  * rate, for example.
132  *
133  * Since 0.10.15 the async property can be used to instruct the sink to never
134  * perform an ASYNC state change. This feature is mostly usable when dealing
135  * with non-synchronized streams or sparse streams.
136  *
137  * Last reviewed on 2007-08-29 (0.10.15)
138  */
139
140 #ifdef HAVE_CONFIG_H
141 #  include "config.h"
142 #endif
143
144 #include "gstbasesink.h"
145 #include <gst/gstmarshal.h>
146 #include <gst/gst_private.h>
147 #include <gst/gst-i18n-lib.h>
148
149 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
150 #define GST_CAT_DEFAULT gst_base_sink_debug
151
152 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
153    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
154
155 /* FIXME, some stuff in ABI.data and other in Private...
156  * Make up your mind please.
157  */
158 struct _GstBaseSinkPrivate
159 {
160   gint qos_enabled;             /* ATOMIC */
161   gboolean async_enabled;
162   GstClockTimeDiff ts_offset;
163   GstClockTime render_delay;
164
165   /* start, stop of current buffer, stream time, used to report position */
166   GstClockTime current_sstart;
167   GstClockTime current_sstop;
168
169   /* start, stop and jitter of current buffer, running time */
170   GstClockTime current_rstart;
171   GstClockTime current_rstop;
172   GstClockTimeDiff current_jitter;
173
174   /* EOS sync time in running time */
175   GstClockTime eos_rtime;
176
177   /* last buffer that arrived in time, running time */
178   GstClockTime last_in_time;
179   /* when the last buffer left the sink, running time */
180   GstClockTime last_left;
181
182   /* running averages go here these are done on running time */
183   GstClockTime avg_pt;
184   GstClockTime avg_duration;
185   gdouble avg_rate;
186
187   /* these are done on system time. avg_jitter and avg_render are
188    * compared to eachother to see if the rendering time takes a
189    * huge amount of the processing, If so we are flooded with
190    * buffers. */
191   GstClockTime last_left_systime;
192   GstClockTime avg_jitter;
193   GstClockTime start, stop;
194   GstClockTime avg_render;
195
196   /* number of rendered and dropped frames */
197   guint64 rendered;
198   guint64 dropped;
199
200   /* latency stuff */
201   GstClockTime latency;
202
203   /* if we already commited the state */
204   gboolean commited;
205
206   /* when we received EOS */
207   gboolean received_eos;
208
209   /* when we are prerolled and able to report latency */
210   gboolean have_latency;
211
212   /* the last buffer we prerolled or rendered. Useful for making snapshots */
213   GstBuffer *last_buffer;
214
215   /* caps for pull based scheduling */
216   GstCaps *pull_caps;
217
218   guint blocksize;
219 };
220
221 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
222
223 /* generic running average, this has a neutral window size */
224 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
225
226 /* the windows for these running averages are experimentally obtained.
227  * possitive values get averaged more while negative values use a small
228  * window so we can react faster to badness. */
229 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
230 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
231
232 /* BaseSink properties */
233
234 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
235 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
236
237 #define DEFAULT_PREROLL_QUEUE_LEN       0
238 #define DEFAULT_SYNC                    TRUE
239 #define DEFAULT_MAX_LATENESS            -1
240 #define DEFAULT_QOS                     FALSE
241 #define DEFAULT_ASYNC                   TRUE
242 #define DEFAULT_TS_OFFSET               0
243 #define DEFAULT_BLOCKSIZE               4096
244
245 enum
246 {
247   PROP_0,
248   PROP_PREROLL_QUEUE_LEN,
249   PROP_SYNC,
250   PROP_MAX_LATENESS,
251   PROP_QOS,
252   PROP_ASYNC,
253   PROP_TS_OFFSET,
254   PROP_LAST_BUFFER,
255   PROP_BLOCKSIZE,
256   PROP_LAST
257 };
258
259 static GstElementClass *parent_class = NULL;
260
261 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
262 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
263 static void gst_base_sink_finalize (GObject * object);
264
265 GType
266 gst_base_sink_get_type (void)
267 {
268   static GType base_sink_type = 0;
269
270   if (G_UNLIKELY (base_sink_type == 0)) {
271     static const GTypeInfo base_sink_info = {
272       sizeof (GstBaseSinkClass),
273       NULL,
274       NULL,
275       (GClassInitFunc) gst_base_sink_class_init,
276       NULL,
277       NULL,
278       sizeof (GstBaseSink),
279       0,
280       (GInstanceInitFunc) gst_base_sink_init,
281     };
282
283     base_sink_type = g_type_register_static (GST_TYPE_ELEMENT,
284         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
285   }
286   return base_sink_type;
287 }
288
289 static void gst_base_sink_set_property (GObject * object, guint prop_id,
290     const GValue * value, GParamSpec * pspec);
291 static void gst_base_sink_get_property (GObject * object, guint prop_id,
292     GValue * value, GParamSpec * pspec);
293
294 static gboolean gst_base_sink_send_event (GstElement * element,
295     GstEvent * event);
296 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
297
298 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
299 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
300 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
301     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
302 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
303     GstClockTime * start, GstClockTime * end);
304 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
305     GstPad * pad, gboolean flushing);
306 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
307     gboolean active);
308
309 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
310     GstStateChange transition);
311
312 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
313 static void gst_base_sink_loop (GstPad * pad);
314 static gboolean gst_base_sink_pad_activate (GstPad * pad);
315 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
316 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
317 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
318 static gboolean gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query);
319
320 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
321
322 /* check if an object was too late */
323 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
324     GstMiniObject * obj, GstClockTime start, GstClockTime stop,
325     GstClockReturn status, GstClockTimeDiff jitter);
326
327 static void
328 gst_base_sink_class_init (GstBaseSinkClass * klass)
329 {
330   GObjectClass *gobject_class;
331   GstElementClass *gstelement_class;
332
333   gobject_class = G_OBJECT_CLASS (klass);
334   gstelement_class = GST_ELEMENT_CLASS (klass);
335
336   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
337       "basesink element");
338
339   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
340
341   parent_class = g_type_class_peek_parent (klass);
342
343   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_sink_finalize);
344   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_sink_set_property);
345   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_sink_get_property);
346
347   /* FIXME, this next value should be configured using an event from the
348    * upstream element, ie, the BUFFER_SIZE event. */
349   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
350       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
351           "Number of buffers to queue during preroll", 0, G_MAXUINT,
352           DEFAULT_PREROLL_QUEUE_LEN,
353           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
354
355   g_object_class_install_property (gobject_class, PROP_SYNC,
356       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
357           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
358
359   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
360       g_param_spec_int64 ("max-lateness", "Max Lateness",
361           "Maximum number of nanoseconds that a buffer can be late before it "
362           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
363           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
364
365   g_object_class_install_property (gobject_class, PROP_QOS,
366       g_param_spec_boolean ("qos", "Qos",
367           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
368           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
369   /**
370    * GstBaseSink:async
371    *
372    * If set to #TRUE, the basesink will perform asynchronous state changes.
373    * When set to #FALSE, the sink will not signal the parent when it prerolls.
374    * Use this option when dealing with sparse streams or when synchronisation is
375    * not required.
376    *
377    * Since: 0.10.15
378    */
379   g_object_class_install_property (gobject_class, PROP_ASYNC,
380       g_param_spec_boolean ("async", "Async",
381           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
382           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
383   /**
384    * GstBaseSink:ts-offset
385    *
386    * Controls the final synchronisation, a negative value will render the buffer
387    * earlier while a positive value delays playback. This property can be 
388    * used to fix synchronisation in bad files.
389    *
390    * Since: 0.10.15
391    */
392   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
393       g_param_spec_int64 ("ts-offset", "TS Offset",
394           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
395           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
396
397   /**
398    * GstBaseSink:last-buffer
399    *
400    * The last buffer that arrived in the sink and was used for preroll or for
401    * rendering. This property can be used to generate thumbnails. This property
402    * can be NULL when the sink has not yet received a bufer.
403    *
404    * Since: 0.10.15
405    */
406   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
407       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
408           "The last buffer received in the sink", GST_TYPE_BUFFER,
409           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
410
411   /**
412    * GstBaseSink:blocksize
413    *
414    * The amount of bytes to pull when operating in pull mode.
415    *
416    * Since: 0.10.22
417    */
418   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
419       g_param_spec_uint ("blocksize", "Block size",
420           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
421           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422
423   gstelement_class->change_state =
424       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
425   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
426   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
427
428   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
429   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
430   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
431   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
432   klass->activate_pull =
433       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
434 }
435
436 static GstCaps *
437 gst_base_sink_pad_getcaps (GstPad * pad)
438 {
439   GstBaseSinkClass *bclass;
440   GstBaseSink *bsink;
441   GstCaps *caps = NULL;
442
443   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
444   bclass = GST_BASE_SINK_GET_CLASS (bsink);
445
446   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
447     /* if we are operating in pull mode we only accept the negotiated caps */
448     GST_OBJECT_LOCK (pad);
449     if ((caps = GST_PAD_CAPS (pad)))
450       gst_caps_ref (caps);
451     GST_OBJECT_UNLOCK (pad);
452   }
453   if (caps == NULL) {
454     if (bclass->get_caps)
455       caps = bclass->get_caps (bsink);
456
457     if (caps == NULL) {
458       GstPadTemplate *pad_template;
459
460       pad_template =
461           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
462           "sink");
463       if (pad_template != NULL) {
464         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
465       }
466     }
467   }
468   gst_object_unref (bsink);
469
470   return caps;
471 }
472
473 static gboolean
474 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
475 {
476   GstBaseSinkClass *bclass;
477   GstBaseSink *bsink;
478   gboolean res = TRUE;
479
480   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
481   bclass = GST_BASE_SINK_GET_CLASS (bsink);
482
483   if (res && bclass->set_caps)
484     res = bclass->set_caps (bsink, caps);
485
486   gst_object_unref (bsink);
487
488   return res;
489 }
490
491 static void
492 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
493 {
494   GstBaseSinkClass *bclass;
495   GstBaseSink *bsink;
496
497   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
498   bclass = GST_BASE_SINK_GET_CLASS (bsink);
499
500   if (bclass->fixate)
501     bclass->fixate (bsink, caps);
502
503   gst_object_unref (bsink);
504 }
505
506 static GstFlowReturn
507 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
508     GstCaps * caps, GstBuffer ** buf)
509 {
510   GstBaseSinkClass *bclass;
511   GstBaseSink *bsink;
512   GstFlowReturn result = GST_FLOW_OK;
513
514   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
515   bclass = GST_BASE_SINK_GET_CLASS (bsink);
516
517   if (bclass->buffer_alloc)
518     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
519   else
520     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
521
522   gst_object_unref (bsink);
523
524   return result;
525 }
526
527 static void
528 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
529 {
530   GstPadTemplate *pad_template;
531   GstBaseSinkPrivate *priv;
532
533   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
534
535   pad_template =
536       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
537   g_return_if_fail (pad_template != NULL);
538
539   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
540
541   gst_pad_set_getcaps_function (basesink->sinkpad,
542       GST_DEBUG_FUNCPTR (gst_base_sink_pad_getcaps));
543   gst_pad_set_setcaps_function (basesink->sinkpad,
544       GST_DEBUG_FUNCPTR (gst_base_sink_pad_setcaps));
545   gst_pad_set_fixatecaps_function (basesink->sinkpad,
546       GST_DEBUG_FUNCPTR (gst_base_sink_pad_fixate));
547   gst_pad_set_bufferalloc_function (basesink->sinkpad,
548       GST_DEBUG_FUNCPTR (gst_base_sink_pad_buffer_alloc));
549   gst_pad_set_activate_function (basesink->sinkpad,
550       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate));
551   gst_pad_set_activatepush_function (basesink->sinkpad,
552       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_push));
553   gst_pad_set_activatepull_function (basesink->sinkpad,
554       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_pull));
555   gst_pad_set_event_function (basesink->sinkpad,
556       GST_DEBUG_FUNCPTR (gst_base_sink_event));
557   gst_pad_set_chain_function (basesink->sinkpad,
558       GST_DEBUG_FUNCPTR (gst_base_sink_chain));
559   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
560
561   basesink->pad_mode = GST_ACTIVATE_NONE;
562   basesink->preroll_queue = g_queue_new ();
563   basesink->abidata.ABI.clip_segment = gst_segment_new ();
564   priv->have_latency = FALSE;
565
566   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
567   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
568
569   basesink->sync = DEFAULT_SYNC;
570   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
571   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
572   priv->async_enabled = DEFAULT_ASYNC;
573   priv->ts_offset = DEFAULT_TS_OFFSET;
574   priv->render_delay = 0;
575   priv->blocksize = DEFAULT_BLOCKSIZE;
576
577   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
578 }
579
580 static void
581 gst_base_sink_finalize (GObject * object)
582 {
583   GstBaseSink *basesink;
584
585   basesink = GST_BASE_SINK (object);
586
587   g_queue_free (basesink->preroll_queue);
588   gst_segment_free (basesink->abidata.ABI.clip_segment);
589
590   G_OBJECT_CLASS (parent_class)->finalize (object);
591 }
592
593 /**
594  * gst_base_sink_set_sync:
595  * @sink: the sink
596  * @sync: the new sync value.
597  *
598  * Configures @sink to synchronize on the clock or not. When
599  * @sync is FALSE, incomming samples will be played as fast as
600  * possible. If @sync is TRUE, the timestamps of the incomming
601  * buffers will be used to schedule the exact render time of its
602  * contents.
603  *
604  * Since: 0.10.4
605  */
606 void
607 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
608 {
609   g_return_if_fail (GST_IS_BASE_SINK (sink));
610
611   GST_OBJECT_LOCK (sink);
612   sink->sync = sync;
613   GST_OBJECT_UNLOCK (sink);
614 }
615
616 /**
617  * gst_base_sink_get_sync:
618  * @sink: the sink
619  *
620  * Checks if @sink is currently configured to synchronize against the
621  * clock.
622  *
623  * Returns: TRUE if the sink is configured to synchronize against the clock.
624  *
625  * Since: 0.10.4
626  */
627 gboolean
628 gst_base_sink_get_sync (GstBaseSink * sink)
629 {
630   gboolean res;
631
632   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
633
634   GST_OBJECT_LOCK (sink);
635   res = sink->sync;
636   GST_OBJECT_UNLOCK (sink);
637
638   return res;
639 }
640
641 /**
642  * gst_base_sink_set_max_lateness:
643  * @sink: the sink
644  * @max_lateness: the new max lateness value.
645  *
646  * Sets the new max lateness value to @max_lateness. This value is
647  * used to decide if a buffer should be dropped or not based on the
648  * buffer timestamp and the current clock time. A value of -1 means
649  * an unlimited time.
650  *
651  * Since: 0.10.4
652  */
653 void
654 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
655 {
656   g_return_if_fail (GST_IS_BASE_SINK (sink));
657
658   GST_OBJECT_LOCK (sink);
659   sink->abidata.ABI.max_lateness = max_lateness;
660   GST_OBJECT_UNLOCK (sink);
661 }
662
663 /**
664  * gst_base_sink_get_max_lateness:
665  * @sink: the sink
666  *
667  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
668  * more details.
669  *
670  * Returns: The maximum time in nanoseconds that a buffer can be late
671  * before it is dropped and not rendered. A value of -1 means an
672  * unlimited time.
673  *
674  * Since: 0.10.4
675  */
676 gint64
677 gst_base_sink_get_max_lateness (GstBaseSink * sink)
678 {
679   gint64 res;
680
681   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
682
683   GST_OBJECT_LOCK (sink);
684   res = sink->abidata.ABI.max_lateness;
685   GST_OBJECT_UNLOCK (sink);
686
687   return res;
688 }
689
690 /**
691  * gst_base_sink_set_qos_enabled:
692  * @sink: the sink
693  * @enabled: the new qos value.
694  *
695  * Configures @sink to send Quality-of-Service events upstream.
696  *
697  * Since: 0.10.5
698  */
699 void
700 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
701 {
702   g_return_if_fail (GST_IS_BASE_SINK (sink));
703
704   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
705 }
706
707 /**
708  * gst_base_sink_is_qos_enabled:
709  * @sink: the sink
710  *
711  * Checks if @sink is currently configured to send Quality-of-Service events
712  * upstream.
713  *
714  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
715  *
716  * Since: 0.10.5
717  */
718 gboolean
719 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
720 {
721   gboolean res;
722
723   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
724
725   res = g_atomic_int_get (&sink->priv->qos_enabled);
726
727   return res;
728 }
729
730 /**
731  * gst_base_sink_set_async_enabled:
732  * @sink: the sink
733  * @enabled: the new async value.
734  *
735  * Configures @sink to perform all state changes asynchronusly. When async is
736  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
737  * preroll buffer. This feature is usefull if the sink does not synchronize
738  * against the clock or when it is dealing with sparse streams.
739  *
740  * Since: 0.10.15
741  */
742 void
743 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
744 {
745   g_return_if_fail (GST_IS_BASE_SINK (sink));
746
747   GST_PAD_PREROLL_LOCK (sink->sinkpad);
748   sink->priv->async_enabled = enabled;
749   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
750   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
751 }
752
753 /**
754  * gst_base_sink_is_async_enabled:
755  * @sink: the sink
756  *
757  * Checks if @sink is currently configured to perform asynchronous state
758  * changes to PAUSED.
759  *
760  * Returns: TRUE if the sink is configured to perform asynchronous state
761  * changes.
762  *
763  * Since: 0.10.15
764  */
765 gboolean
766 gst_base_sink_is_async_enabled (GstBaseSink * sink)
767 {
768   gboolean res;
769
770   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
771
772   GST_PAD_PREROLL_LOCK (sink->sinkpad);
773   res = sink->priv->async_enabled;
774   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
775
776   return res;
777 }
778
779 /**
780  * gst_base_sink_set_ts_offset:
781  * @sink: the sink
782  * @offset: the new offset
783  *
784  * Adjust the synchronisation of @sink with @offset. A negative value will
785  * render buffers earlier than their timestamp. A positive value will delay
786  * rendering. This function can be used to fix playback of badly timestamped
787  * buffers.
788  *
789  * Since: 0.10.15
790  */
791 void
792 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
793 {
794   g_return_if_fail (GST_IS_BASE_SINK (sink));
795
796   GST_OBJECT_LOCK (sink);
797   sink->priv->ts_offset = offset;
798   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
799   GST_OBJECT_UNLOCK (sink);
800 }
801
802 /**
803  * gst_base_sink_get_ts_offset:
804  * @sink: the sink
805  *
806  * Get the synchronisation offset of @sink.
807  *
808  * Returns: The synchronisation offset.
809  *
810  * Since: 0.10.15
811  */
812 GstClockTimeDiff
813 gst_base_sink_get_ts_offset (GstBaseSink * sink)
814 {
815   GstClockTimeDiff res;
816
817   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
818
819   GST_OBJECT_LOCK (sink);
820   res = sink->priv->ts_offset;
821   GST_OBJECT_UNLOCK (sink);
822
823   return res;
824 }
825
826 /**
827  * gst_base_sink_get_last_buffer:
828  * @sink: the sink
829  *
830  * Get the last buffer that arrived in the sink and was used for preroll or for
831  * rendering. This property can be used to generate thumbnails.
832  *
833  * The #GstCaps on the buffer can be used to determine the type of the buffer.
834  * 
835  * Returns: a #GstBuffer. gst_buffer_unref() after usage. This function returns
836  * NULL when no buffer has arrived in the sink yet or when the sink is not in
837  * PAUSED or PLAYING.
838  *
839  * Since: 0.10.15
840  */
841 GstBuffer *
842 gst_base_sink_get_last_buffer (GstBaseSink * sink)
843 {
844   GstBuffer *res;
845
846   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
847
848   GST_OBJECT_LOCK (sink);
849   if ((res = sink->priv->last_buffer))
850     gst_buffer_ref (res);
851   GST_OBJECT_UNLOCK (sink);
852
853   return res;
854 }
855
856 static void
857 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
858 {
859   GstBuffer *old;
860
861   if (buffer)
862     gst_buffer_ref (buffer);
863
864   GST_OBJECT_LOCK (sink);
865   old = sink->priv->last_buffer;
866   sink->priv->last_buffer = buffer;
867   GST_OBJECT_UNLOCK (sink);
868
869   if (old)
870     gst_buffer_unref (old);
871 }
872
873 /**
874  * gst_base_sink_get_latency:
875  * @sink: the sink
876  *
877  * Get the currently configured latency.
878  *
879  * Returns: The configured latency.
880  *
881  * Since: 0.10.12
882  */
883 GstClockTime
884 gst_base_sink_get_latency (GstBaseSink * sink)
885 {
886   GstClockTime res;
887
888   GST_OBJECT_LOCK (sink);
889   res = sink->priv->latency;
890   GST_OBJECT_UNLOCK (sink);
891
892   return res;
893 }
894
895 /**
896  * gst_base_sink_query_latency:
897  * @sink: the sink
898  * @live: if the sink is live
899  * @upstream_live: if an upstream element is live
900  * @min_latency: the min latency of the upstream elements
901  * @max_latency: the max latency of the upstream elements
902  *
903  * Query the sink for the latency parameters. The latency will be queried from
904  * the upstream elements. @live will be TRUE if @sink is configured to
905  * synchronize against the clock. @upstream_live will be TRUE if an upstream
906  * element is live. 
907  *
908  * If both @live and @upstream_live are TRUE, the sink will want to compensate
909  * for the latency introduced by the upstream elements by setting the
910  * @min_latency to a strictly possitive value.
911  *
912  * This function is mostly used by subclasses. 
913  *
914  * Returns: TRUE if the query succeeded.
915  *
916  * Since: 0.10.12
917  */
918 gboolean
919 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
920     gboolean * upstream_live, GstClockTime * min_latency,
921     GstClockTime * max_latency)
922 {
923   gboolean l, us_live, res, have_latency;
924   GstClockTime min, max, render_delay;
925   GstQuery *query;
926   GstClockTime us_min, us_max;
927
928   /* we are live when we sync to the clock */
929   GST_OBJECT_LOCK (sink);
930   l = sink->sync;
931   have_latency = sink->priv->have_latency;
932   render_delay = sink->priv->render_delay;
933   GST_OBJECT_UNLOCK (sink);
934
935   /* assume no latency */
936   min = 0;
937   max = -1;
938   us_live = FALSE;
939
940   if (have_latency) {
941     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
942     /* we are ready for a latency query this is when we preroll or when we are
943      * not async. */
944     query = gst_query_new_latency ();
945
946     /* ask the peer for the latency */
947     if ((res = gst_base_sink_peer_query (sink, query))) {
948       /* get upstream min and max latency */
949       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
950
951       if (us_live) {
952         /* upstream live, use its latency, subclasses should use these
953          * values to create the complete latency. */
954         min = us_min;
955         max = us_max;
956       }
957       if (l) {
958         /* we need to add the render delay if we are live */
959         if (min != -1)
960           min += render_delay;
961         if (max != -1)
962           max += render_delay;
963       }
964     }
965     gst_query_unref (query);
966   } else {
967     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
968     res = FALSE;
969   }
970
971   /* not live, we tried to do the query, if it failed we return TRUE anyway */
972   if (!res) {
973     if (!l) {
974       res = TRUE;
975       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
976     } else {
977       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
978     }
979   }
980
981   if (res) {
982     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
983         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
984         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
985
986     if (live)
987       *live = l;
988     if (upstream_live)
989       *upstream_live = us_live;
990     if (min_latency)
991       *min_latency = min;
992     if (max_latency)
993       *max_latency = max;
994   }
995   return res;
996 }
997
998 /**
999  * gst_base_sink_set_render_delay:
1000  * @sink: a #GstBaseSink
1001  * @delay: the new delay
1002  *
1003  * Set the render delay in @sink to @delay. The render delay is the time 
1004  * between actual rendering of a buffer and its synchronisation time. Some
1005  * devices might delay media rendering which can be compensated for with this
1006  * function. 
1007  *
1008  * After calling this function, this sink will report additional latency and
1009  * other sinks will adjust their latency to delay the rendering of their media.
1010  *
1011  * This function is usually called by subclasses.
1012  *
1013  * Since: 0.10.21
1014  */
1015 void
1016 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1017 {
1018   g_return_if_fail (GST_IS_BASE_SINK (sink));
1019
1020   GST_OBJECT_LOCK (sink);
1021   sink->priv->render_delay = delay;
1022   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1023       GST_TIME_ARGS (delay));
1024   GST_OBJECT_UNLOCK (sink);
1025 }
1026
1027 /**
1028  * gst_base_sink_get_render_delay:
1029  * @sink: a #GstBaseSink
1030  *
1031  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1032  * information about the render delay.
1033  *
1034  * Returns: the render delay of @sink.
1035  *
1036  * Since: 0.10.21
1037  */
1038 GstClockTime
1039 gst_base_sink_get_render_delay (GstBaseSink * sink)
1040 {
1041   GstClockTimeDiff res;
1042
1043   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1044
1045   GST_OBJECT_LOCK (sink);
1046   res = sink->priv->render_delay;
1047   GST_OBJECT_UNLOCK (sink);
1048
1049   return res;
1050 }
1051
1052 /**
1053  * gst_base_sink_set_blocksize:
1054  * @sink: a #GstBaseSink
1055  * @blocksize: the blocksize in bytes
1056  *
1057  * Set the number of bytes that the sink will pull when it is operating in pull
1058  * mode.
1059  *
1060  * Since: 0.10.22
1061  */
1062 void
1063 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1064 {
1065   g_return_if_fail (GST_IS_BASE_SINK (sink));
1066
1067   GST_OBJECT_LOCK (sink);
1068   sink->priv->blocksize = blocksize;
1069   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1070   GST_OBJECT_UNLOCK (sink);
1071 }
1072
1073 /**
1074  * gst_base_sink_get_blocksize:
1075  * @sink: a #GstBaseSink
1076  *
1077  * Get the number of bytes that the sink will pull when it is operating in pull
1078  * mode.
1079  *
1080  * Returns: the number of bytes @sink will pull in pull mode.
1081  *
1082  * Since: 0.10.22
1083  */
1084 guint
1085 gst_base_sink_get_blocksize (GstBaseSink * sink)
1086 {
1087   guint res;
1088
1089   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1090
1091   GST_OBJECT_LOCK (sink);
1092   res = sink->priv->blocksize;
1093   GST_OBJECT_UNLOCK (sink);
1094
1095   return res;
1096 }
1097
1098 static void
1099 gst_base_sink_set_property (GObject * object, guint prop_id,
1100     const GValue * value, GParamSpec * pspec)
1101 {
1102   GstBaseSink *sink = GST_BASE_SINK (object);
1103
1104   switch (prop_id) {
1105     case PROP_PREROLL_QUEUE_LEN:
1106       /* preroll lock necessary to serialize with finish_preroll */
1107       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1108       sink->preroll_queue_max_len = g_value_get_uint (value);
1109       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1110       break;
1111     case PROP_SYNC:
1112       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1113       break;
1114     case PROP_MAX_LATENESS:
1115       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1116       break;
1117     case PROP_QOS:
1118       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1119       break;
1120     case PROP_ASYNC:
1121       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1122       break;
1123     case PROP_TS_OFFSET:
1124       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1125       break;
1126     case PROP_BLOCKSIZE:
1127       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1128       break;
1129     default:
1130       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1131       break;
1132   }
1133 }
1134
1135 static void
1136 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1137     GParamSpec * pspec)
1138 {
1139   GstBaseSink *sink = GST_BASE_SINK (object);
1140
1141   switch (prop_id) {
1142     case PROP_PREROLL_QUEUE_LEN:
1143       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1144       g_value_set_uint (value, sink->preroll_queue_max_len);
1145       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1146       break;
1147     case PROP_SYNC:
1148       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1149       break;
1150     case PROP_MAX_LATENESS:
1151       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1152       break;
1153     case PROP_QOS:
1154       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1155       break;
1156     case PROP_ASYNC:
1157       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1158       break;
1159     case PROP_TS_OFFSET:
1160       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1161       break;
1162     case PROP_LAST_BUFFER:
1163       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1164       break;
1165     case PROP_BLOCKSIZE:
1166       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1167       break;
1168     default:
1169       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1170       break;
1171   }
1172 }
1173
1174
1175 static GstCaps *
1176 gst_base_sink_get_caps (GstBaseSink * sink)
1177 {
1178   return NULL;
1179 }
1180
1181 static gboolean
1182 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1183 {
1184   return TRUE;
1185 }
1186
1187 static GstFlowReturn
1188 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1189     GstCaps * caps, GstBuffer ** buf)
1190 {
1191   *buf = NULL;
1192   return GST_FLOW_OK;
1193 }
1194
1195 /* with PREROLL_LOCK, STREAM_LOCK */
1196 static void
1197 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1198 {
1199   GstMiniObject *obj;
1200
1201   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1202   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1203     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1204     gst_mini_object_unref (obj);
1205   }
1206   /* we can't have EOS anymore now */
1207   basesink->eos = FALSE;
1208   basesink->priv->received_eos = FALSE;
1209   basesink->have_preroll = FALSE;
1210   basesink->eos_queued = FALSE;
1211   basesink->preroll_queued = 0;
1212   basesink->buffers_queued = 0;
1213   basesink->events_queued = 0;
1214   /* can't report latency anymore until we preroll again */
1215   if (basesink->priv->async_enabled) {
1216     GST_OBJECT_LOCK (basesink);
1217     basesink->priv->have_latency = FALSE;
1218     GST_OBJECT_UNLOCK (basesink);
1219   }
1220   /* and signal any waiters now */
1221   GST_PAD_PREROLL_SIGNAL (pad);
1222 }
1223
1224 /* with STREAM_LOCK, configures given segment with the event information. */
1225 static void
1226 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1227     GstEvent * event, GstSegment * segment)
1228 {
1229   gboolean update;
1230   gdouble rate, arate;
1231   GstFormat format;
1232   gint64 start;
1233   gint64 stop;
1234   gint64 time;
1235
1236   /* the newsegment event is needed to bring the buffer timestamps to the
1237    * stream time and to drop samples outside of the playback segment. */
1238   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1239       &start, &stop, &time);
1240
1241   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1242    * We protect with the OBJECT_LOCK so that we can use the values to
1243    * safely answer a POSITION query. */
1244   GST_OBJECT_LOCK (basesink);
1245   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1246       stop, time);
1247
1248   if (format == GST_FORMAT_TIME) {
1249     GST_DEBUG_OBJECT (basesink,
1250         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1251         "format GST_FORMAT_TIME, "
1252         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1253         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1254         update, rate, arate, GST_TIME_ARGS (segment->start),
1255         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1256         GST_TIME_ARGS (segment->accum));
1257   } else {
1258     GST_DEBUG_OBJECT (basesink,
1259         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1260         "format %d, "
1261         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1262         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1263         segment->format, segment->start, segment->stop, segment->time,
1264         segment->accum);
1265   }
1266   GST_OBJECT_UNLOCK (basesink);
1267 }
1268
1269 /* with PREROLL_LOCK, STREAM_LOCK */
1270 static gboolean
1271 gst_base_sink_commit_state (GstBaseSink * basesink)
1272 {
1273   /* commit state and proceed to next pending state */
1274   GstState current, next, pending, post_pending;
1275   gboolean post_paused = FALSE;
1276   gboolean post_async_done = FALSE;
1277   gboolean post_playing = FALSE;
1278   gboolean sync;
1279
1280   /* we are certainly not playing async anymore now */
1281   basesink->playing_async = FALSE;
1282
1283   GST_OBJECT_LOCK (basesink);
1284   current = GST_STATE (basesink);
1285   next = GST_STATE_NEXT (basesink);
1286   pending = GST_STATE_PENDING (basesink);
1287   post_pending = pending;
1288   sync = basesink->sync;
1289
1290   switch (pending) {
1291     case GST_STATE_PLAYING:
1292     {
1293       GstBaseSinkClass *bclass;
1294       GstStateChangeReturn ret;
1295
1296       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1297
1298       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1299
1300       basesink->need_preroll = FALSE;
1301       post_async_done = TRUE;
1302       basesink->priv->commited = TRUE;
1303       post_playing = TRUE;
1304       /* post PAUSED too when we were READY */
1305       if (current == GST_STATE_READY) {
1306         post_paused = TRUE;
1307       }
1308
1309       /* make sure we notify the subclass of async playing */
1310       if (bclass->async_play) {
1311         ret = bclass->async_play (basesink);
1312         if (ret == GST_STATE_CHANGE_FAILURE)
1313           goto async_failed;
1314       }
1315       break;
1316     }
1317     case GST_STATE_PAUSED:
1318       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1319       post_paused = TRUE;
1320       post_async_done = TRUE;
1321       basesink->priv->commited = TRUE;
1322       post_pending = GST_STATE_VOID_PENDING;
1323       break;
1324     case GST_STATE_READY:
1325     case GST_STATE_NULL:
1326       goto stopping;
1327     case GST_STATE_VOID_PENDING:
1328       goto nothing_pending;
1329     default:
1330       break;
1331   }
1332
1333   /* we can report latency queries now */
1334   basesink->priv->have_latency = TRUE;
1335
1336   GST_STATE (basesink) = pending;
1337   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1338   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1339   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1340   GST_OBJECT_UNLOCK (basesink);
1341
1342   if (post_paused) {
1343     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1344     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1345         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1346             current, next, post_pending));
1347   }
1348   if (post_async_done) {
1349     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1350     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1351         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1352   }
1353   if (post_playing) {
1354     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1355     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1356         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1357             next, pending, GST_STATE_VOID_PENDING));
1358   }
1359
1360   GST_STATE_BROADCAST (basesink);
1361
1362   return TRUE;
1363
1364 nothing_pending:
1365   {
1366     /* Depending on the state, set our vars. We get in this situation when the
1367      * state change function got a change to update the state vars before the
1368      * streaming thread did. This is fine but we need to make sure that we
1369      * update the need_preroll var since it was TRUE when we got here and might
1370      * become FALSE if we got to PLAYING. */
1371     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1372         gst_element_state_get_name (current));
1373     switch (current) {
1374       case GST_STATE_PLAYING:
1375         basesink->need_preroll = FALSE;
1376         break;
1377       case GST_STATE_PAUSED:
1378         basesink->need_preroll = TRUE;
1379         break;
1380       default:
1381         basesink->need_preroll = FALSE;
1382         basesink->flushing = TRUE;
1383         break;
1384     }
1385     /* we can report latency queries now */
1386     basesink->priv->have_latency = TRUE;
1387     GST_OBJECT_UNLOCK (basesink);
1388     return TRUE;
1389   }
1390 stopping:
1391   {
1392     /* app is going to READY */
1393     GST_DEBUG_OBJECT (basesink, "stopping");
1394     basesink->need_preroll = FALSE;
1395     basesink->flushing = TRUE;
1396     GST_OBJECT_UNLOCK (basesink);
1397     return FALSE;
1398   }
1399 async_failed:
1400   {
1401     GST_DEBUG_OBJECT (basesink, "async commit failed");
1402     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1403     GST_OBJECT_UNLOCK (basesink);
1404     return FALSE;
1405   }
1406 }
1407
1408
1409 /* with STREAM_LOCK, PREROLL_LOCK
1410  *
1411  * Returns TRUE if the object needs synchronisation and takes therefore
1412  * part in prerolling.
1413  *
1414  * rsstart/rsstop contain the start/stop in stream time.
1415  * rrstart/rrstop contain the start/stop in running time.
1416  */
1417 static gboolean
1418 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1419     GstClockTime * rsstart, GstClockTime * rsstop,
1420     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1421     GstSegment * segment)
1422 {
1423   GstBaseSinkClass *bclass;
1424   GstBuffer *buffer;
1425   GstClockTime start, stop;     /* raw start/stop timestamps */
1426   gint64 cstart, cstop;         /* clipped raw timestamps */
1427   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1428   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1429   GstFormat format;
1430   GstBaseSinkPrivate *priv;
1431
1432   priv = basesink->priv;
1433
1434   /* start with nothing */
1435   start = stop = sstart = sstop = rstart = rstop = -1;
1436
1437   if (G_UNLIKELY (GST_IS_EVENT (obj))) {
1438     GstEvent *event = GST_EVENT_CAST (obj);
1439
1440     switch (GST_EVENT_TYPE (event)) {
1441         /* EOS event needs syncing */
1442       case GST_EVENT_EOS:
1443       {
1444         if (basesink->segment.rate >= 0.0) {
1445           sstart = sstop = priv->current_sstop;
1446           if (sstart == -1) {
1447             /* we have not seen a buffer yet, use the segment values */
1448             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1449                 basesink->segment.format, basesink->segment.stop);
1450           }
1451         } else {
1452           sstart = sstop = priv->current_sstart;
1453           if (sstart == -1) {
1454             /* we have not seen a buffer yet, use the segment values */
1455             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1456                 basesink->segment.format, basesink->segment.start);
1457           }
1458         }
1459
1460         rstart = rstop = priv->eos_rtime;
1461         *do_sync = rstart != -1;
1462         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1463             GST_TIME_ARGS (rstart));
1464         goto done;
1465       }
1466       default:
1467         /* other events do not need syncing */
1468         /* FIXME, maybe NEWSEGMENT might need synchronisation
1469          * since the POSITION query depends on accumulated times and
1470          * we cannot accumulate the current segment before the previous
1471          * one completed.
1472          */
1473         return FALSE;
1474     }
1475   }
1476
1477   /* else do buffer sync code */
1478   buffer = GST_BUFFER_CAST (obj);
1479
1480   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1481
1482   /* just get the times to see if we need syncing */
1483   if (bclass->get_times)
1484     bclass->get_times (basesink, buffer, &start, &stop);
1485
1486   if (start == -1) {
1487     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1488     *do_sync = FALSE;
1489   } else {
1490     *do_sync = TRUE;
1491   }
1492
1493   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1494       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1495       GST_TIME_ARGS (stop), *do_sync);
1496
1497   /* collect segment and format for code clarity */
1498   format = segment->format;
1499
1500   /* no timestamp clipping if we did not * get a TIME segment format */
1501   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
1502     cstart = start;
1503     cstop = stop;
1504     /* do running and stream time in TIME format */
1505     format = GST_FORMAT_TIME;
1506     goto do_times;
1507   }
1508
1509   /* clip */
1510   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
1511               (gint64) start, (gint64) stop, &cstart, &cstop)))
1512     goto out_of_segment;
1513
1514   if (G_UNLIKELY (start != cstart || stop != cstop)) {
1515     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
1516         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
1517         GST_TIME_ARGS (cstop));
1518   }
1519
1520   /* set last stop position */
1521   if (G_LIKELY (cstop != GST_CLOCK_TIME_NONE))
1522     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
1523   else
1524     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
1525
1526 do_times:
1527   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
1528    * upstream is behaving very badly */
1529   sstart = gst_segment_to_stream_time (segment, format, cstart);
1530   sstop = gst_segment_to_stream_time (segment, format, cstop);
1531   rstart = gst_segment_to_running_time (segment, format, cstart);
1532   rstop = gst_segment_to_running_time (segment, format, cstop);
1533
1534 done:
1535   /* save times */
1536   *rsstart = sstart;
1537   *rsstop = sstop;
1538   *rrstart = rstart;
1539   *rrstop = rstop;
1540
1541   /* buffers and EOS always need syncing and preroll */
1542   return TRUE;
1543
1544   /* special cases */
1545 out_of_segment:
1546   {
1547     /* should not happen since we clip them in the chain function already, 
1548      * we return FALSE so that we don't try to sync on it. */
1549     GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
1550         (NULL), ("unexpected buffer out of segment found."));
1551     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
1552     return FALSE;
1553   }
1554 }
1555
1556 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
1557  * adjust a timestamp with the latency and timestamp offset */
1558 static GstClockTime
1559 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
1560 {
1561   GstClockTimeDiff ts_offset;
1562
1563   /* don't do anything funny with invalid timestamps */
1564   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1565     return time;
1566
1567   time += basesink->priv->latency;
1568
1569   /* apply offset, be carefull for underflows */
1570   ts_offset = basesink->priv->ts_offset;
1571   if (ts_offset < 0) {
1572     ts_offset = -ts_offset;
1573     if (ts_offset < time)
1574       time -= ts_offset;
1575     else
1576       time = 0;
1577   } else
1578     time += ts_offset;
1579
1580   return time;
1581 }
1582
1583 /* gst_base_sink_wait_clock:
1584  * @sink: the sink
1585  * @time: the running_time to be reached
1586  * @jitter: the jitter to be filled with time diff (can be NULL)
1587  *
1588  * This function will block until @time is reached. It is usually called by
1589  * subclasses that use their own internal synchronisation.
1590  *
1591  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
1592  * returned. Likewise, if synchronisation is disabled in the element or there
1593  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
1594  *
1595  * This function should only be called with the PREROLL_LOCK held, like when
1596  * receiving an EOS event in the ::event vmethod or when receiving a buffer in
1597  * the ::render vmethod.
1598  *
1599  * The @time argument should be the running_time of when this method should
1600  * return and is not adjusted with any latency or offset configured in the
1601  * sink.
1602  *
1603  * Since 0.10.20
1604  *
1605  * Returns: #GstClockReturn
1606  */
1607 GstClockReturn
1608 gst_base_sink_wait_clock (GstBaseSink * basesink, GstClockTime time,
1609     GstClockTimeDiff * jitter)
1610 {
1611   GstClockID id;
1612   GstClockReturn ret;
1613   GstClock *clock;
1614
1615   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1616     goto invalid_time;
1617
1618   GST_OBJECT_LOCK (basesink);
1619   if (G_UNLIKELY (!basesink->sync))
1620     goto no_sync;
1621
1622   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
1623     goto no_clock;
1624
1625   /* add base_time to running_time to get the time against the clock */
1626   time += GST_ELEMENT_CAST (basesink)->base_time;
1627
1628   id = gst_clock_new_single_shot_id (clock, time);
1629   GST_OBJECT_UNLOCK (basesink);
1630
1631   /* A blocking wait is performed on the clock. We save the ClockID
1632    * so we can unlock the entry at any time. While we are blocking, we 
1633    * release the PREROLL_LOCK so that other threads can interrupt the
1634    * entry. */
1635   basesink->clock_id = id;
1636   /* release the preroll lock while waiting */
1637   GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
1638
1639   ret = gst_clock_id_wait (id, jitter);
1640
1641   GST_PAD_PREROLL_LOCK (basesink->sinkpad);
1642   gst_clock_id_unref (id);
1643   basesink->clock_id = NULL;
1644
1645   return ret;
1646
1647   /* no syncing needed */
1648 invalid_time:
1649   {
1650     GST_DEBUG_OBJECT (basesink, "time not valid, no sync needed");
1651     return GST_CLOCK_BADTIME;
1652   }
1653 no_sync:
1654   {
1655     GST_DEBUG_OBJECT (basesink, "sync disabled");
1656     GST_OBJECT_UNLOCK (basesink);
1657     return GST_CLOCK_BADTIME;
1658   }
1659 no_clock:
1660   {
1661     GST_DEBUG_OBJECT (basesink, "no clock, can't sync");
1662     GST_OBJECT_UNLOCK (basesink);
1663     return GST_CLOCK_BADTIME;
1664   }
1665 }
1666
1667 /**
1668  * gst_base_sink_wait_preroll:
1669  * @sink: the sink
1670  *
1671  * If the #GstBaseSinkClass::render method performs its own synchronisation against
1672  * the clock it must unblock when going from PLAYING to the PAUSED state and call
1673  * this method before continuing to render the remaining data.
1674  *
1675  * This function will block until a state change to PLAYING happens (in which
1676  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
1677  * to a state change to READY or a FLUSH event (in which case this function
1678  * returns #GST_FLOW_WRONG_STATE).
1679  *
1680  * This function should only be called with the PREROLL_LOCK held, like in the
1681  * render function.
1682  *
1683  * Since: 0.10.11
1684  *
1685  * Returns: #GST_FLOW_OK if the preroll completed and processing can
1686  * continue. Any other return value should be returned from the render vmethod.
1687  */
1688 GstFlowReturn
1689 gst_base_sink_wait_preroll (GstBaseSink * sink)
1690 {
1691   sink->have_preroll = TRUE;
1692   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
1693   /* block until the state changes, or we get a flush, or something */
1694   GST_PAD_PREROLL_WAIT (sink->sinkpad);
1695   sink->have_preroll = FALSE;
1696   if (G_UNLIKELY (sink->flushing))
1697     goto stopping;
1698   GST_DEBUG_OBJECT (sink, "continue after preroll");
1699
1700   return GST_FLOW_OK;
1701
1702   /* ERRORS */
1703 stopping:
1704   {
1705     GST_DEBUG_OBJECT (sink, "preroll interrupted");
1706     return GST_FLOW_WRONG_STATE;
1707   }
1708 }
1709
1710 /**
1711  * gst_base_sink_do_preroll:
1712  * @sink: the sink
1713  * @obj: the object that caused the preroll
1714  *
1715  * If the @sink spawns its own thread for pulling buffers from upstream it
1716  * should call this method after it has pulled a buffer. If the element needed
1717  * to preroll, this function will perform the preroll and will then block
1718  * until the element state is changed.
1719  *
1720  * This function should be called with the PREROLL_LOCK held.
1721  *
1722  * Since 0.10.22
1723  *
1724  * Returns: #GST_FLOW_OK if the preroll completed and processing can
1725  * continue. Any other return value should be returned from the render vmethod.
1726  */
1727 GstFlowReturn
1728 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
1729 {
1730   GstFlowReturn ret;
1731
1732   while (G_UNLIKELY (sink->need_preroll)) {
1733     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
1734
1735     if (G_LIKELY (sink->playing_async)) {
1736       /* commit state */
1737       if (G_UNLIKELY (!gst_base_sink_commit_state (sink)))
1738         goto stopping;
1739     }
1740
1741     /* need to recheck here because the commit state could have
1742      * made us not need the preroll anymore */
1743     if (G_LIKELY (sink->need_preroll)) {
1744       /* block until the state changes, or we get a flush, or something */
1745       ret = gst_base_sink_wait_preroll (sink);
1746       if (ret != GST_FLOW_OK)
1747         goto flushing;
1748     }
1749   }
1750   return GST_FLOW_OK;
1751
1752   /* ERRORS */
1753 flushing:
1754   {
1755     GST_DEBUG_OBJECT (sink, "we are flushing");
1756     return ret;
1757   }
1758 stopping:
1759   {
1760     GST_DEBUG_OBJECT (sink, "stopping while commiting state");
1761     return GST_FLOW_WRONG_STATE;
1762   }
1763 }
1764
1765 /**
1766  * gst_base_sink_wait_eos:
1767  * @sink: the sink
1768  * @time: the running_time to be reached
1769  * @jitter: the jitter to be filled with time diff (can be NULL)
1770  *
1771  * This function will block until @time is reached. It is usually called by
1772  * subclasses that use their own internal synchronisation but want to let the
1773  * EOS be handled by the base class.
1774  *
1775  * This function should only be called with the PREROLL_LOCK held, like when
1776  * receiving an EOS event in the ::event vmethod.
1777  *
1778  * The @time argument should be the running_time of when the EOS should happen
1779  * and will be adjusted with any latency and offset configured in the sink.
1780  *
1781  * Since 0.10.15
1782  *
1783  * Returns: #GstFlowReturn
1784  */
1785 GstFlowReturn
1786 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
1787     GstClockTimeDiff * jitter)
1788 {
1789   GstClockReturn status;
1790   GstFlowReturn ret;
1791
1792   do {
1793     GstClockTime stime;
1794
1795     GST_DEBUG_OBJECT (sink, "checking preroll");
1796
1797     /* first wait for the playing state before we can continue */
1798     if (G_UNLIKELY (sink->need_preroll)) {
1799       ret = gst_base_sink_wait_preroll (sink);
1800       if (ret != GST_FLOW_OK)
1801         goto flushing;
1802     }
1803
1804     /* preroll done, we can sync since we are in PLAYING now. */
1805     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
1806         GST_TIME_FORMAT, GST_TIME_ARGS (time));
1807
1808     /* compensate for latency and ts_offset. We don't adjust for render delay
1809      * because we don't interact with the device on EOS normally. */
1810     stime = gst_base_sink_adjust_time (sink, time);
1811
1812     /* wait for the clock, this can be interrupted because we got shut down or 
1813      * we PAUSED. */
1814     status = gst_base_sink_wait_clock (sink, stime, jitter);
1815
1816     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
1817
1818     /* invalid time, no clock or sync disabled, just continue then */
1819     if (status == GST_CLOCK_BADTIME)
1820       break;
1821
1822     /* waiting could have been interrupted and we can be flushing now */
1823     if (G_UNLIKELY (sink->flushing))
1824       goto flushing;
1825
1826     /* retry if we got unscheduled, which means we did not reach the timeout
1827      * yet. if some other error occures, we continue. */
1828   } while (status == GST_CLOCK_UNSCHEDULED);
1829
1830   GST_DEBUG_OBJECT (sink, "end of stream");
1831
1832   return GST_FLOW_OK;
1833
1834   /* ERRORS */
1835 flushing:
1836   {
1837     GST_DEBUG_OBJECT (sink, "we are flushing");
1838     return GST_FLOW_WRONG_STATE;
1839   }
1840 }
1841
1842 /* with STREAM_LOCK, PREROLL_LOCK
1843  *
1844  * Make sure we are in PLAYING and synchronize an object to the clock.
1845  *
1846  * If we need preroll, we are not in PLAYING. We try to commit the state
1847  * if needed and then block if we still are not PLAYING.
1848  *
1849  * We start waiting on the clock in PLAYING. If we got interrupted, we
1850  * immediatly try to re-preroll.
1851  *
1852  * Some objects do not need synchronisation (most events) and so this function
1853  * immediatly returns GST_FLOW_OK.
1854  *
1855  * for objects that arrive later than max-lateness to be synchronized to the 
1856  * clock have the @late boolean set to TRUE.
1857  *
1858  * This function keeps a running average of the jitter (the diff between the
1859  * clock time and the requested sync time). The jitter is negative for
1860  * objects that arrive in time and positive for late buffers.
1861  *
1862  * does not take ownership of obj.
1863  */
1864 static GstFlowReturn
1865 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
1866     GstMiniObject * obj, gboolean * late)
1867 {
1868   GstClockTimeDiff jitter;
1869   gboolean syncable;
1870   GstClockReturn status = GST_CLOCK_OK;
1871   GstClockTime rstart, rstop, sstart, sstop, stime;
1872   gboolean do_sync;
1873   GstBaseSinkPrivate *priv;
1874
1875   priv = basesink->priv;
1876
1877   sstart = sstop = rstart = rstop = -1;
1878   do_sync = TRUE;
1879
1880   priv->current_rstart = -1;
1881
1882   /* get timing information for this object against the render segment */
1883   syncable = gst_base_sink_get_sync_times (basesink, obj,
1884       &sstart, &sstop, &rstart, &rstop, &do_sync, &basesink->segment);
1885
1886   /* a syncable object needs to participate in preroll and
1887    * clocking. All buffers and EOS are syncable. */
1888   if (G_UNLIKELY (!syncable))
1889     goto not_syncable;
1890
1891   /* store timing info for current object */
1892   priv->current_rstart = rstart;
1893   priv->current_rstop = (rstop != -1 ? rstop : rstart);
1894   /* save sync time for eos when the previous object needed sync */
1895   priv->eos_rtime = (do_sync ? priv->current_rstop : -1);
1896
1897 again:
1898   /* first do preroll, this makes sure we commit our state
1899    * to PAUSED and can continue to PLAYING. We cannot perform
1900    * any clock sync in PAUSED because there is no clock. 
1901    */
1902   while (G_UNLIKELY (basesink->need_preroll)) {
1903     GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
1904
1905     if (G_LIKELY (basesink->playing_async)) {
1906       /* commit state */
1907       if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
1908         goto stopping;
1909     }
1910
1911     /* need to recheck here because the commit state could have
1912      * made us not need the preroll anymore */
1913     if (G_LIKELY (basesink->need_preroll)) {
1914       /* block until the state changes, or we get a flush, or something */
1915       if (gst_base_sink_wait_preroll (basesink) != GST_FLOW_OK)
1916         goto flushing;
1917     }
1918   }
1919
1920   /* After rendering we store the position of the last buffer so that we can use
1921    * it to report the position. We need to take the lock here. */
1922   GST_OBJECT_LOCK (basesink);
1923   priv->current_sstart = sstart;
1924   priv->current_sstop = (sstop != -1 ? sstop : sstart);
1925   GST_OBJECT_UNLOCK (basesink);
1926
1927   if (!do_sync)
1928     goto done;
1929
1930   /* adjust for latency */
1931   stime = gst_base_sink_adjust_time (basesink, rstart);
1932
1933   /* adjust for render-delay, avoid underflows */
1934   if (stime != -1) {
1935     if (stime > priv->render_delay)
1936       stime -= priv->render_delay;
1937     else
1938       stime = 0;
1939   }
1940
1941   /* preroll done, we can sync since we are in PLAYING now. */
1942   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
1943       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
1944       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
1945
1946   /* This function will return immediatly if start == -1, no clock
1947    * or sync is disabled with GST_CLOCK_BADTIME. */
1948   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
1949
1950   GST_DEBUG_OBJECT (basesink, "clock returned %d", status);
1951
1952   /* invalid time, no clock or sync disabled, just render */
1953   if (status == GST_CLOCK_BADTIME)
1954     goto done;
1955
1956   /* waiting could have been interrupted and we can be flushing now */
1957   if (G_UNLIKELY (basesink->flushing))
1958     goto flushing;
1959
1960   /* check for unlocked by a state change, we are not flushing so
1961    * we can try to preroll on the current buffer. */
1962   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
1963     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
1964     goto again;
1965   }
1966
1967   /* successful syncing done, record observation */
1968   priv->current_jitter = jitter;
1969
1970   /* check if the object should be dropped */
1971   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
1972       status, jitter);
1973
1974 done:
1975   return GST_FLOW_OK;
1976
1977   /* ERRORS */
1978 not_syncable:
1979   {
1980     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
1981     return GST_FLOW_OK;
1982   }
1983 flushing:
1984   {
1985     GST_DEBUG_OBJECT (basesink, "we are flushing");
1986     return GST_FLOW_WRONG_STATE;
1987   }
1988 stopping:
1989   {
1990     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
1991     return GST_FLOW_WRONG_STATE;
1992   }
1993 }
1994
1995 static gboolean
1996 gst_base_sink_send_qos (GstBaseSink * basesink,
1997     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
1998 {
1999   GstEvent *event;
2000   gboolean res;
2001
2002   /* generate Quality-of-Service event */
2003   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2004       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2005       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (time));
2006
2007   event = gst_event_new_qos (proportion, diff, time);
2008
2009   /* send upstream */
2010   res = gst_pad_push_event (basesink->sinkpad, event);
2011
2012   return res;
2013 }
2014
2015 static void
2016 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2017 {
2018   GstBaseSinkPrivate *priv;
2019   GstClockTime start, stop;
2020   GstClockTimeDiff jitter;
2021   GstClockTime pt, entered, left;
2022   GstClockTime duration;
2023   gdouble rate;
2024
2025   priv = sink->priv;
2026
2027   start = priv->current_rstart;
2028
2029   /* if Quality-of-Service disabled, do nothing */
2030   if (!g_atomic_int_get (&priv->qos_enabled) || start == -1)
2031     return;
2032
2033   stop = priv->current_rstop;
2034   jitter = priv->current_jitter;
2035
2036   if (jitter < 0) {
2037     /* this is the time the buffer entered the sink */
2038     if (start < -jitter)
2039       entered = 0;
2040     else
2041       entered = start + jitter;
2042     left = start;
2043   } else {
2044     /* this is the time the buffer entered the sink */
2045     entered = start + jitter;
2046     /* this is the time the buffer left the sink */
2047     left = start + jitter;
2048   }
2049
2050   /* calculate duration of the buffer */
2051   if (stop != -1)
2052     duration = stop - start;
2053   else
2054     duration = -1;
2055
2056   /* if we have the time when the last buffer left us, calculate
2057    * processing time */
2058   if (priv->last_left != -1) {
2059     if (entered > priv->last_left) {
2060       pt = entered - priv->last_left;
2061     } else {
2062       pt = 0;
2063     }
2064   } else {
2065     pt = priv->avg_pt;
2066   }
2067
2068   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2069       ", entered %" GST_TIME_FORMAT ", left %" GST_TIME_FORMAT ", pt: %"
2070       GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT ",jitter %"
2071       G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (entered),
2072       GST_TIME_ARGS (left), GST_TIME_ARGS (pt), GST_TIME_ARGS (duration),
2073       jitter);
2074
2075   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2076       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2077       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2078       priv->avg_rate);
2079
2080   /* collect running averages. for first observations, we copy the
2081    * values */
2082   if (priv->avg_duration == -1)
2083     priv->avg_duration = duration;
2084   else
2085     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2086
2087   if (priv->avg_pt == -1)
2088     priv->avg_pt = pt;
2089   else
2090     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2091
2092   if (priv->avg_duration != 0)
2093     rate =
2094         gst_guint64_to_gdouble (priv->avg_pt) /
2095         gst_guint64_to_gdouble (priv->avg_duration);
2096   else
2097     rate = 0.0;
2098
2099   if (priv->last_left != -1) {
2100     if (dropped || priv->avg_rate < 0.0) {
2101       priv->avg_rate = rate;
2102     } else {
2103       if (rate > 1.0)
2104         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2105       else
2106         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2107     }
2108   }
2109
2110   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2111       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2112       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2113       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2114
2115
2116   if (priv->avg_rate >= 0.0) {
2117     /* if we have a valid rate, start sending QoS messages */
2118     if (priv->current_jitter < 0) {
2119       /* make sure we never go below 0 when adding the jitter to the
2120        * timestamp. */
2121       if (priv->current_rstart < -priv->current_jitter)
2122         priv->current_jitter = -priv->current_rstart;
2123     }
2124     gst_base_sink_send_qos (sink, priv->avg_rate, priv->current_rstart,
2125         priv->current_jitter);
2126   }
2127
2128   /* record when this buffer will leave us */
2129   priv->last_left = left;
2130 }
2131
2132 /* reset all qos measuring */
2133 static void
2134 gst_base_sink_reset_qos (GstBaseSink * sink)
2135 {
2136   GstBaseSinkPrivate *priv;
2137
2138   priv = sink->priv;
2139
2140   priv->last_in_time = -1;
2141   priv->last_left = -1;
2142   priv->avg_duration = -1;
2143   priv->avg_pt = -1;
2144   priv->avg_rate = -1.0;
2145   priv->avg_render = -1;
2146   priv->rendered = 0;
2147   priv->dropped = 0;
2148
2149 }
2150
2151 /* Checks if the object was scheduled too late.
2152  *
2153  * start/stop contain the raw timestamp start and stop values
2154  * of the object.
2155  *
2156  * status and jitter contain the return values from the clock wait.
2157  *
2158  * returns TRUE if the buffer was too late.
2159  */
2160 static gboolean
2161 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2162     GstClockTime start, GstClockTime stop,
2163     GstClockReturn status, GstClockTimeDiff jitter)
2164 {
2165   gboolean late;
2166   gint64 max_lateness;
2167   GstBaseSinkPrivate *priv;
2168
2169   priv = basesink->priv;
2170
2171   late = FALSE;
2172
2173   /* only for objects that were too late */
2174   if (G_LIKELY (status != GST_CLOCK_EARLY))
2175     goto in_time;
2176
2177   max_lateness = basesink->abidata.ABI.max_lateness;
2178
2179   /* check if frame dropping is enabled */
2180   if (max_lateness == -1)
2181     goto no_drop;
2182
2183   /* only check for buffers */
2184   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2185     goto not_buffer;
2186
2187   /* can't do check if we don't have a timestamp */
2188   if (G_UNLIKELY (start == -1))
2189     goto no_timestamp;
2190
2191   /* we can add a valid stop time */
2192   if (stop != -1)
2193     max_lateness += stop;
2194   else
2195     max_lateness += start;
2196
2197   /* if the jitter bigger than duration and lateness we are too late */
2198   if ((late = start + jitter > max_lateness)) {
2199     GST_DEBUG_OBJECT (basesink, "buffer is too late %" GST_TIME_FORMAT
2200         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (start + jitter),
2201         GST_TIME_ARGS (max_lateness));
2202     /* !!emergency!!, if we did not receive anything valid for more than a 
2203      * second, render it anyway so the user sees something */
2204     if (priv->last_in_time && start - priv->last_in_time > GST_SECOND) {
2205       late = FALSE;
2206       GST_DEBUG_OBJECT (basesink,
2207           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2208           GST_TIME_ARGS (priv->last_in_time));
2209     }
2210   }
2211
2212 done:
2213   if (!late) {
2214     priv->last_in_time = start;
2215   }
2216   return late;
2217
2218   /* all is fine */
2219 in_time:
2220   {
2221     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2222     goto done;
2223   }
2224 no_drop:
2225   {
2226     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2227     goto done;
2228   }
2229 not_buffer:
2230   {
2231     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2232     return FALSE;
2233   }
2234 no_timestamp:
2235   {
2236     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2237     return FALSE;
2238   }
2239 }
2240
2241 /* called before and after calling the render vmethod. It keeps track of how
2242  * much time was spent in the render method and is used to check if we are
2243  * flooded */
2244 static void
2245 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2246 {
2247   GstBaseSinkPrivate *priv;
2248
2249   priv = basesink->priv;
2250
2251   if (start) {
2252     priv->start = gst_util_get_timestamp ();
2253   } else {
2254     GstClockTime elapsed;
2255
2256     priv->stop = gst_util_get_timestamp ();
2257
2258     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2259
2260     if (priv->avg_render == -1)
2261       priv->avg_render = elapsed;
2262     else
2263       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2264
2265     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2266         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2267   }
2268 }
2269
2270 /* with STREAM_LOCK, PREROLL_LOCK,
2271  *
2272  * Synchronize the object on the clock and then render it.
2273  *
2274  * takes ownership of obj.
2275  */
2276 static GstFlowReturn
2277 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2278     GstMiniObject * obj)
2279 {
2280   GstFlowReturn ret = GST_FLOW_OK;
2281   GstBaseSinkClass *bclass;
2282   gboolean late = FALSE;
2283   GstBaseSinkPrivate *priv;
2284
2285   priv = basesink->priv;
2286
2287   /* synchronize this object, non syncable objects return OK
2288    * immediatly. */
2289   ret = gst_base_sink_do_sync (basesink, pad, obj, &late);
2290   if (G_UNLIKELY (ret != GST_FLOW_OK))
2291     goto sync_failed;
2292
2293   /* and now render, event or buffer. */
2294   if (G_LIKELY (GST_IS_BUFFER (obj))) {
2295     GstBuffer *buf;
2296
2297     /* drop late buffers unconditionally, let's hope it's unlikely */
2298     if (G_UNLIKELY (late))
2299       goto dropped;
2300
2301     buf = GST_BUFFER_CAST (obj);
2302
2303     gst_base_sink_set_last_buffer (basesink, buf);
2304
2305     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2306
2307     if (G_LIKELY (bclass->render)) {
2308       gint do_qos;
2309
2310       /* read once, to get same value before and after */
2311       do_qos = g_atomic_int_get (&priv->qos_enabled);
2312
2313       GST_DEBUG_OBJECT (basesink, "rendering buffer %p", obj);
2314
2315       /* record rendering time for QoS and stats */
2316       if (do_qos)
2317         gst_base_sink_do_render_stats (basesink, TRUE);
2318
2319       ret = bclass->render (basesink, buf);
2320
2321       priv->rendered++;
2322
2323       if (do_qos)
2324         gst_base_sink_do_render_stats (basesink, FALSE);
2325     }
2326   } else {
2327     GstEvent *event = GST_EVENT_CAST (obj);
2328     gboolean event_res = TRUE;
2329     GstEventType type;
2330
2331     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2332
2333     type = GST_EVENT_TYPE (event);
2334
2335     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
2336         gst_event_type_get_name (type));
2337
2338     if (bclass->event)
2339       event_res = bclass->event (basesink, event);
2340
2341     /* when we get here we could be flushing again when the event handler calls
2342      * _wait_eos(). We have to ignore this object in that case. */
2343     if (G_UNLIKELY (basesink->flushing))
2344       goto flushing;
2345
2346     if (G_LIKELY (event_res)) {
2347       switch (type) {
2348         case GST_EVENT_EOS:
2349           /* the EOS event is completely handled so we mark
2350            * ourselves as being in the EOS state. eos is also 
2351            * protected by the object lock so we can read it when 
2352            * answering the POSITION query. */
2353           GST_OBJECT_LOCK (basesink);
2354           basesink->eos = TRUE;
2355           GST_OBJECT_UNLOCK (basesink);
2356           /* ok, now we can post the message */
2357           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
2358           gst_element_post_message (GST_ELEMENT_CAST (basesink),
2359               gst_message_new_eos (GST_OBJECT_CAST (basesink)));
2360           break;
2361         case GST_EVENT_NEWSEGMENT:
2362           /* configure the segment */
2363           gst_base_sink_configure_segment (basesink, pad, event,
2364               &basesink->segment);
2365           break;
2366         default:
2367           break;
2368       }
2369     }
2370   }
2371
2372 done:
2373   gst_base_sink_perform_qos (basesink, late);
2374
2375   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
2376   gst_mini_object_unref (obj);
2377
2378   return ret;
2379
2380   /* ERRORS */
2381 sync_failed:
2382   {
2383     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
2384     goto done;
2385   }
2386 dropped:
2387   {
2388     priv->dropped++;
2389     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
2390     goto done;
2391   }
2392 flushing:
2393   {
2394     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
2395     gst_mini_object_unref (obj);
2396     return GST_FLOW_WRONG_STATE;
2397   }
2398 }
2399
2400 /* with STREAM_LOCK, PREROLL_LOCK
2401  *
2402  * Perform preroll on the given object. For buffers this means 
2403  * calling the preroll subclass method. 
2404  * If that succeeds, the state will be commited.
2405  *
2406  * function does not take ownership of obj.
2407  */
2408 static GstFlowReturn
2409 gst_base_sink_preroll_object (GstBaseSink * basesink, GstPad * pad,
2410     GstMiniObject * obj)
2411 {
2412   GstFlowReturn ret;
2413
2414   GST_DEBUG_OBJECT (basesink, "do preroll %p", obj);
2415
2416   /* if it's a buffer, we need to call the preroll method */
2417   if (G_LIKELY (GST_IS_BUFFER (obj))) {
2418     GstBaseSinkClass *bclass;
2419     GstBuffer *buf;
2420     GstClockTime timestamp;
2421
2422     buf = GST_BUFFER_CAST (obj);
2423     timestamp = GST_BUFFER_TIMESTAMP (buf);
2424
2425     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
2426         GST_TIME_ARGS (timestamp));
2427
2428     gst_base_sink_set_last_buffer (basesink, buf);
2429
2430     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2431     if (bclass->preroll)
2432       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
2433         goto preroll_failed;
2434   }
2435
2436   /* commit state */
2437   if (G_LIKELY (basesink->playing_async)) {
2438     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
2439       goto stopping;
2440   }
2441
2442   return GST_FLOW_OK;
2443
2444   /* ERRORS */
2445 preroll_failed:
2446   {
2447     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
2448     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
2449     return ret;
2450   }
2451 stopping:
2452   {
2453     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
2454     return GST_FLOW_WRONG_STATE;
2455   }
2456 }
2457
2458 /* with STREAM_LOCK, PREROLL_LOCK 
2459  *
2460  * Queue an object for rendering.
2461  * The first prerollable object queued will complete the preroll. If the
2462  * preroll queue if filled, we render all the objects in the queue.
2463  *
2464  * This function takes ownership of the object.
2465  */
2466 static GstFlowReturn
2467 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
2468     GstMiniObject * obj, gboolean prerollable)
2469 {
2470   GstFlowReturn ret = GST_FLOW_OK;
2471   gint length;
2472   GQueue *q;
2473
2474   if (G_UNLIKELY (basesink->need_preroll)) {
2475     if (G_LIKELY (prerollable))
2476       basesink->preroll_queued++;
2477
2478     length = basesink->preroll_queued;
2479
2480     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
2481
2482     /* first prerollable item needs to finish the preroll */
2483     if (length == 1) {
2484       ret = gst_base_sink_preroll_object (basesink, pad, obj);
2485       if (G_UNLIKELY (ret != GST_FLOW_OK))
2486         goto preroll_failed;
2487     }
2488     /* need to recheck if we need preroll, commmit state during preroll 
2489      * could have made us not need more preroll. */
2490     if (G_UNLIKELY (basesink->need_preroll)) {
2491       /* see if we can render now, if we can't add the object to the preroll
2492        * queue. */
2493       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
2494         goto more_preroll;
2495     }
2496   }
2497
2498   /* we can start rendering (or blocking) the queued object
2499    * if any. */
2500   q = basesink->preroll_queue;
2501   while (G_UNLIKELY (!g_queue_is_empty (q))) {
2502     GstMiniObject *o;
2503
2504     o = g_queue_pop_head (q);
2505     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
2506
2507     /* do something with the return value */
2508     ret = gst_base_sink_render_object (basesink, pad, o);
2509     if (ret != GST_FLOW_OK)
2510       goto dequeue_failed;
2511   }
2512
2513   /* now render the object */
2514   ret = gst_base_sink_render_object (basesink, pad, obj);
2515   basesink->preroll_queued = 0;
2516
2517   return ret;
2518
2519   /* special cases */
2520 preroll_failed:
2521   {
2522     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
2523         gst_flow_get_name (ret));
2524     gst_mini_object_unref (obj);
2525     return ret;
2526   }
2527 more_preroll:
2528   {
2529     /* add object to the queue and return */
2530     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
2531         length, basesink->preroll_queue_max_len);
2532     g_queue_push_tail (basesink->preroll_queue, obj);
2533     return GST_FLOW_OK;
2534   }
2535 dequeue_failed:
2536   {
2537     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
2538         gst_flow_get_name (ret));
2539     gst_mini_object_unref (obj);
2540     return ret;
2541   }
2542 }
2543
2544 /* with STREAM_LOCK
2545  *
2546  * This function grabs the PREROLL_LOCK and adds the object to
2547  * the queue.
2548  *
2549  * This function takes ownership of obj.
2550  */
2551 static GstFlowReturn
2552 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
2553     GstMiniObject * obj, gboolean prerollable)
2554 {
2555   GstFlowReturn ret;
2556
2557   GST_PAD_PREROLL_LOCK (pad);
2558   if (G_UNLIKELY (basesink->flushing))
2559     goto flushing;
2560
2561   if (G_UNLIKELY (basesink->priv->received_eos))
2562     goto was_eos;
2563
2564   ret = gst_base_sink_queue_object_unlocked (basesink, pad, obj, prerollable);
2565   GST_PAD_PREROLL_UNLOCK (pad);
2566
2567   return ret;
2568
2569   /* ERRORS */
2570 flushing:
2571   {
2572     GST_DEBUG_OBJECT (basesink, "sink is flushing");
2573     GST_PAD_PREROLL_UNLOCK (pad);
2574     gst_mini_object_unref (obj);
2575     return GST_FLOW_WRONG_STATE;
2576   }
2577 was_eos:
2578   {
2579     GST_DEBUG_OBJECT (basesink,
2580         "we are EOS, dropping object, return UNEXPECTED");
2581     GST_PAD_PREROLL_UNLOCK (pad);
2582     gst_mini_object_unref (obj);
2583     return GST_FLOW_UNEXPECTED;
2584   }
2585 }
2586
2587 static void
2588 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
2589 {
2590   /* make sure we are not blocked on the clock also clear any pending
2591    * eos state. */
2592   gst_base_sink_set_flushing (basesink, pad, TRUE);
2593
2594   /* we grab the stream lock but that is not needed since setting the
2595    * sink to flushing would make sure no state commit is being done
2596    * anymore */
2597   GST_PAD_STREAM_LOCK (pad);
2598   gst_base_sink_reset_qos (basesink);
2599   if (basesink->priv->async_enabled) {
2600     /* and we need to commit our state again on the next
2601      * prerolled buffer */
2602     basesink->playing_async = TRUE;
2603     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
2604   } else {
2605     basesink->priv->have_latency = TRUE;
2606     basesink->need_preroll = FALSE;
2607   }
2608   gst_base_sink_set_last_buffer (basesink, NULL);
2609   GST_PAD_STREAM_UNLOCK (pad);
2610 }
2611
2612 static void
2613 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
2614 {
2615   /* unset flushing so we can accept new data, this also flushes out any EOS
2616    * event. */
2617   gst_base_sink_set_flushing (basesink, pad, FALSE);
2618
2619   /* for position reporting */
2620   GST_OBJECT_LOCK (basesink);
2621   basesink->priv->current_sstart = -1;
2622   basesink->priv->current_sstop = -1;
2623   basesink->priv->eos_rtime = -1;
2624   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
2625     /* we need new segment info after the flush. */
2626     basesink->have_newsegment = FALSE;
2627     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
2628     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
2629   }
2630   GST_OBJECT_UNLOCK (basesink);
2631 }
2632
2633 static gboolean
2634 gst_base_sink_event (GstPad * pad, GstEvent * event)
2635 {
2636   GstBaseSink *basesink;
2637   gboolean result = TRUE;
2638   GstBaseSinkClass *bclass;
2639
2640   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2641
2642   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2643
2644   GST_DEBUG_OBJECT (basesink, "event %p (%s)", event,
2645       GST_EVENT_TYPE_NAME (event));
2646
2647   switch (GST_EVENT_TYPE (event)) {
2648     case GST_EVENT_EOS:
2649     {
2650       GstFlowReturn ret;
2651
2652       GST_PAD_PREROLL_LOCK (pad);
2653       if (G_UNLIKELY (basesink->flushing))
2654         goto flushing;
2655
2656       if (G_UNLIKELY (basesink->priv->received_eos)) {
2657         /* we can't accept anything when we are EOS */
2658         result = FALSE;
2659         gst_event_unref (event);
2660       } else {
2661         /* we set the received EOS flag here so that we can use it when testing if
2662          * we are prerolled and to refure more buffers. */
2663         basesink->priv->received_eos = TRUE;
2664
2665         /* EOS is a prerollable object, we call the unlocked version because it
2666          * does not check the received_eos flag. */
2667         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
2668             GST_MINI_OBJECT_CAST (event), TRUE);
2669         if (G_UNLIKELY (ret != GST_FLOW_OK))
2670           result = FALSE;
2671       }
2672       GST_PAD_PREROLL_UNLOCK (pad);
2673       break;
2674     }
2675     case GST_EVENT_NEWSEGMENT:
2676     {
2677       GstFlowReturn ret;
2678
2679       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
2680
2681       GST_PAD_PREROLL_LOCK (pad);
2682       if (G_UNLIKELY (basesink->flushing))
2683         goto flushing;
2684
2685       if (G_UNLIKELY (basesink->priv->received_eos)) {
2686         /* we can't accept anything when we are EOS */
2687         result = FALSE;
2688         gst_event_unref (event);
2689       } else {
2690         /* the new segment is a non prerollable item and does not block anything,
2691          * we need to configure the current clipping segment and insert the event 
2692          * in the queue to serialize it with the buffers for rendering. */
2693         gst_base_sink_configure_segment (basesink, pad, event,
2694             basesink->abidata.ABI.clip_segment);
2695
2696         ret =
2697             gst_base_sink_queue_object_unlocked (basesink, pad,
2698             GST_MINI_OBJECT_CAST (event), FALSE);
2699         if (G_UNLIKELY (ret != GST_FLOW_OK))
2700           result = FALSE;
2701         else {
2702           GST_OBJECT_LOCK (basesink);
2703           basesink->have_newsegment = TRUE;
2704           GST_OBJECT_UNLOCK (basesink);
2705         }
2706       }
2707       GST_PAD_PREROLL_UNLOCK (pad);
2708       break;
2709     }
2710     case GST_EVENT_FLUSH_START:
2711       if (bclass->event)
2712         bclass->event (basesink, event);
2713
2714       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
2715
2716       gst_base_sink_flush_start (basesink, pad);
2717
2718       gst_event_unref (event);
2719       break;
2720     case GST_EVENT_FLUSH_STOP:
2721       if (bclass->event)
2722         bclass->event (basesink, event);
2723
2724       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
2725
2726       gst_base_sink_flush_stop (basesink, pad);
2727
2728       gst_event_unref (event);
2729       break;
2730     default:
2731       /* other events are sent to queue or subclass depending on if they
2732        * are serialized. */
2733       if (GST_EVENT_IS_SERIALIZED (event)) {
2734         gst_base_sink_queue_object (basesink, pad,
2735             GST_MINI_OBJECT_CAST (event), FALSE);
2736       } else {
2737         if (bclass->event)
2738           bclass->event (basesink, event);
2739         gst_event_unref (event);
2740       }
2741       break;
2742   }
2743 done:
2744   gst_object_unref (basesink);
2745
2746   return result;
2747
2748   /* ERRORS */
2749 flushing:
2750   {
2751     GST_DEBUG_OBJECT (basesink, "we are flushing");
2752     GST_PAD_PREROLL_UNLOCK (pad);
2753     result = FALSE;
2754     gst_event_unref (event);
2755     goto done;
2756   }
2757 }
2758
2759 /* default implementation to calculate the start and end
2760  * timestamps on a buffer, subclasses can override
2761  */
2762 static void
2763 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
2764     GstClockTime * start, GstClockTime * end)
2765 {
2766   GstClockTime timestamp, duration;
2767
2768   timestamp = GST_BUFFER_TIMESTAMP (buffer);
2769   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
2770
2771     /* get duration to calculate end time */
2772     duration = GST_BUFFER_DURATION (buffer);
2773     if (GST_CLOCK_TIME_IS_VALID (duration)) {
2774       *end = timestamp + duration;
2775     }
2776     *start = timestamp;
2777   }
2778 }
2779
2780 /* must be called with PREROLL_LOCK */
2781 static gboolean
2782 gst_base_sink_needs_preroll (GstBaseSink * basesink)
2783 {
2784   gboolean is_prerolled, res;
2785
2786   /* we have 2 cases where the PREROLL_LOCK is released:
2787    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
2788    *  2) we are syncing on the clock
2789    */
2790   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
2791   res = !is_prerolled;
2792
2793   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
2794       basesink->have_preroll, basesink->priv->received_eos, res);
2795
2796   return res;
2797 }
2798
2799 /* with STREAM_LOCK, PREROLL_LOCK 
2800  *
2801  * Takes a buffer and compare the timestamps with the last segment.
2802  * If the buffer falls outside of the segment boundaries, drop it.
2803  * Else queue the buffer for preroll and rendering.
2804  *
2805  * This function takes ownership of the buffer.
2806  */
2807 static GstFlowReturn
2808 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
2809     GstBuffer * buf)
2810 {
2811   GstBaseSinkClass *bclass;
2812   GstFlowReturn result;
2813   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
2814   GstSegment *clip_segment;
2815
2816   if (G_UNLIKELY (basesink->flushing))
2817     goto flushing;
2818
2819   if (G_UNLIKELY (basesink->priv->received_eos))
2820     goto was_eos;
2821
2822   /* for code clarity */
2823   clip_segment = basesink->abidata.ABI.clip_segment;
2824
2825   if (G_UNLIKELY (!basesink->have_newsegment)) {
2826     gboolean sync;
2827
2828     sync = gst_base_sink_get_sync (basesink);
2829     if (sync) {
2830       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
2831           (_("Internal data flow problem.")),
2832           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
2833     }
2834
2835     /* this means this sink will assume timestamps start from 0 */
2836     GST_OBJECT_LOCK (basesink);
2837     clip_segment->start = 0;
2838     clip_segment->stop = -1;
2839     basesink->segment.start = 0;
2840     basesink->segment.stop = -1;
2841     basesink->have_newsegment = TRUE;
2842     GST_OBJECT_UNLOCK (basesink);
2843   }
2844
2845   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2846
2847   /* check if the buffer needs to be dropped, we first ask the subclass for the
2848    * start and end */
2849   if (bclass->get_times)
2850     bclass->get_times (basesink, buf, &start, &end);
2851
2852   if (start == -1) {
2853     /* if the subclass does not want sync, we use our own values so that we at
2854      * least clip the buffer to the segment */
2855     gst_base_sink_get_times (basesink, buf, &start, &end);
2856   }
2857
2858   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
2859       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
2860
2861   /* a dropped buffer does not participate in anything */
2862   if (GST_CLOCK_TIME_IS_VALID (start) &&
2863       (clip_segment->format == GST_FORMAT_TIME)) {
2864     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
2865                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
2866       goto out_of_segment;
2867   }
2868
2869   /* now we can process the buffer in the queue, this function takes ownership
2870    * of the buffer */
2871   result = gst_base_sink_queue_object_unlocked (basesink, pad,
2872       GST_MINI_OBJECT_CAST (buf), TRUE);
2873
2874   return result;
2875
2876   /* ERRORS */
2877 flushing:
2878   {
2879     GST_DEBUG_OBJECT (basesink, "sink is flushing");
2880     gst_buffer_unref (buf);
2881     return GST_FLOW_WRONG_STATE;
2882   }
2883 was_eos:
2884   {
2885     GST_DEBUG_OBJECT (basesink,
2886         "we are EOS, dropping object, return UNEXPECTED");
2887     gst_buffer_unref (buf);
2888     return GST_FLOW_UNEXPECTED;
2889   }
2890 out_of_segment:
2891   {
2892     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
2893     gst_buffer_unref (buf);
2894     return GST_FLOW_OK;
2895   }
2896 }
2897
2898 /* with STREAM_LOCK
2899  */
2900 static GstFlowReturn
2901 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
2902 {
2903   GstBaseSink *basesink;
2904   GstFlowReturn result;
2905
2906   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
2907
2908   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
2909     goto wrong_mode;
2910
2911   GST_PAD_PREROLL_LOCK (pad);
2912   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
2913   GST_PAD_PREROLL_UNLOCK (pad);
2914
2915 done:
2916   return result;
2917
2918   /* ERRORS */
2919 wrong_mode:
2920   {
2921     GST_OBJECT_LOCK (pad);
2922     GST_WARNING_OBJECT (basesink,
2923         "Push on pad %s:%s, but it was not activated in push mode",
2924         GST_DEBUG_PAD_NAME (pad));
2925     GST_OBJECT_UNLOCK (pad);
2926     gst_buffer_unref (buf);
2927     /* we don't post an error message this will signal to the peer
2928      * pushing that EOS is reached. */
2929     result = GST_FLOW_UNEXPECTED;
2930     goto done;
2931   }
2932 }
2933
2934 /* perform a seek, only executed in pull mode */
2935 static gboolean
2936 gst_base_sink_perform_seek (GstBaseSink * basesink, GstPad * pad,
2937     GstEvent * event)
2938 {
2939   gboolean flush;
2940   gdouble rate;
2941   GstFormat seek_format;
2942   GstSeekFlags flags;
2943   GstSeekType cur_type, stop_type;
2944   gint64 cur, stop;
2945
2946   if (event) {
2947     GST_DEBUG_OBJECT (basesink, "performing seek with event %p", event);
2948     gst_event_parse_seek (event, &rate, &seek_format, &flags,
2949         &cur_type, &cur, &stop_type, &stop);
2950
2951     flush = flags & GST_SEEK_FLAG_FLUSH;
2952   } else {
2953     GST_DEBUG_OBJECT (basesink, "performing seek without event");
2954     flush = FALSE;
2955   }
2956
2957   if (flush) {
2958     GST_DEBUG_OBJECT (basesink, "flushing upstream");
2959     gst_pad_push_event (pad, gst_event_new_flush_start ());
2960     gst_base_sink_flush_start (basesink, pad);
2961   } else {
2962     GST_DEBUG_OBJECT (basesink, "pausing pulling thread");
2963   }
2964
2965   GST_PAD_STREAM_LOCK (pad);
2966
2967   if (flush) {
2968     GST_DEBUG_OBJECT (basesink, "stop flushing upstream");
2969     gst_pad_push_event (pad, gst_event_new_flush_stop ());
2970     gst_base_sink_flush_stop (basesink, pad);
2971   } else {
2972     GST_DEBUG_OBJECT (basesink, "restarting pulling thread");
2973   }
2974
2975   GST_PAD_STREAM_UNLOCK (pad);
2976
2977   return FALSE;
2978 }
2979
2980 /* with STREAM_LOCK
2981  */
2982 static void
2983 gst_base_sink_loop (GstPad * pad)
2984 {
2985   GstBaseSink *basesink;
2986   GstBuffer *buf = NULL;
2987   GstFlowReturn result;
2988   guint blocksize;
2989   guint64 offset;
2990
2991   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
2992
2993   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
2994
2995   if ((blocksize = basesink->priv->blocksize) == 0)
2996     blocksize = -1;
2997
2998   offset = basesink->segment.last_stop;
2999
3000   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
3001       offset, blocksize);
3002
3003   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
3004   if (G_UNLIKELY (result != GST_FLOW_OK))
3005     goto paused;
3006
3007   if (G_UNLIKELY (buf == NULL))
3008     goto no_buffer;
3009
3010   offset += GST_BUFFER_SIZE (buf);
3011
3012   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
3013
3014   GST_PAD_PREROLL_LOCK (pad);
3015   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
3016   GST_PAD_PREROLL_UNLOCK (pad);
3017   if (G_UNLIKELY (result != GST_FLOW_OK))
3018     goto paused;
3019
3020   return;
3021
3022   /* ERRORS */
3023 paused:
3024   {
3025     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
3026         gst_flow_get_name (result));
3027     gst_pad_pause_task (pad);
3028     /* fatal errors and NOT_LINKED cause EOS */
3029     if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) {
3030       if (result == GST_FLOW_UNEXPECTED) {
3031         /* perform EOS logic */
3032         if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3033           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3034               gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
3035                   basesink->segment.format, basesink->segment.last_stop));
3036         } else {
3037           gst_base_sink_event (pad, gst_event_new_eos ());
3038         }
3039       } else {
3040         /* for fatal errors we post an error message, post the error
3041          * first so the app knows about the error first. */
3042         GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3043             (_("Internal data stream error.")),
3044             ("stream stopped, reason %s", gst_flow_get_name (result)));
3045         gst_base_sink_event (pad, gst_event_new_eos ());
3046       }
3047     }
3048     return;
3049   }
3050 no_buffer:
3051   {
3052     GST_LOG_OBJECT (basesink, "no buffer, pausing");
3053     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3054         (_("Internal data flow error.")), ("element returned NULL buffer"));
3055     result = GST_FLOW_ERROR;
3056     goto paused;
3057   }
3058 }
3059
3060 static gboolean
3061 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
3062     gboolean flushing)
3063 {
3064   GstBaseSinkClass *bclass;
3065
3066   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3067
3068   if (flushing) {
3069     /* unlock any subclasses, we need to do this before grabbing the
3070      * PREROLL_LOCK since we hold this lock before going into ::render. */
3071     if (bclass->unlock)
3072       bclass->unlock (basesink);
3073   }
3074
3075   GST_PAD_PREROLL_LOCK (pad);
3076   basesink->flushing = flushing;
3077   if (flushing) {
3078     /* step 1, now that we have the PREROLL lock, clear our unlock request */
3079     if (bclass->unlock_stop)
3080       bclass->unlock_stop (basesink);
3081
3082     /* set need_preroll before we unblock the clock. If the clock is unblocked
3083      * before timing out, we can reuse the buffer for preroll. */
3084     basesink->need_preroll = TRUE;
3085
3086     /* step 2, unblock clock sync (if any) or any other blocking thing */
3087     if (basesink->clock_id) {
3088       gst_clock_id_unschedule (basesink->clock_id);
3089     }
3090
3091     /* flush out the data thread if it's locked in finish_preroll, this will
3092      * also flush out the EOS state */
3093     GST_DEBUG_OBJECT (basesink,
3094         "flushing out data thread, need preroll to TRUE");
3095     gst_base_sink_preroll_queue_flush (basesink, pad);
3096   }
3097   GST_PAD_PREROLL_UNLOCK (pad);
3098
3099   return TRUE;
3100 }
3101
3102 static gboolean
3103 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
3104 {
3105   gboolean result;
3106
3107   if (active) {
3108     /* start task */
3109     result = gst_pad_start_task (basesink->sinkpad,
3110         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
3111   } else {
3112     /* step 2, make sure streaming finishes */
3113     result = gst_pad_stop_task (basesink->sinkpad);
3114   }
3115
3116   return result;
3117 }
3118
3119 static gboolean
3120 gst_base_sink_pad_activate (GstPad * pad)
3121 {
3122   gboolean result = FALSE;
3123   GstBaseSink *basesink;
3124
3125   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3126
3127   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
3128
3129   gst_base_sink_set_flushing (basesink, pad, FALSE);
3130
3131   /* we need to have the pull mode enabled */
3132   if (!basesink->can_activate_pull) {
3133     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
3134     goto fallback;
3135   }
3136
3137   /* check if downstreams supports pull mode at all */
3138   if (!gst_pad_check_pull_range (pad)) {
3139     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
3140     goto fallback;
3141   }
3142
3143   /* set the pad mode before starting the task so that it's in the
3144    * correct state for the new thread. also the sink set_caps and get_caps
3145    * function checks this */
3146   basesink->pad_mode = GST_ACTIVATE_PULL;
3147
3148   /* we first try to negotiate a format so that when we try to activate
3149    * downstream, it knows about our format */
3150   if (!gst_base_sink_negotiate_pull (basesink)) {
3151     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
3152     goto fallback;
3153   }
3154
3155   /* ok activate now */
3156   if (!gst_pad_activate_pull (pad, TRUE)) {
3157     /* clear any pending caps */
3158     GST_OBJECT_LOCK (basesink);
3159     gst_caps_replace (&basesink->priv->pull_caps, NULL);
3160     GST_OBJECT_UNLOCK (basesink);
3161     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
3162     goto fallback;
3163   }
3164
3165   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
3166   result = TRUE;
3167   goto done;
3168
3169   /* push mode fallback */
3170 fallback:
3171   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
3172   if ((result = gst_pad_activate_push (pad, TRUE))) {
3173     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
3174   }
3175
3176 done:
3177   if (!result) {
3178     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
3179     gst_base_sink_set_flushing (basesink, pad, TRUE);
3180   }
3181
3182   gst_object_unref (basesink);
3183
3184   return result;
3185 }
3186
3187 static gboolean
3188 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
3189 {
3190   gboolean result;
3191   GstBaseSink *basesink;
3192
3193   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3194
3195   if (active) {
3196     if (!basesink->can_activate_push) {
3197       result = FALSE;
3198       basesink->pad_mode = GST_ACTIVATE_NONE;
3199     } else {
3200       result = TRUE;
3201       basesink->pad_mode = GST_ACTIVATE_PUSH;
3202     }
3203   } else {
3204     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
3205       g_warning ("Internal GStreamer activation error!!!");
3206       result = FALSE;
3207     } else {
3208       gst_base_sink_set_flushing (basesink, pad, TRUE);
3209       result = TRUE;
3210       basesink->pad_mode = GST_ACTIVATE_NONE;
3211     }
3212   }
3213
3214   gst_object_unref (basesink);
3215
3216   return result;
3217 }
3218
3219 static gboolean
3220 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
3221 {
3222   GstCaps *caps;
3223   gboolean result;
3224
3225   result = FALSE;
3226
3227   /* this returns the intersection between our caps and the peer caps. If there
3228    * is no peer, it returns NULL and we can't operate in pull mode so we can
3229    * fail the negotiation. */
3230   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
3231   if (caps == NULL || gst_caps_is_empty (caps))
3232     goto no_caps_possible;
3233
3234   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
3235
3236   caps = gst_caps_make_writable (caps);
3237   /* get the first (prefered) format */
3238   gst_caps_truncate (caps);
3239   /* try to fixate */
3240   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
3241
3242   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
3243
3244   if (gst_caps_is_any (caps)) {
3245     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
3246         "allowing pull()");
3247     /* neither side has template caps in this case, so they are prepared for
3248        pull() without setcaps() */
3249     result = TRUE;
3250   } else if (gst_caps_is_fixed (caps)) {
3251     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
3252       goto could_not_set_caps;
3253
3254     GST_OBJECT_LOCK (basesink);
3255     gst_caps_replace (&basesink->priv->pull_caps, caps);
3256     GST_OBJECT_UNLOCK (basesink);
3257
3258     result = TRUE;
3259   }
3260
3261   gst_caps_unref (caps);
3262
3263   return result;
3264
3265 no_caps_possible:
3266   {
3267     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
3268     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
3269     if (caps)
3270       gst_caps_unref (caps);
3271     return FALSE;
3272   }
3273 could_not_set_caps:
3274   {
3275     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
3276     gst_caps_unref (caps);
3277     return FALSE;
3278   }
3279 }
3280
3281 /* this won't get called until we implement an activate function */
3282 static gboolean
3283 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
3284 {
3285   gboolean result = FALSE;
3286   GstBaseSink *basesink;
3287   GstBaseSinkClass *bclass;
3288
3289   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3290   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3291
3292   if (active) {
3293     GstFormat format;
3294     gint64 duration;
3295
3296     /* we mark we have a newsegment here because pull based
3297      * mode works just fine without having a newsegment before the
3298      * first buffer */
3299     format = GST_FORMAT_BYTES;
3300
3301     gst_segment_init (&basesink->segment, format);
3302     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
3303     GST_OBJECT_LOCK (basesink);
3304     basesink->have_newsegment = TRUE;
3305     GST_OBJECT_UNLOCK (basesink);
3306
3307     /* get the peer duration in bytes */
3308     result = gst_pad_query_peer_duration (pad, &format, &duration);
3309     if (result) {
3310       GST_DEBUG_OBJECT (basesink,
3311           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
3312       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
3313           duration);
3314       gst_segment_set_duration (&basesink->segment, format, duration);
3315     } else {
3316       GST_DEBUG_OBJECT (basesink, "unknown duration");
3317     }
3318
3319     if (bclass->activate_pull)
3320       result = bclass->activate_pull (basesink, TRUE);
3321     else
3322       result = FALSE;
3323
3324     if (!result)
3325       goto activate_failed;
3326
3327     /* but if starting the thread fails, set it back */
3328     if (!result)
3329       basesink->pad_mode = GST_ACTIVATE_NONE;
3330   } else {
3331     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
3332       g_warning ("Internal GStreamer activation error!!!");
3333       result = FALSE;
3334     } else {
3335       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
3336       if (bclass->activate_pull)
3337         result &= bclass->activate_pull (basesink, FALSE);
3338       basesink->pad_mode = GST_ACTIVATE_NONE;
3339       /* clear any pending caps */
3340       GST_OBJECT_LOCK (basesink);
3341       gst_caps_replace (&basesink->priv->pull_caps, NULL);
3342       GST_OBJECT_UNLOCK (basesink);
3343     }
3344   }
3345   gst_object_unref (basesink);
3346
3347   return result;
3348
3349   /* ERRORS */
3350 activate_failed:
3351   {
3352     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
3353     return FALSE;
3354   }
3355 }
3356
3357 /* send an event to our sinkpad peer. */
3358 static gboolean
3359 gst_base_sink_send_event (GstElement * element, GstEvent * event)
3360 {
3361   GstPad *pad;
3362   GstBaseSink *basesink = GST_BASE_SINK (element);
3363   gboolean forward, result = TRUE;
3364   GstActivateMode mode;
3365
3366   GST_OBJECT_LOCK (element);
3367   /* get the pad and the scheduling mode */
3368   pad = gst_object_ref (basesink->sinkpad);
3369   mode = basesink->pad_mode;
3370   GST_OBJECT_UNLOCK (element);
3371
3372   /* only push UPSTREAM events upstream and if we are in push mode */
3373   forward = GST_EVENT_IS_UPSTREAM (event) && (mode == GST_ACTIVATE_PUSH);
3374
3375   switch (GST_EVENT_TYPE (event)) {
3376     case GST_EVENT_LATENCY:
3377     {
3378       GstClockTime latency;
3379
3380       gst_event_parse_latency (event, &latency);
3381
3382       /* store the latency. We use this to adjust the running_time before syncing
3383        * it to the clock. */
3384       GST_OBJECT_LOCK (element);
3385       basesink->priv->latency = latency;
3386       GST_OBJECT_UNLOCK (element);
3387       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
3388           GST_TIME_ARGS (latency));
3389
3390       /* We forward this event so that all elements know about the global pipeline
3391        * latency. This is interesting for an element when it wants to figure out
3392        * when a particular piece of data will be rendered. */
3393       break;
3394     }
3395     case GST_EVENT_SEEK:
3396       /* in pull mode we will execute the seek */
3397       if (mode == GST_ACTIVATE_PULL)
3398         result = gst_base_sink_perform_seek (basesink, pad, event);
3399       break;
3400     default:
3401       break;
3402   }
3403
3404   if (forward) {
3405     result = gst_pad_push_event (pad, event);
3406   } else {
3407     /* not forwarded, unref the event */
3408     gst_event_unref (event);
3409   }
3410
3411   gst_object_unref (pad);
3412   return result;
3413 }
3414
3415 static gboolean
3416 gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query)
3417 {
3418   GstPad *peer;
3419   gboolean res = FALSE;
3420
3421   if ((peer = gst_pad_get_peer (sink->sinkpad))) {
3422     res = gst_pad_query (peer, query);
3423     gst_object_unref (peer);
3424   }
3425   return res;
3426 }
3427
3428 /* get the end position of the last seen object, this is used
3429  * for EOS and for making sure that we don't report a position we
3430  * have not reached yet. */
3431 static gboolean
3432 gst_base_sink_get_position_last (GstBaseSink * basesink, gint64 * cur)
3433 {
3434   /* return last observed stream time */
3435   *cur = basesink->priv->current_sstop;
3436
3437   GST_DEBUG_OBJECT (basesink, "POSITION: %" GST_TIME_FORMAT,
3438       GST_TIME_ARGS (*cur));
3439   return TRUE;
3440 }
3441
3442 /* get the position when we are PAUSED, this is the stream time of the buffer
3443  * that prerolled. If no buffer is prerolled (we are still flushing), this
3444  * value will be -1. */
3445 static gboolean
3446 gst_base_sink_get_position_paused (GstBaseSink * basesink, gint64 * cur)
3447 {
3448   gboolean res;
3449   gint64 time;
3450   GstSegment *segment;
3451
3452   *cur = basesink->priv->current_sstart;
3453   segment = basesink->abidata.ABI.clip_segment;
3454
3455   time = segment->time;
3456
3457   if (*cur != -1) {
3458     *cur = MAX (*cur, time);
3459     GST_DEBUG_OBJECT (basesink, "POSITION as max: %" GST_TIME_FORMAT
3460         ", time %" GST_TIME_FORMAT, GST_TIME_ARGS (*cur), GST_TIME_ARGS (time));
3461   } else {
3462     /* we have no buffer, use the segment times. */
3463     if (segment->rate >= 0.0) {
3464       /* forward, next position is always the time of the segment */
3465       *cur = time;
3466       GST_DEBUG_OBJECT (basesink, "POSITION as time: %" GST_TIME_FORMAT,
3467           GST_TIME_ARGS (*cur));
3468     } else {
3469       /* reverse, next expected timestamp is segment->stop. We use the function
3470        * to get things right for negative applied_rates. */
3471       *cur =
3472           gst_segment_to_stream_time (segment, GST_FORMAT_TIME, segment->stop);
3473       GST_DEBUG_OBJECT (basesink, "reverse POSITION: %" GST_TIME_FORMAT,
3474           GST_TIME_ARGS (*cur));
3475     }
3476   }
3477   res = (*cur != -1);
3478
3479   return res;
3480 }
3481
3482 static gboolean
3483 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
3484     gint64 * cur, gboolean * upstream)
3485 {
3486   GstClock *clock;
3487   gboolean res = FALSE;
3488
3489   switch (format) {
3490       /* we can answer time format */
3491     case GST_FORMAT_TIME:
3492     {
3493       GstClockTime now, base, latency;
3494       gint64 time, accum, duration;
3495       gdouble rate;
3496       gint64 last;
3497
3498       GST_OBJECT_LOCK (basesink);
3499
3500       /* can only give answer based on the clock if not EOS */
3501       if (G_UNLIKELY (basesink->eos))
3502         goto in_eos;
3503
3504       /* we can only get the segment when we are not NULL or READY */
3505       if (!basesink->have_newsegment)
3506         goto wrong_state;
3507
3508       /* when not in PLAYING or when we're busy with a state change, we
3509        * cannot read from the clock so we report time based on the
3510        * last seen timestamp. */
3511       if (GST_STATE (basesink) != GST_STATE_PLAYING ||
3512           GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING)
3513         goto in_pause;
3514
3515       /* we need to sync on the clock. */
3516       if (basesink->sync == FALSE)
3517         goto no_sync;
3518
3519       /* and we need a clock */
3520       if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
3521         goto no_sync;
3522
3523       /* collect all data we need holding the lock */
3524       if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time))
3525         time = basesink->segment.time;
3526       else
3527         time = 0;
3528
3529       if (GST_CLOCK_TIME_IS_VALID (basesink->segment.stop))
3530         duration = basesink->segment.stop - basesink->segment.start;
3531       else
3532         duration = 0;
3533
3534       base = GST_ELEMENT_CAST (basesink)->base_time;
3535       accum = basesink->segment.accum;
3536       rate = basesink->segment.rate * basesink->segment.applied_rate;
3537       gst_base_sink_get_position_last (basesink, &last);
3538       latency = basesink->priv->latency;
3539
3540       gst_object_ref (clock);
3541       /* need to release the object lock before we can get the time, 
3542        * a clock might take the LOCK of the provider, which could be
3543        * a basesink subclass. */
3544       GST_OBJECT_UNLOCK (basesink);
3545
3546       now = gst_clock_get_time (clock);
3547
3548       /* subtract base time and accumulated time from the clock time. 
3549        * Make sure we don't go negative. This is the current time in
3550        * the segment which we need to scale with the combined 
3551        * rate and applied rate. */
3552       base += accum;
3553       base += latency;
3554       base = MIN (now, base);
3555
3556       /* for negative rates we need to count back from from the segment
3557        * duration. */
3558       if (rate < 0.0)
3559         time += duration;
3560
3561       *cur = time + gst_guint64_to_gdouble (now - base) * rate;
3562
3563       /* never report more than last seen position */
3564       if (last != -1)
3565         *cur = MIN (last, *cur);
3566
3567       gst_object_unref (clock);
3568
3569       res = TRUE;
3570
3571       GST_DEBUG_OBJECT (basesink,
3572           "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
3573           GST_TIME_FORMAT " + time %" GST_TIME_FORMAT,
3574           GST_TIME_ARGS (now), GST_TIME_ARGS (base),
3575           GST_TIME_ARGS (accum), GST_TIME_ARGS (time));
3576       break;
3577     }
3578     default:
3579       /* cannot answer other than TIME, ask to send the query upstream. */
3580       *upstream = TRUE;
3581       break;
3582   }
3583
3584 done:
3585   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
3586       res, GST_TIME_ARGS (*cur));
3587   return res;
3588
3589   /* special cases */
3590 in_eos:
3591   {
3592     GST_DEBUG_OBJECT (basesink, "position in EOS");
3593     res = gst_base_sink_get_position_last (basesink, cur);
3594     GST_OBJECT_UNLOCK (basesink);
3595     goto done;
3596   }
3597 in_pause:
3598   {
3599     GST_DEBUG_OBJECT (basesink, "position in PAUSED");
3600     res = gst_base_sink_get_position_paused (basesink, cur);
3601     GST_OBJECT_UNLOCK (basesink);
3602     goto done;
3603   }
3604 wrong_state:
3605   {
3606     /* in NULL or READY we always return FALSE and -1 */
3607     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
3608     res = FALSE;
3609     *cur = -1;
3610     GST_OBJECT_UNLOCK (basesink);
3611     goto done;
3612   }
3613 no_sync:
3614   {
3615     /* report last seen timestamp if any, else ask upstream to answer */
3616     if ((*cur = basesink->priv->current_sstart) != -1)
3617       res = TRUE;
3618     else
3619       *upstream = TRUE;
3620
3621     GST_DEBUG_OBJECT (basesink, "no sync, res %d, POSITION %" GST_TIME_FORMAT,
3622         res, GST_TIME_ARGS (*cur));
3623     GST_OBJECT_UNLOCK (basesink);
3624     return res;
3625   }
3626 }
3627
3628 static gboolean
3629 gst_base_sink_query (GstElement * element, GstQuery * query)
3630 {
3631   gboolean res = FALSE;
3632
3633   GstBaseSink *basesink = GST_BASE_SINK (element);
3634
3635   switch (GST_QUERY_TYPE (query)) {
3636     case GST_QUERY_POSITION:
3637     {
3638       gint64 cur = 0;
3639       GstFormat format;
3640       gboolean upstream = FALSE;
3641
3642       gst_query_parse_position (query, &format, NULL);
3643
3644       GST_DEBUG_OBJECT (basesink, "position format %d", format);
3645
3646       /* first try to get the position based on the clock */
3647       if ((res =
3648               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
3649         gst_query_set_position (query, format, cur);
3650       } else if (upstream) {
3651         /* fallback to peer query */
3652         res = gst_base_sink_peer_query (basesink, query);
3653       }
3654       break;
3655     }
3656     case GST_QUERY_DURATION:
3657     {
3658       GstFormat format, uformat;
3659       gint64 duration, uduration;
3660
3661       gst_query_parse_duration (query, &format, NULL);
3662
3663       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
3664           gst_format_get_name (format));
3665
3666       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
3667         uformat = GST_FORMAT_BYTES;
3668
3669         /* get the duration in bytes, in pull mode that's all we are sure to
3670          * know. */
3671         res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
3672             &uduration);
3673         if (res && format != uformat) {
3674           /* convert to the requested format */
3675           res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
3676               &format, &duration);
3677         } else {
3678           duration = uduration;
3679         }
3680         if (res) {
3681           /* set the result */
3682           gst_query_set_duration (query, format, duration);
3683         }
3684       } else {
3685         /* in push mode we simply forward upstream */
3686         res = gst_base_sink_peer_query (basesink, query);
3687       }
3688       break;
3689     }
3690     case GST_QUERY_LATENCY:
3691     {
3692       gboolean live, us_live;
3693       GstClockTime min, max;
3694
3695       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
3696                   &max))) {
3697         gst_query_set_latency (query, live, min, max);
3698       }
3699       break;
3700     }
3701     case GST_QUERY_JITTER:
3702       break;
3703     case GST_QUERY_RATE:
3704       /* gst_query_set_rate (query, basesink->segment_rate); */
3705       res = TRUE;
3706       break;
3707     case GST_QUERY_SEGMENT:
3708     {
3709       /* FIXME, bring start/stop to stream time */
3710       gst_query_set_segment (query, basesink->segment.rate,
3711           GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
3712       break;
3713     }
3714     case GST_QUERY_SEEKING:
3715     case GST_QUERY_CONVERT:
3716     case GST_QUERY_FORMATS:
3717     default:
3718       res = gst_base_sink_peer_query (basesink, query);
3719       break;
3720   }
3721   return res;
3722 }
3723
3724 static GstStateChangeReturn
3725 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
3726 {
3727   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3728   GstBaseSink *basesink = GST_BASE_SINK (element);
3729   GstBaseSinkClass *bclass;
3730   GstBaseSinkPrivate *priv;
3731
3732   priv = basesink->priv;
3733
3734   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3735
3736   switch (transition) {
3737     case GST_STATE_CHANGE_NULL_TO_READY:
3738       if (bclass->start)
3739         if (!bclass->start (basesink))
3740           goto start_failed;
3741       break;
3742     case GST_STATE_CHANGE_READY_TO_PAUSED:
3743       /* need to complete preroll before this state change completes, there
3744        * is no data flow in READY so we can safely assume we need to preroll. */
3745       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
3746       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
3747       basesink->have_newsegment = FALSE;
3748       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3749       gst_segment_init (basesink->abidata.ABI.clip_segment,
3750           GST_FORMAT_UNDEFINED);
3751       basesink->offset = 0;
3752       basesink->have_preroll = FALSE;
3753       basesink->need_preroll = TRUE;
3754       basesink->playing_async = TRUE;
3755       priv->current_sstart = -1;
3756       priv->current_sstop = -1;
3757       priv->eos_rtime = -1;
3758       priv->latency = 0;
3759       basesink->eos = FALSE;
3760       priv->received_eos = FALSE;
3761       gst_base_sink_reset_qos (basesink);
3762       priv->commited = FALSE;
3763       if (priv->async_enabled) {
3764         GST_DEBUG_OBJECT (basesink, "doing async state change");
3765         /* when async enabled, post async-start message and return ASYNC from
3766          * the state change function */
3767         ret = GST_STATE_CHANGE_ASYNC;
3768         gst_element_post_message (GST_ELEMENT_CAST (basesink),
3769             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
3770       } else {
3771         priv->have_latency = TRUE;
3772       }
3773       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
3774       break;
3775     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3776       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
3777       if (!gst_base_sink_needs_preroll (basesink)) {
3778         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
3779         /* no preroll needed anymore now. */
3780         basesink->playing_async = FALSE;
3781         basesink->need_preroll = FALSE;
3782         if (basesink->eos) {
3783           /* need to post EOS message here */
3784           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
3785           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3786               gst_message_new_eos (GST_OBJECT_CAST (basesink)));
3787         } else {
3788           GST_DEBUG_OBJECT (basesink, "signal preroll");
3789           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
3790         }
3791       } else {
3792         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
3793         basesink->need_preroll = TRUE;
3794         basesink->playing_async = TRUE;
3795         priv->commited = FALSE;
3796         if (priv->async_enabled) {
3797           GST_DEBUG_OBJECT (basesink, "doing async state change");
3798           ret = GST_STATE_CHANGE_ASYNC;
3799           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3800               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
3801         }
3802       }
3803       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
3804       break;
3805     default:
3806       break;
3807   }
3808
3809   {
3810     GstStateChangeReturn bret;
3811
3812     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3813     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
3814       goto activate_failed;
3815   }
3816
3817   switch (transition) {
3818     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3819       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
3820       /* FIXME, make sure we cannot enter _render first */
3821
3822       /* we need to call ::unlock before locking PREROLL_LOCK
3823        * since we lock it before going into ::render */
3824       if (bclass->unlock)
3825         bclass->unlock (basesink);
3826
3827       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
3828       /* now that we have the PREROLL lock, clear our unlock request */
3829       if (bclass->unlock_stop)
3830         bclass->unlock_stop (basesink);
3831
3832       /* we need preroll again and we set the flag before unlocking the clockid
3833        * because if the clockid is unlocked before a current buffer expired, we
3834        * can use that buffer to preroll with */
3835       basesink->need_preroll = TRUE;
3836
3837       if (basesink->clock_id) {
3838         gst_clock_id_unschedule (basesink->clock_id);
3839       }
3840
3841       /* if we don't have a preroll buffer we need to wait for a preroll and
3842        * return ASYNC. */
3843       if (!gst_base_sink_needs_preroll (basesink)) {
3844         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
3845         basesink->playing_async = FALSE;
3846       } else {
3847         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
3848           ret = GST_STATE_CHANGE_SUCCESS;
3849         } else {
3850           GST_DEBUG_OBJECT (basesink,
3851               "PLAYING to PAUSED, we are not prerolled");
3852           basesink->playing_async = TRUE;
3853           priv->commited = FALSE;
3854           if (priv->async_enabled) {
3855             GST_DEBUG_OBJECT (basesink, "doing async state change");
3856             ret = GST_STATE_CHANGE_ASYNC;
3857             gst_element_post_message (GST_ELEMENT_CAST (basesink),
3858                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
3859                     FALSE));
3860           }
3861         }
3862       }
3863       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
3864           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
3865
3866       gst_base_sink_reset_qos (basesink);
3867       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
3868       break;
3869     case GST_STATE_CHANGE_PAUSED_TO_READY:
3870       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
3871       /* start by reseting our position state with the object lock so that the
3872        * position query gets the right idea. We do this before we post the
3873        * messages so that the message handlers pick this up. */
3874       GST_OBJECT_LOCK (basesink);
3875       basesink->have_newsegment = FALSE;
3876       priv->current_sstart = -1;
3877       priv->current_sstop = -1;
3878       priv->have_latency = FALSE;
3879       GST_OBJECT_UNLOCK (basesink);
3880
3881       gst_base_sink_set_last_buffer (basesink, NULL);
3882
3883       if (!priv->commited) {
3884         if (priv->async_enabled) {
3885           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
3886
3887           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3888               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
3889                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
3890
3891           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3892               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
3893         }
3894         priv->commited = TRUE;
3895       } else {
3896         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
3897       }
3898       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
3899       break;
3900     case GST_STATE_CHANGE_READY_TO_NULL:
3901       if (bclass->stop) {
3902         if (!bclass->stop (basesink)) {
3903           GST_WARNING_OBJECT (basesink, "failed to stop");
3904         }
3905       }
3906       gst_base_sink_set_last_buffer (basesink, NULL);
3907       break;
3908     default:
3909       break;
3910   }
3911
3912   return ret;
3913
3914   /* ERRORS */
3915 start_failed:
3916   {
3917     GST_DEBUG_OBJECT (basesink, "failed to start");
3918     return GST_STATE_CHANGE_FAILURE;
3919   }
3920 activate_failed:
3921   {
3922     GST_DEBUG_OBJECT (basesink,
3923         "element failed to change states -- activation problem?");
3924     return GST_STATE_CHANGE_FAILURE;
3925   }
3926 }