basesink; handle EOS correctly.
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSource
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * <programlisting>
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *   
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * </programlisting>
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSink::preroll vmethod with this preroll buffer and will then commit
59  * the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from ::get_times. If this function returns
63  * #GST_CLOCK_TIME_NONE for the start time, no synchronisation will be done.
64  * Synchronisation can be disabled entirely by setting the object "sync"
65  * property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSink::render will be called.
68  * Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the ::render method
71  * are supported as well. These classes typically receive a buffer in the render
72  * method and can then potentially block on the clock while rendering. A typical
73  * example is an audiosink. Since 0.10.11 these subclasses can use
74  * gst_base_sink_wait_preroll() to perform the blocking wait.
75  *
76  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
77  * for the clock to reach the time indicated by the stop time of the last
78  * ::get_times call before posting an EOS message. When the element receives
79  * EOS in PAUSED, preroll completes, the event is queued and an EOS message is
80  * posted when going to PLAYING.
81  *
82  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
83  * synchronisation and clipping of buffers. Buffers that fall completely outside
84  * of the current segment are dropped. Buffers that fall partially in the
85  * segment are rendered (and prerolled). Subclasses should do any subbuffer
86  * clipping themselves when needed.
87  *
88  * #GstBaseSink will by default report the current playback position in
89  * #GST_FORMAT_TIME based on the current clock time and segment information.
90  * If no clock has been set on the element, the query will be forwarded
91  * upstream.
92  *
93  * The ::set_caps function will be called when the subclass should configure
94  * itself to process a specific media type.
95  *
96  * The ::start and ::stop virtual methods will be called when resources should
97  * be allocated. Any ::preroll, ::render  and ::set_caps function will be
98  * called between the ::start and ::stop calls.
99  *
100  * The ::event virtual method will be called when an event is received by
101  * #GstBaseSink. Normally this method should only be overriden by very specific
102  * elements (such as file sinks) which need to handle the newsegment event
103  * specially.
104  *
105  * #GstBaseSink provides an overridable ::buffer_alloc function that can be
106  * used by sinks that want to do reverse negotiation or to provide
107  * custom buffers (hardware buffers for example) to upstream elements.
108  *
109  * The ::unlock method is called when the elements should unblock any blocking
110  * operations they perform in the ::render method. This is mostly useful when
111  * the ::render method performs a blocking write on a file descriptor, for
112  * example.
113  *
114  * The max-lateness property affects how the sink deals with buffers that
115  * arrive too late in the sink. A buffer arrives too late in the sink when
116  * the presentation time (as a combination of the last segment, buffer
117  * timestamp and element base_time) plus the duration is before the current
118  * time of the clock.
119  * If the frame is later than max-lateness, the sink will drop the buffer
120  * without calling the render method.
121  * This feature is disabled if sync is disabled, the ::get-times method does
122  * not return a valid start time or max-lateness is set to -1 (the default).
123  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
124  * max-lateness value.
125  *
126  * The qos property will enable the quality-of-service features of the basesink
127  * which gather statistics about the real-time performance of the clock
128  * synchronisation. For each buffer received in the sink, statistics are
129  * gathered and a QOS event is sent upstream with these numbers. This
130  * information can then be used by upstream elements to reduce their processing
131  * rate, for example.
132  *
133  * Since 0.10.15 the async property can be used to instruct the sink to never
134  * perform an ASYNC state change. This feature is mostly usable when dealing
135  * with non-synchronized streams or sparse streams.
136  *
137  * Last reviewed on 2007-08-29 (0.10.15)
138  */
139
140 #ifdef HAVE_CONFIG_H
141 #  include "config.h"
142 #endif
143
144 #include "gstbasesink.h"
145 #include <gst/gstmarshal.h>
146 #include <gst/gst_private.h>
147 #include <gst/gst-i18n-lib.h>
148
149 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
150 #define GST_CAT_DEFAULT gst_base_sink_debug
151
152 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
153    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
154
155 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
156
157 typedef struct
158 {
159   gboolean valid;               /* if this info is valid */
160   guint32 seqnum;               /* the seqnum of the STEP event */
161   GstFormat format;             /* the format of the amount */
162   guint64 amount;               /* the total amount of data to skip */
163   guint64 position;             /* the position in the stepped data */
164   guint64 duration;             /* the duration in time of the skipped data */
165   guint64 start;                /* running_time of the start */
166   gdouble rate;                 /* rate of skipping */
167   gboolean flush;               /* if this was a flushing step */
168   gboolean intermediate;        /* if this is an intermediate step */
169   gboolean need_preroll;        /* if we need preroll after this step */
170 } GstStepInfo;
171
172 /* FIXME, some stuff in ABI.data and other in Private...
173  * Make up your mind please.
174  */
175 struct _GstBaseSinkPrivate
176 {
177   gint qos_enabled;             /* ATOMIC */
178   gboolean async_enabled;
179   GstClockTimeDiff ts_offset;
180   GstClockTime render_delay;
181
182   /* start, stop of current buffer, stream time, used to report position */
183   GstClockTime current_sstart;
184   GstClockTime current_sstop;
185
186   /* start, stop and jitter of current buffer, running time */
187   GstClockTime current_rstart;
188   GstClockTime current_rstop;
189   GstClockTimeDiff current_jitter;
190
191   /* EOS sync time in running time */
192   GstClockTime eos_rtime;
193
194   /* last buffer that arrived in time, running time */
195   GstClockTime last_in_time;
196   /* when the last buffer left the sink, running time */
197   GstClockTime last_left;
198
199   /* running averages go here these are done on running time */
200   GstClockTime avg_pt;
201   GstClockTime avg_duration;
202   gdouble avg_rate;
203
204   /* these are done on system time. avg_jitter and avg_render are
205    * compared to eachother to see if the rendering time takes a
206    * huge amount of the processing, If so we are flooded with
207    * buffers. */
208   GstClockTime last_left_systime;
209   GstClockTime avg_jitter;
210   GstClockTime start, stop;
211   GstClockTime avg_render;
212
213   /* number of rendered and dropped frames */
214   guint64 rendered;
215   guint64 dropped;
216
217   /* latency stuff */
218   GstClockTime latency;
219
220   /* if we already commited the state */
221   gboolean commited;
222
223   /* when we received EOS */
224   gboolean received_eos;
225
226   /* when we are prerolled and able to report latency */
227   gboolean have_latency;
228
229   /* the last buffer we prerolled or rendered. Useful for making snapshots */
230   GstBuffer *last_buffer;
231
232   /* caps for pull based scheduling */
233   GstCaps *pull_caps;
234
235   /* blocksize for pulling */
236   guint blocksize;
237
238   gboolean discont;
239
240   /* seqnum of the stream */
241   guint32 seqnum;
242
243   gboolean call_preroll;
244   gboolean step_unlock;
245
246   /* we have a pending and a current step operation */
247   GstStepInfo current_step;
248   GstStepInfo pending_step;
249 };
250
251 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
252
253 /* generic running average, this has a neutral window size */
254 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
255
256 /* the windows for these running averages are experimentally obtained.
257  * possitive values get averaged more while negative values use a small
258  * window so we can react faster to badness. */
259 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
260 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
261
262 /* BaseSink properties */
263
264 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
265 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
266
267 #define DEFAULT_PREROLL_QUEUE_LEN       0
268 #define DEFAULT_SYNC                    TRUE
269 #define DEFAULT_MAX_LATENESS            -1
270 #define DEFAULT_QOS                     FALSE
271 #define DEFAULT_ASYNC                   TRUE
272 #define DEFAULT_TS_OFFSET               0
273 #define DEFAULT_BLOCKSIZE               4096
274 #define DEFAULT_RENDER_DELAY            0
275
276 enum
277 {
278   PROP_0,
279   PROP_PREROLL_QUEUE_LEN,
280   PROP_SYNC,
281   PROP_MAX_LATENESS,
282   PROP_QOS,
283   PROP_ASYNC,
284   PROP_TS_OFFSET,
285   PROP_LAST_BUFFER,
286   PROP_BLOCKSIZE,
287   PROP_RENDER_DELAY,
288   PROP_LAST
289 };
290
291 static GstElementClass *parent_class = NULL;
292
293 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
294 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
295 static void gst_base_sink_finalize (GObject * object);
296
297 GType
298 gst_base_sink_get_type (void)
299 {
300   static volatile gsize base_sink_type = 0;
301
302   if (g_once_init_enter (&base_sink_type)) {
303     GType _type;
304     static const GTypeInfo base_sink_info = {
305       sizeof (GstBaseSinkClass),
306       NULL,
307       NULL,
308       (GClassInitFunc) gst_base_sink_class_init,
309       NULL,
310       NULL,
311       sizeof (GstBaseSink),
312       0,
313       (GInstanceInitFunc) gst_base_sink_init,
314     };
315
316     _type = g_type_register_static (GST_TYPE_ELEMENT,
317         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
318     g_once_init_leave (&base_sink_type, _type);
319   }
320   return base_sink_type;
321 }
322
323 static void gst_base_sink_set_property (GObject * object, guint prop_id,
324     const GValue * value, GParamSpec * pspec);
325 static void gst_base_sink_get_property (GObject * object, guint prop_id,
326     GValue * value, GParamSpec * pspec);
327
328 static gboolean gst_base_sink_send_event (GstElement * element,
329     GstEvent * event);
330 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
331
332 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
333 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
334 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
335     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
336 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
337     GstClockTime * start, GstClockTime * end);
338 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
339     GstPad * pad, gboolean flushing);
340 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
341     gboolean active);
342 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
343     GstSegment * segment);
344 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
345     GstEvent * event, GstSegment * segment);
346
347 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
348     GstStateChange transition);
349
350 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
351 static void gst_base_sink_loop (GstPad * pad);
352 static gboolean gst_base_sink_pad_activate (GstPad * pad);
353 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
354 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
355 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
356 static gboolean gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query);
357
358 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
359
360 /* check if an object was too late */
361 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
362     GstMiniObject * obj, GstClockTime start, GstClockTime stop,
363     GstClockReturn status, GstClockTimeDiff jitter);
364 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
365     GstMiniObject * obj);
366
367 static void
368 gst_base_sink_class_init (GstBaseSinkClass * klass)
369 {
370   GObjectClass *gobject_class;
371   GstElementClass *gstelement_class;
372
373   gobject_class = G_OBJECT_CLASS (klass);
374   gstelement_class = GST_ELEMENT_CLASS (klass);
375
376   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
377       "basesink element");
378
379   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
380
381   parent_class = g_type_class_peek_parent (klass);
382
383   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_sink_finalize);
384   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_sink_set_property);
385   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_sink_get_property);
386
387   /* FIXME, this next value should be configured using an event from the
388    * upstream element, ie, the BUFFER_SIZE event. */
389   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
390       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
391           "Number of buffers to queue during preroll", 0, G_MAXUINT,
392           DEFAULT_PREROLL_QUEUE_LEN,
393           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
394
395   g_object_class_install_property (gobject_class, PROP_SYNC,
396       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
397           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
398
399   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
400       g_param_spec_int64 ("max-lateness", "Max Lateness",
401           "Maximum number of nanoseconds that a buffer can be late before it "
402           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
403           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404
405   g_object_class_install_property (gobject_class, PROP_QOS,
406       g_param_spec_boolean ("qos", "Qos",
407           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
408           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
409   /**
410    * GstBaseSink:async
411    *
412    * If set to #TRUE, the basesink will perform asynchronous state changes.
413    * When set to #FALSE, the sink will not signal the parent when it prerolls.
414    * Use this option when dealing with sparse streams or when synchronisation is
415    * not required.
416    *
417    * Since: 0.10.15
418    */
419   g_object_class_install_property (gobject_class, PROP_ASYNC,
420       g_param_spec_boolean ("async", "Async",
421           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
422           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
423   /**
424    * GstBaseSink:ts-offset
425    *
426    * Controls the final synchronisation, a negative value will render the buffer
427    * earlier while a positive value delays playback. This property can be 
428    * used to fix synchronisation in bad files.
429    *
430    * Since: 0.10.15
431    */
432   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
433       g_param_spec_int64 ("ts-offset", "TS Offset",
434           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
435           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
436   /**
437    * GstBaseSink:last-buffer
438    *
439    * The last buffer that arrived in the sink and was used for preroll or for
440    * rendering. This property can be used to generate thumbnails. This property
441    * can be NULL when the sink has not yet received a bufer.
442    *
443    * Since: 0.10.15
444    */
445   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
446       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
447           "The last buffer received in the sink", GST_TYPE_BUFFER,
448           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
449   /**
450    * GstBaseSink:blocksize
451    *
452    * The amount of bytes to pull when operating in pull mode.
453    *
454    * Since: 0.10.22
455    */
456   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
457       g_param_spec_uint ("blocksize", "Block size",
458           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
459           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
460   /**
461    * GstBaseSink:render-delay
462    *
463    * The additional delay between synchronisation and actual rendering of the
464    * media. This property will add additional latency to the device in order to
465    * make other sinks compensate for the delay.
466    *
467    * Since: 0.10.22
468    */
469   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
470       g_param_spec_uint64 ("render-delay", "Render Delay",
471           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
472           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
473
474   gstelement_class->change_state =
475       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
476   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
477   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
478
479   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
480   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
481   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
482   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
483   klass->activate_pull =
484       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
485 }
486
487 static GstCaps *
488 gst_base_sink_pad_getcaps (GstPad * pad)
489 {
490   GstBaseSinkClass *bclass;
491   GstBaseSink *bsink;
492   GstCaps *caps = NULL;
493
494   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
495   bclass = GST_BASE_SINK_GET_CLASS (bsink);
496
497   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
498     /* if we are operating in pull mode we only accept the negotiated caps */
499     GST_OBJECT_LOCK (pad);
500     if ((caps = GST_PAD_CAPS (pad)))
501       gst_caps_ref (caps);
502     GST_OBJECT_UNLOCK (pad);
503   }
504   if (caps == NULL) {
505     if (bclass->get_caps)
506       caps = bclass->get_caps (bsink);
507
508     if (caps == NULL) {
509       GstPadTemplate *pad_template;
510
511       pad_template =
512           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
513           "sink");
514       if (pad_template != NULL) {
515         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
516       }
517     }
518   }
519   gst_object_unref (bsink);
520
521   return caps;
522 }
523
524 static gboolean
525 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
526 {
527   GstBaseSinkClass *bclass;
528   GstBaseSink *bsink;
529   gboolean res = TRUE;
530
531   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
532   bclass = GST_BASE_SINK_GET_CLASS (bsink);
533
534   if (res && bclass->set_caps)
535     res = bclass->set_caps (bsink, caps);
536
537   gst_object_unref (bsink);
538
539   return res;
540 }
541
542 static void
543 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
544 {
545   GstBaseSinkClass *bclass;
546   GstBaseSink *bsink;
547
548   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
549   bclass = GST_BASE_SINK_GET_CLASS (bsink);
550
551   if (bclass->fixate)
552     bclass->fixate (bsink, caps);
553
554   gst_object_unref (bsink);
555 }
556
557 static GstFlowReturn
558 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
559     GstCaps * caps, GstBuffer ** buf)
560 {
561   GstBaseSinkClass *bclass;
562   GstBaseSink *bsink;
563   GstFlowReturn result = GST_FLOW_OK;
564
565   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
566   bclass = GST_BASE_SINK_GET_CLASS (bsink);
567
568   if (bclass->buffer_alloc)
569     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
570   else
571     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
572
573   gst_object_unref (bsink);
574
575   return result;
576 }
577
578 static void
579 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
580 {
581   GstPadTemplate *pad_template;
582   GstBaseSinkPrivate *priv;
583
584   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
585
586   pad_template =
587       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
588   g_return_if_fail (pad_template != NULL);
589
590   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
591
592   gst_pad_set_getcaps_function (basesink->sinkpad,
593       GST_DEBUG_FUNCPTR (gst_base_sink_pad_getcaps));
594   gst_pad_set_setcaps_function (basesink->sinkpad,
595       GST_DEBUG_FUNCPTR (gst_base_sink_pad_setcaps));
596   gst_pad_set_fixatecaps_function (basesink->sinkpad,
597       GST_DEBUG_FUNCPTR (gst_base_sink_pad_fixate));
598   gst_pad_set_bufferalloc_function (basesink->sinkpad,
599       GST_DEBUG_FUNCPTR (gst_base_sink_pad_buffer_alloc));
600   gst_pad_set_activate_function (basesink->sinkpad,
601       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate));
602   gst_pad_set_activatepush_function (basesink->sinkpad,
603       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_push));
604   gst_pad_set_activatepull_function (basesink->sinkpad,
605       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_pull));
606   gst_pad_set_event_function (basesink->sinkpad,
607       GST_DEBUG_FUNCPTR (gst_base_sink_event));
608   gst_pad_set_chain_function (basesink->sinkpad,
609       GST_DEBUG_FUNCPTR (gst_base_sink_chain));
610   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
611
612   basesink->pad_mode = GST_ACTIVATE_NONE;
613   basesink->preroll_queue = g_queue_new ();
614   basesink->abidata.ABI.clip_segment = gst_segment_new ();
615   priv->have_latency = FALSE;
616
617   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
618   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
619
620   basesink->sync = DEFAULT_SYNC;
621   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
622   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
623   priv->async_enabled = DEFAULT_ASYNC;
624   priv->ts_offset = DEFAULT_TS_OFFSET;
625   priv->render_delay = DEFAULT_RENDER_DELAY;
626   priv->blocksize = DEFAULT_BLOCKSIZE;
627
628   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
629 }
630
631 static void
632 gst_base_sink_finalize (GObject * object)
633 {
634   GstBaseSink *basesink;
635
636   basesink = GST_BASE_SINK (object);
637
638   g_queue_free (basesink->preroll_queue);
639   gst_segment_free (basesink->abidata.ABI.clip_segment);
640
641   G_OBJECT_CLASS (parent_class)->finalize (object);
642 }
643
644 /**
645  * gst_base_sink_set_sync:
646  * @sink: the sink
647  * @sync: the new sync value.
648  *
649  * Configures @sink to synchronize on the clock or not. When
650  * @sync is FALSE, incomming samples will be played as fast as
651  * possible. If @sync is TRUE, the timestamps of the incomming
652  * buffers will be used to schedule the exact render time of its
653  * contents.
654  *
655  * Since: 0.10.4
656  */
657 void
658 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
659 {
660   g_return_if_fail (GST_IS_BASE_SINK (sink));
661
662   GST_OBJECT_LOCK (sink);
663   sink->sync = sync;
664   GST_OBJECT_UNLOCK (sink);
665 }
666
667 /**
668  * gst_base_sink_get_sync:
669  * @sink: the sink
670  *
671  * Checks if @sink is currently configured to synchronize against the
672  * clock.
673  *
674  * Returns: TRUE if the sink is configured to synchronize against the clock.
675  *
676  * Since: 0.10.4
677  */
678 gboolean
679 gst_base_sink_get_sync (GstBaseSink * sink)
680 {
681   gboolean res;
682
683   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
684
685   GST_OBJECT_LOCK (sink);
686   res = sink->sync;
687   GST_OBJECT_UNLOCK (sink);
688
689   return res;
690 }
691
692 /**
693  * gst_base_sink_set_max_lateness:
694  * @sink: the sink
695  * @max_lateness: the new max lateness value.
696  *
697  * Sets the new max lateness value to @max_lateness. This value is
698  * used to decide if a buffer should be dropped or not based on the
699  * buffer timestamp and the current clock time. A value of -1 means
700  * an unlimited time.
701  *
702  * Since: 0.10.4
703  */
704 void
705 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
706 {
707   g_return_if_fail (GST_IS_BASE_SINK (sink));
708
709   GST_OBJECT_LOCK (sink);
710   sink->abidata.ABI.max_lateness = max_lateness;
711   GST_OBJECT_UNLOCK (sink);
712 }
713
714 /**
715  * gst_base_sink_get_max_lateness:
716  * @sink: the sink
717  *
718  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
719  * more details.
720  *
721  * Returns: The maximum time in nanoseconds that a buffer can be late
722  * before it is dropped and not rendered. A value of -1 means an
723  * unlimited time.
724  *
725  * Since: 0.10.4
726  */
727 gint64
728 gst_base_sink_get_max_lateness (GstBaseSink * sink)
729 {
730   gint64 res;
731
732   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
733
734   GST_OBJECT_LOCK (sink);
735   res = sink->abidata.ABI.max_lateness;
736   GST_OBJECT_UNLOCK (sink);
737
738   return res;
739 }
740
741 /**
742  * gst_base_sink_set_qos_enabled:
743  * @sink: the sink
744  * @enabled: the new qos value.
745  *
746  * Configures @sink to send Quality-of-Service events upstream.
747  *
748  * Since: 0.10.5
749  */
750 void
751 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
752 {
753   g_return_if_fail (GST_IS_BASE_SINK (sink));
754
755   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
756 }
757
758 /**
759  * gst_base_sink_is_qos_enabled:
760  * @sink: the sink
761  *
762  * Checks if @sink is currently configured to send Quality-of-Service events
763  * upstream.
764  *
765  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
766  *
767  * Since: 0.10.5
768  */
769 gboolean
770 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
771 {
772   gboolean res;
773
774   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
775
776   res = g_atomic_int_get (&sink->priv->qos_enabled);
777
778   return res;
779 }
780
781 /**
782  * gst_base_sink_set_async_enabled:
783  * @sink: the sink
784  * @enabled: the new async value.
785  *
786  * Configures @sink to perform all state changes asynchronusly. When async is
787  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
788  * preroll buffer. This feature is usefull if the sink does not synchronize
789  * against the clock or when it is dealing with sparse streams.
790  *
791  * Since: 0.10.15
792  */
793 void
794 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
795 {
796   g_return_if_fail (GST_IS_BASE_SINK (sink));
797
798   GST_PAD_PREROLL_LOCK (sink->sinkpad);
799   sink->priv->async_enabled = enabled;
800   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
801   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
802 }
803
804 /**
805  * gst_base_sink_is_async_enabled:
806  * @sink: the sink
807  *
808  * Checks if @sink is currently configured to perform asynchronous state
809  * changes to PAUSED.
810  *
811  * Returns: TRUE if the sink is configured to perform asynchronous state
812  * changes.
813  *
814  * Since: 0.10.15
815  */
816 gboolean
817 gst_base_sink_is_async_enabled (GstBaseSink * sink)
818 {
819   gboolean res;
820
821   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
822
823   GST_PAD_PREROLL_LOCK (sink->sinkpad);
824   res = sink->priv->async_enabled;
825   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
826
827   return res;
828 }
829
830 /**
831  * gst_base_sink_set_ts_offset:
832  * @sink: the sink
833  * @offset: the new offset
834  *
835  * Adjust the synchronisation of @sink with @offset. A negative value will
836  * render buffers earlier than their timestamp. A positive value will delay
837  * rendering. This function can be used to fix playback of badly timestamped
838  * buffers.
839  *
840  * Since: 0.10.15
841  */
842 void
843 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
844 {
845   g_return_if_fail (GST_IS_BASE_SINK (sink));
846
847   GST_OBJECT_LOCK (sink);
848   sink->priv->ts_offset = offset;
849   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
850   GST_OBJECT_UNLOCK (sink);
851 }
852
853 /**
854  * gst_base_sink_get_ts_offset:
855  * @sink: the sink
856  *
857  * Get the synchronisation offset of @sink.
858  *
859  * Returns: The synchronisation offset.
860  *
861  * Since: 0.10.15
862  */
863 GstClockTimeDiff
864 gst_base_sink_get_ts_offset (GstBaseSink * sink)
865 {
866   GstClockTimeDiff res;
867
868   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
869
870   GST_OBJECT_LOCK (sink);
871   res = sink->priv->ts_offset;
872   GST_OBJECT_UNLOCK (sink);
873
874   return res;
875 }
876
877 /**
878  * gst_base_sink_get_last_buffer:
879  * @sink: the sink
880  *
881  * Get the last buffer that arrived in the sink and was used for preroll or for
882  * rendering. This property can be used to generate thumbnails.
883  *
884  * The #GstCaps on the buffer can be used to determine the type of the buffer.
885  * 
886  * Returns: a #GstBuffer. gst_buffer_unref() after usage. This function returns
887  * NULL when no buffer has arrived in the sink yet or when the sink is not in
888  * PAUSED or PLAYING.
889  *
890  * Since: 0.10.15
891  */
892 GstBuffer *
893 gst_base_sink_get_last_buffer (GstBaseSink * sink)
894 {
895   GstBuffer *res;
896
897   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
898
899   GST_OBJECT_LOCK (sink);
900   if ((res = sink->priv->last_buffer))
901     gst_buffer_ref (res);
902   GST_OBJECT_UNLOCK (sink);
903
904   return res;
905 }
906
907 static void
908 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
909 {
910   GstBuffer *old;
911
912   GST_OBJECT_LOCK (sink);
913   old = sink->priv->last_buffer;
914   if (G_LIKELY (old != buffer)) {
915     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
916     if (G_LIKELY (buffer))
917       gst_buffer_ref (buffer);
918     sink->priv->last_buffer = buffer;
919   } else {
920     old = NULL;
921   }
922   GST_OBJECT_UNLOCK (sink);
923
924   /* avoid unreffing with the lock because cleanup code might want to take the
925    * lock too */
926   if (G_LIKELY (old))
927     gst_buffer_unref (old);
928 }
929
930 /**
931  * gst_base_sink_get_latency:
932  * @sink: the sink
933  *
934  * Get the currently configured latency.
935  *
936  * Returns: The configured latency.
937  *
938  * Since: 0.10.12
939  */
940 GstClockTime
941 gst_base_sink_get_latency (GstBaseSink * sink)
942 {
943   GstClockTime res;
944
945   GST_OBJECT_LOCK (sink);
946   res = sink->priv->latency;
947   GST_OBJECT_UNLOCK (sink);
948
949   return res;
950 }
951
952 /**
953  * gst_base_sink_query_latency:
954  * @sink: the sink
955  * @live: if the sink is live
956  * @upstream_live: if an upstream element is live
957  * @min_latency: the min latency of the upstream elements
958  * @max_latency: the max latency of the upstream elements
959  *
960  * Query the sink for the latency parameters. The latency will be queried from
961  * the upstream elements. @live will be TRUE if @sink is configured to
962  * synchronize against the clock. @upstream_live will be TRUE if an upstream
963  * element is live. 
964  *
965  * If both @live and @upstream_live are TRUE, the sink will want to compensate
966  * for the latency introduced by the upstream elements by setting the
967  * @min_latency to a strictly possitive value.
968  *
969  * This function is mostly used by subclasses. 
970  *
971  * Returns: TRUE if the query succeeded.
972  *
973  * Since: 0.10.12
974  */
975 gboolean
976 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
977     gboolean * upstream_live, GstClockTime * min_latency,
978     GstClockTime * max_latency)
979 {
980   gboolean l, us_live, res, have_latency;
981   GstClockTime min, max, render_delay;
982   GstQuery *query;
983   GstClockTime us_min, us_max;
984
985   /* we are live when we sync to the clock */
986   GST_OBJECT_LOCK (sink);
987   l = sink->sync;
988   have_latency = sink->priv->have_latency;
989   render_delay = sink->priv->render_delay;
990   GST_OBJECT_UNLOCK (sink);
991
992   /* assume no latency */
993   min = 0;
994   max = -1;
995   us_live = FALSE;
996
997   if (have_latency) {
998     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
999     /* we are ready for a latency query this is when we preroll or when we are
1000      * not async. */
1001     query = gst_query_new_latency ();
1002
1003     /* ask the peer for the latency */
1004     if ((res = gst_base_sink_peer_query (sink, query))) {
1005       /* get upstream min and max latency */
1006       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1007
1008       if (us_live) {
1009         /* upstream live, use its latency, subclasses should use these
1010          * values to create the complete latency. */
1011         min = us_min;
1012         max = us_max;
1013       }
1014       if (l) {
1015         /* we need to add the render delay if we are live */
1016         if (min != -1)
1017           min += render_delay;
1018         if (max != -1)
1019           max += render_delay;
1020       }
1021     }
1022     gst_query_unref (query);
1023   } else {
1024     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1025     res = FALSE;
1026   }
1027
1028   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1029   if (!res) {
1030     if (!l) {
1031       res = TRUE;
1032       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1033     } else {
1034       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1035     }
1036   }
1037
1038   if (res) {
1039     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1040         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1041         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1042
1043     if (live)
1044       *live = l;
1045     if (upstream_live)
1046       *upstream_live = us_live;
1047     if (min_latency)
1048       *min_latency = min;
1049     if (max_latency)
1050       *max_latency = max;
1051   }
1052   return res;
1053 }
1054
1055 /**
1056  * gst_base_sink_set_render_delay:
1057  * @sink: a #GstBaseSink
1058  * @delay: the new delay
1059  *
1060  * Set the render delay in @sink to @delay. The render delay is the time 
1061  * between actual rendering of a buffer and its synchronisation time. Some
1062  * devices might delay media rendering which can be compensated for with this
1063  * function. 
1064  *
1065  * After calling this function, this sink will report additional latency and
1066  * other sinks will adjust their latency to delay the rendering of their media.
1067  *
1068  * This function is usually called by subclasses.
1069  *
1070  * Since: 0.10.21
1071  */
1072 void
1073 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1074 {
1075   GstClockTime old_render_delay;
1076
1077   g_return_if_fail (GST_IS_BASE_SINK (sink));
1078
1079   GST_OBJECT_LOCK (sink);
1080   old_render_delay = sink->priv->render_delay;
1081   sink->priv->render_delay = delay;
1082   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1083       GST_TIME_ARGS (delay));
1084   GST_OBJECT_UNLOCK (sink);
1085
1086   if (delay != old_render_delay) {
1087     GST_DEBUG_OBJECT (sink, "posting latency changed");
1088     gst_element_post_message (GST_ELEMENT_CAST (sink),
1089         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1090   }
1091 }
1092
1093 /**
1094  * gst_base_sink_get_render_delay:
1095  * @sink: a #GstBaseSink
1096  *
1097  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1098  * information about the render delay.
1099  *
1100  * Returns: the render delay of @sink.
1101  *
1102  * Since: 0.10.21
1103  */
1104 GstClockTime
1105 gst_base_sink_get_render_delay (GstBaseSink * sink)
1106 {
1107   GstClockTimeDiff res;
1108
1109   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1110
1111   GST_OBJECT_LOCK (sink);
1112   res = sink->priv->render_delay;
1113   GST_OBJECT_UNLOCK (sink);
1114
1115   return res;
1116 }
1117
1118 /**
1119  * gst_base_sink_set_blocksize:
1120  * @sink: a #GstBaseSink
1121  * @blocksize: the blocksize in bytes
1122  *
1123  * Set the number of bytes that the sink will pull when it is operating in pull
1124  * mode.
1125  *
1126  * Since: 0.10.22
1127  */
1128 void
1129 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1130 {
1131   g_return_if_fail (GST_IS_BASE_SINK (sink));
1132
1133   GST_OBJECT_LOCK (sink);
1134   sink->priv->blocksize = blocksize;
1135   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1136   GST_OBJECT_UNLOCK (sink);
1137 }
1138
1139 /**
1140  * gst_base_sink_get_blocksize:
1141  * @sink: a #GstBaseSink
1142  *
1143  * Get the number of bytes that the sink will pull when it is operating in pull
1144  * mode.
1145  *
1146  * Returns: the number of bytes @sink will pull in pull mode.
1147  *
1148  * Since: 0.10.22
1149  */
1150 guint
1151 gst_base_sink_get_blocksize (GstBaseSink * sink)
1152 {
1153   guint res;
1154
1155   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1156
1157   GST_OBJECT_LOCK (sink);
1158   res = sink->priv->blocksize;
1159   GST_OBJECT_UNLOCK (sink);
1160
1161   return res;
1162 }
1163
1164 static void
1165 gst_base_sink_set_property (GObject * object, guint prop_id,
1166     const GValue * value, GParamSpec * pspec)
1167 {
1168   GstBaseSink *sink = GST_BASE_SINK (object);
1169
1170   switch (prop_id) {
1171     case PROP_PREROLL_QUEUE_LEN:
1172       /* preroll lock necessary to serialize with finish_preroll */
1173       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1174       sink->preroll_queue_max_len = g_value_get_uint (value);
1175       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1176       break;
1177     case PROP_SYNC:
1178       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1179       break;
1180     case PROP_MAX_LATENESS:
1181       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1182       break;
1183     case PROP_QOS:
1184       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1185       break;
1186     case PROP_ASYNC:
1187       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1188       break;
1189     case PROP_TS_OFFSET:
1190       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1191       break;
1192     case PROP_BLOCKSIZE:
1193       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1194       break;
1195     case PROP_RENDER_DELAY:
1196       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1197       break;
1198     default:
1199       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1200       break;
1201   }
1202 }
1203
1204 static void
1205 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1206     GParamSpec * pspec)
1207 {
1208   GstBaseSink *sink = GST_BASE_SINK (object);
1209
1210   switch (prop_id) {
1211     case PROP_PREROLL_QUEUE_LEN:
1212       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1213       g_value_set_uint (value, sink->preroll_queue_max_len);
1214       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1215       break;
1216     case PROP_SYNC:
1217       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1218       break;
1219     case PROP_MAX_LATENESS:
1220       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1221       break;
1222     case PROP_QOS:
1223       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1224       break;
1225     case PROP_ASYNC:
1226       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1227       break;
1228     case PROP_TS_OFFSET:
1229       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1230       break;
1231     case PROP_LAST_BUFFER:
1232       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1233       break;
1234     case PROP_BLOCKSIZE:
1235       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1236       break;
1237     case PROP_RENDER_DELAY:
1238       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1239       break;
1240     default:
1241       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1242       break;
1243   }
1244 }
1245
1246
1247 static GstCaps *
1248 gst_base_sink_get_caps (GstBaseSink * sink)
1249 {
1250   return NULL;
1251 }
1252
1253 static gboolean
1254 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1255 {
1256   return TRUE;
1257 }
1258
1259 static GstFlowReturn
1260 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1261     GstCaps * caps, GstBuffer ** buf)
1262 {
1263   *buf = NULL;
1264   return GST_FLOW_OK;
1265 }
1266
1267 /* with PREROLL_LOCK, STREAM_LOCK */
1268 static void
1269 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1270 {
1271   GstMiniObject *obj;
1272
1273   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1274   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1275     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1276     gst_mini_object_unref (obj);
1277   }
1278   /* we can't have EOS anymore now */
1279   basesink->eos = FALSE;
1280   basesink->priv->received_eos = FALSE;
1281   basesink->have_preroll = FALSE;
1282   basesink->priv->step_unlock = FALSE;
1283   basesink->eos_queued = FALSE;
1284   basesink->preroll_queued = 0;
1285   basesink->buffers_queued = 0;
1286   basesink->events_queued = 0;
1287   /* can't report latency anymore until we preroll again */
1288   if (basesink->priv->async_enabled) {
1289     GST_OBJECT_LOCK (basesink);
1290     basesink->priv->have_latency = FALSE;
1291     GST_OBJECT_UNLOCK (basesink);
1292   }
1293   /* and signal any waiters now */
1294   GST_PAD_PREROLL_SIGNAL (pad);
1295 }
1296
1297 /* with STREAM_LOCK, configures given segment with the event information. */
1298 static void
1299 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1300     GstEvent * event, GstSegment * segment)
1301 {
1302   gboolean update;
1303   gdouble rate, arate;
1304   GstFormat format;
1305   gint64 start;
1306   gint64 stop;
1307   gint64 time;
1308
1309   /* the newsegment event is needed to bring the buffer timestamps to the
1310    * stream time and to drop samples outside of the playback segment. */
1311   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1312       &start, &stop, &time);
1313
1314   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1315    * We protect with the OBJECT_LOCK so that we can use the values to
1316    * safely answer a POSITION query. */
1317   GST_OBJECT_LOCK (basesink);
1318   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1319       stop, time);
1320
1321   if (format == GST_FORMAT_TIME) {
1322     GST_DEBUG_OBJECT (basesink,
1323         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1324         "format GST_FORMAT_TIME, "
1325         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1326         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1327         update, rate, arate, GST_TIME_ARGS (segment->start),
1328         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1329         GST_TIME_ARGS (segment->accum));
1330   } else {
1331     GST_DEBUG_OBJECT (basesink,
1332         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1333         "format %d, "
1334         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1335         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1336         segment->format, segment->start, segment->stop, segment->time,
1337         segment->accum);
1338   }
1339   GST_OBJECT_UNLOCK (basesink);
1340 }
1341
1342 /* with PREROLL_LOCK, STREAM_LOCK */
1343 static gboolean
1344 gst_base_sink_commit_state (GstBaseSink * basesink)
1345 {
1346   /* commit state and proceed to next pending state */
1347   GstState current, next, pending, post_pending;
1348   gboolean post_paused = FALSE;
1349   gboolean post_async_done = FALSE;
1350   gboolean post_playing = FALSE;
1351
1352   /* we are certainly not playing async anymore now */
1353   basesink->playing_async = FALSE;
1354
1355   GST_OBJECT_LOCK (basesink);
1356   current = GST_STATE (basesink);
1357   next = GST_STATE_NEXT (basesink);
1358   pending = GST_STATE_PENDING (basesink);
1359   post_pending = pending;
1360
1361   switch (pending) {
1362     case GST_STATE_PLAYING:
1363     {
1364       GstBaseSinkClass *bclass;
1365       GstStateChangeReturn ret;
1366
1367       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1368
1369       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1370
1371       basesink->need_preroll = FALSE;
1372       post_async_done = TRUE;
1373       basesink->priv->commited = TRUE;
1374       post_playing = TRUE;
1375       /* post PAUSED too when we were READY */
1376       if (current == GST_STATE_READY) {
1377         post_paused = TRUE;
1378       }
1379
1380       /* make sure we notify the subclass of async playing */
1381       if (bclass->async_play) {
1382         GST_WARNING_OBJECT (basesink, "deprecated async_play");
1383         ret = bclass->async_play (basesink);
1384         if (ret == GST_STATE_CHANGE_FAILURE)
1385           goto async_failed;
1386       }
1387       break;
1388     }
1389     case GST_STATE_PAUSED:
1390       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1391       post_paused = TRUE;
1392       post_async_done = TRUE;
1393       basesink->priv->commited = TRUE;
1394       post_pending = GST_STATE_VOID_PENDING;
1395       break;
1396     case GST_STATE_READY:
1397     case GST_STATE_NULL:
1398       goto stopping;
1399     case GST_STATE_VOID_PENDING:
1400       goto nothing_pending;
1401     default:
1402       break;
1403   }
1404
1405   /* we can report latency queries now */
1406   basesink->priv->have_latency = TRUE;
1407
1408   GST_STATE (basesink) = pending;
1409   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1410   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1411   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1412   GST_OBJECT_UNLOCK (basesink);
1413
1414   if (post_paused) {
1415     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1416     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1417         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1418             current, next, post_pending));
1419   }
1420   if (post_async_done) {
1421     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1422     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1423         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1424   }
1425   if (post_playing) {
1426     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1427     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1428         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1429             next, pending, GST_STATE_VOID_PENDING));
1430   }
1431
1432   GST_STATE_BROADCAST (basesink);
1433
1434   return TRUE;
1435
1436 nothing_pending:
1437   {
1438     /* Depending on the state, set our vars. We get in this situation when the
1439      * state change function got a change to update the state vars before the
1440      * streaming thread did. This is fine but we need to make sure that we
1441      * update the need_preroll var since it was TRUE when we got here and might
1442      * become FALSE if we got to PLAYING. */
1443     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1444         gst_element_state_get_name (current));
1445     switch (current) {
1446       case GST_STATE_PLAYING:
1447         basesink->need_preroll = FALSE;
1448         break;
1449       case GST_STATE_PAUSED:
1450         basesink->need_preroll = TRUE;
1451         break;
1452       default:
1453         basesink->need_preroll = FALSE;
1454         basesink->flushing = TRUE;
1455         break;
1456     }
1457     /* we can report latency queries now */
1458     basesink->priv->have_latency = TRUE;
1459     GST_OBJECT_UNLOCK (basesink);
1460     return TRUE;
1461   }
1462 stopping:
1463   {
1464     /* app is going to READY */
1465     GST_DEBUG_OBJECT (basesink, "stopping");
1466     basesink->need_preroll = FALSE;
1467     basesink->flushing = TRUE;
1468     GST_OBJECT_UNLOCK (basesink);
1469     return FALSE;
1470   }
1471 async_failed:
1472   {
1473     GST_DEBUG_OBJECT (basesink, "async commit failed");
1474     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1475     GST_OBJECT_UNLOCK (basesink);
1476     return FALSE;
1477   }
1478 }
1479
1480 static void
1481 start_stepping (GstBaseSink * sink, GstSegment * segment,
1482     GstStepInfo * pending, GstStepInfo * current)
1483 {
1484   GST_DEBUG_OBJECT (sink, "update pending step");
1485   memcpy (current, pending, sizeof (GstStepInfo));
1486   pending->valid = FALSE;
1487
1488   /* get the running time of where we paused and remember it */
1489   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1490
1491   /* set the new rate */
1492   segment->rate = segment->rate * current->rate;
1493
1494   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1495       GST_TIME_ARGS (current->start));
1496
1497   if (current->amount == -1) {
1498     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1499     current->valid = FALSE;
1500   } else {
1501     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1502         "rate: %f", current->amount, gst_format_get_name (current->format),
1503         current->rate);
1504   }
1505 }
1506
1507 static void
1508 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1509     GstStepInfo * current, guint64 cstart, guint64 cstop, gint64 * rstart,
1510     gint64 * rstop)
1511 {
1512   GstMessage *message;
1513
1514   GST_DEBUG_OBJECT (sink, "step complete");
1515
1516   /* update the segment, discarding what was consumed, running time goes
1517    * backwards with the duration of the data we skipped. FIXME, this only works
1518    * in PAUSED. */
1519   if (segment->rate > 0.0) {
1520     GST_DEBUG_OBJECT (sink,
1521         "step stop at running_time %" GST_TIME_FORMAT ", timestamp %"
1522         GST_TIME_FORMAT, GST_TIME_ARGS (*rstart), GST_TIME_ARGS (cstart));
1523     if (*rstart == -1)
1524       *rstart = current->start + current->position;
1525
1526     current->duration = *rstart - current->start;
1527
1528     segment->time =
1529         gst_segment_to_stream_time (segment, segment->format, cstart);
1530     segment->start = cstart;
1531
1532     *rstart = current->start;
1533     if (*rstop != -1)
1534       *rstop -= current->duration;
1535   } else {
1536     GST_DEBUG_OBJECT (sink,
1537         "step stop at running_time %" GST_TIME_FORMAT ", timestamp %"
1538         GST_TIME_FORMAT, GST_TIME_ARGS (*rstop), GST_TIME_ARGS (cstop));
1539     if (*rstop == -1)
1540       *rstop = current->start + current->position;
1541
1542     current->duration = *rstop - current->start;
1543
1544     segment->stop = cstop;
1545     *rstop = current->start;
1546     if (*rstart != -1)
1547       *rstart -= current->duration;
1548   }
1549   segment->accum = current->start;
1550
1551   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1552       GST_TIME_ARGS (current->duration));
1553
1554   /* the clip segment is used for position report in paused... */
1555   memcpy (sink->abidata.ABI.clip_segment, segment, sizeof (GstSegment));
1556
1557   message =
1558       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1559       current->amount, current->rate, current->flush, current->intermediate,
1560       current->duration);
1561   gst_message_set_seqnum (message, current->seqnum);
1562   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1563
1564   if (!current->intermediate)
1565     sink->need_preroll = current->need_preroll;
1566
1567   /* and the current step info finished and becomes invalid */
1568   current->valid = FALSE;
1569 }
1570
1571 static gboolean
1572 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1573     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1574     gint64 * rstop)
1575 {
1576   GstBaseSinkPrivate *priv;
1577   gboolean step_end = FALSE;
1578
1579   priv = sink->priv;
1580
1581   /* see if we need to skip this buffer because of stepping */
1582   switch (current->format) {
1583     case GST_FORMAT_TIME:
1584     {
1585       guint64 end;
1586       gint64 first, last;
1587
1588       if (segment->rate > 0.0) {
1589         first = *rstart;
1590         last = *rstop;
1591       } else {
1592         first = *rstop;
1593         last = *rstart;
1594       }
1595
1596       end = current->start + current->amount;
1597       current->position = first - current->start;
1598
1599       GST_DEBUG_OBJECT (sink,
1600           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1601           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1602       GST_DEBUG_OBJECT (sink,
1603           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1604           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1605           GST_TIME_ARGS (last - current->start),
1606           GST_TIME_ARGS (current->amount));
1607
1608       if (current->position >= current->amount || last >= end) {
1609         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1610         step_end = TRUE;
1611         if (segment->rate > 0.0) {
1612           *cstart += end - *rstart;
1613           *rstart = end;
1614         } else {
1615           *cstop += *rstop - end;
1616           *rstop = end;
1617         }
1618       }
1619       GST_DEBUG_OBJECT (sink,
1620           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1621           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1622       GST_DEBUG_OBJECT (sink,
1623           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1624           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1625       break;
1626     }
1627     case GST_FORMAT_BUFFERS:
1628       GST_DEBUG_OBJECT (sink,
1629           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1630           current->position, current->amount);
1631
1632       if (current->position < current->amount) {
1633         current->position++;
1634       } else {
1635         step_end = TRUE;
1636       }
1637       break;
1638     case GST_FORMAT_DEFAULT:
1639     default:
1640       GST_DEBUG_OBJECT (sink,
1641           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1642           current->position, current->amount);
1643       break;
1644   }
1645   return step_end;
1646 }
1647
1648 /* with STREAM_LOCK, PREROLL_LOCK
1649  *
1650  * Returns TRUE if the object needs synchronisation and takes therefore
1651  * part in prerolling.
1652  *
1653  * rsstart/rsstop contain the start/stop in stream time.
1654  * rrstart/rrstop contain the start/stop in running time.
1655  */
1656 static gboolean
1657 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1658     GstClockTime * rsstart, GstClockTime * rsstop,
1659     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1660     gboolean * stepped, GstSegment * segment, GstStepInfo * step)
1661 {
1662   GstBaseSinkClass *bclass;
1663   GstBuffer *buffer;
1664   GstClockTime start, stop;     /* raw start/stop timestamps */
1665   gint64 cstart, cstop;         /* clipped raw timestamps */
1666   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1667   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1668   GstFormat format;
1669   GstBaseSinkPrivate *priv;
1670   gboolean step_end;
1671
1672   priv = basesink->priv;
1673
1674   /* start with nothing */
1675   start = stop = -1;
1676
1677   step_end = FALSE;
1678
1679   if (G_UNLIKELY (GST_IS_EVENT (obj))) {
1680     GstEvent *event = GST_EVENT_CAST (obj);
1681
1682     switch (GST_EVENT_TYPE (event)) {
1683         /* EOS event needs syncing */
1684       case GST_EVENT_EOS:
1685       {
1686         if (basesink->segment.rate >= 0.0) {
1687           sstart = sstop = priv->current_sstop;
1688           if (sstart == -1) {
1689             /* we have not seen a buffer yet, use the segment values */
1690             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1691                 basesink->segment.format, basesink->segment.stop);
1692           }
1693         } else {
1694           sstart = sstop = priv->current_sstart;
1695           if (sstart == -1) {
1696             /* we have not seen a buffer yet, use the segment values */
1697             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1698                 basesink->segment.format, basesink->segment.start);
1699           }
1700         }
1701
1702         rstart = rstop = priv->eos_rtime;
1703         *do_sync = rstart != -1;
1704         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1705             GST_TIME_ARGS (rstart));
1706         /* if we are stepping, we end now */
1707         step_end = step->valid;
1708         goto eos_done;
1709       }
1710       default:
1711         /* other events do not need syncing */
1712         /* FIXME, maybe NEWSEGMENT might need synchronisation
1713          * since the POSITION query depends on accumulated times and
1714          * we cannot accumulate the current segment before the previous
1715          * one completed.
1716          */
1717         return FALSE;
1718     }
1719   }
1720
1721   /* else do buffer sync code */
1722   buffer = GST_BUFFER_CAST (obj);
1723
1724   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1725
1726   /* just get the times to see if we need syncing, if the start returns -1 we
1727    * don't sync. */
1728   if (bclass->get_times)
1729     bclass->get_times (basesink, buffer, &start, &stop);
1730
1731   if (start == -1) {
1732     /* we don't need to sync but we still want to get the timestamps for
1733      * tracking the position */
1734     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1735     *do_sync = FALSE;
1736   } else {
1737     *do_sync = TRUE;
1738   }
1739
1740   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1741       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1742       GST_TIME_ARGS (stop), *do_sync);
1743
1744   /* collect segment and format for code clarity */
1745   format = segment->format;
1746
1747   /* no timestamp clipping if we did not get a TIME segment format */
1748   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
1749     cstart = start;
1750     cstop = stop;
1751     /* do running and stream time in TIME format */
1752     format = GST_FORMAT_TIME;
1753     GST_LOG_OBJECT (basesink, "not time format, don't clip");
1754     goto do_times;
1755   }
1756
1757   /* clip, only when we know about time */
1758   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
1759               (gint64) start, (gint64) stop, &cstart, &cstop)))
1760     goto out_of_segment;
1761
1762   if (G_UNLIKELY (start != cstart || stop != cstop)) {
1763     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
1764         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
1765         GST_TIME_ARGS (cstop));
1766   }
1767
1768   /* set last stop position */
1769   if (G_LIKELY (cstop != GST_CLOCK_TIME_NONE))
1770     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
1771   else
1772     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
1773
1774 do_times:
1775   rstart = gst_segment_to_running_time (segment, format, cstart);
1776   rstop = gst_segment_to_running_time (segment, format, cstop);
1777
1778   if (G_UNLIKELY (step->valid)) {
1779     if (!(step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
1780                 &rstart, &rstop)))
1781       *stepped = TRUE;
1782   }
1783   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
1784    * upstream is behaving very badly */
1785   sstart = gst_segment_to_stream_time (segment, format, cstart);
1786   sstop = gst_segment_to_stream_time (segment, format, cstop);
1787
1788 eos_done:
1789   /* done label only called when doing EOS, we also stop stepping then */
1790   if (step_end)
1791     stop_stepping (basesink, segment, step, cstart, cstop, &rstart, &rstop);
1792
1793   /* save times */
1794   *rsstart = sstart;
1795   *rsstop = sstop;
1796   *rrstart = rstart;
1797   *rrstop = rstop;
1798
1799   /* buffers and EOS always need syncing and preroll */
1800   return TRUE;
1801
1802   /* special cases */
1803 out_of_segment:
1804   {
1805     /* we usually clip in the chain function already but stepping could cause
1806      * the segment to be updated later. we return FALSE so that we don't try
1807      * to sync on it. */
1808     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
1809     return FALSE;
1810   }
1811 }
1812
1813 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
1814  * adjust a timestamp with the latency and timestamp offset */
1815 static GstClockTime
1816 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
1817 {
1818   GstClockTimeDiff ts_offset;
1819
1820   /* don't do anything funny with invalid timestamps */
1821   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1822     return time;
1823
1824   time += basesink->priv->latency;
1825
1826   /* apply offset, be carefull for underflows */
1827   ts_offset = basesink->priv->ts_offset;
1828   if (ts_offset < 0) {
1829     ts_offset = -ts_offset;
1830     if (ts_offset < time)
1831       time -= ts_offset;
1832     else
1833       time = 0;
1834   } else
1835     time += ts_offset;
1836
1837   return time;
1838 }
1839
1840 /**
1841  * gst_base_sink_wait_clock:
1842  * @sink: the sink
1843  * @time: the running_time to be reached
1844  * @jitter: the jitter to be filled with time diff (can be NULL)
1845  *
1846  * This function will block until @time is reached. It is usually called by
1847  * subclasses that use their own internal synchronisation.
1848  *
1849  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
1850  * returned. Likewise, if synchronisation is disabled in the element or there
1851  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
1852  *
1853  * This function should only be called with the PREROLL_LOCK held, like when
1854  * receiving an EOS event in the ::event vmethod or when receiving a buffer in
1855  * the ::render vmethod.
1856  *
1857  * The @time argument should be the running_time of when this method should
1858  * return and is not adjusted with any latency or offset configured in the
1859  * sink.
1860  *
1861  * Since 0.10.20
1862  *
1863  * Returns: #GstClockReturn
1864  */
1865 GstClockReturn
1866 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
1867     GstClockTimeDiff * jitter)
1868 {
1869   GstClockID id;
1870   GstClockReturn ret;
1871   GstClock *clock;
1872
1873   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1874     goto invalid_time;
1875
1876   GST_OBJECT_LOCK (sink);
1877   if (G_UNLIKELY (!sink->sync))
1878     goto no_sync;
1879
1880   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
1881     goto no_clock;
1882
1883   /* add base_time to running_time to get the time against the clock */
1884   time += GST_ELEMENT_CAST (sink)->base_time;
1885
1886   id = gst_clock_new_single_shot_id (clock, time);
1887   GST_OBJECT_UNLOCK (sink);
1888
1889   /* A blocking wait is performed on the clock. We save the ClockID
1890    * so we can unlock the entry at any time. While we are blocking, we 
1891    * release the PREROLL_LOCK so that other threads can interrupt the
1892    * entry. */
1893   sink->clock_id = id;
1894   /* release the preroll lock while waiting */
1895   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1896
1897   ret = gst_clock_id_wait (id, jitter);
1898
1899   GST_PAD_PREROLL_LOCK (sink->sinkpad);
1900   gst_clock_id_unref (id);
1901   sink->clock_id = NULL;
1902
1903   return ret;
1904
1905   /* no syncing needed */
1906 invalid_time:
1907   {
1908     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
1909     return GST_CLOCK_BADTIME;
1910   }
1911 no_sync:
1912   {
1913     GST_DEBUG_OBJECT (sink, "sync disabled");
1914     GST_OBJECT_UNLOCK (sink);
1915     return GST_CLOCK_BADTIME;
1916   }
1917 no_clock:
1918   {
1919     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
1920     GST_OBJECT_UNLOCK (sink);
1921     return GST_CLOCK_BADTIME;
1922   }
1923 }
1924
1925 /**
1926  * gst_base_sink_wait_preroll:
1927  * @sink: the sink
1928  *
1929  * If the #GstBaseSinkClass::render method performs its own synchronisation against
1930  * the clock it must unblock when going from PLAYING to the PAUSED state and call
1931  * this method before continuing to render the remaining data.
1932  *
1933  * This function will block until a state change to PLAYING happens (in which
1934  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
1935  * to a state change to READY or a FLUSH event (in which case this function
1936  * returns #GST_FLOW_WRONG_STATE).
1937  *
1938  * This function should only be called with the PREROLL_LOCK held, like in the
1939  * render function.
1940  *
1941  * Since: 0.10.11
1942  *
1943  * Returns: #GST_FLOW_OK if the preroll completed and processing can
1944  * continue. Any other return value should be returned from the render vmethod.
1945  */
1946 GstFlowReturn
1947 gst_base_sink_wait_preroll (GstBaseSink * sink)
1948 {
1949   sink->have_preroll = TRUE;
1950   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
1951   /* block until the state changes, or we get a flush, or something */
1952   GST_PAD_PREROLL_WAIT (sink->sinkpad);
1953   sink->have_preroll = FALSE;
1954   if (G_UNLIKELY (sink->flushing))
1955     goto stopping;
1956   if (G_UNLIKELY (sink->priv->step_unlock))
1957     goto step_unlocked;
1958   GST_DEBUG_OBJECT (sink, "continue after preroll");
1959
1960   return GST_FLOW_OK;
1961
1962   /* ERRORS */
1963 stopping:
1964   {
1965     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
1966     return GST_FLOW_WRONG_STATE;
1967   }
1968 step_unlocked:
1969   {
1970     sink->priv->step_unlock = FALSE;
1971     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
1972     return GST_FLOW_STEP;
1973   }
1974 }
1975
1976 /**
1977  * gst_base_sink_do_preroll:
1978  * @sink: the sink
1979  * @obj: the object that caused the preroll
1980  *
1981  * If the @sink spawns its own thread for pulling buffers from upstream it
1982  * should call this method after it has pulled a buffer. If the element needed
1983  * to preroll, this function will perform the preroll and will then block
1984  * until the element state is changed.
1985  *
1986  * This function should be called with the PREROLL_LOCK held.
1987  *
1988  * Since 0.10.22
1989  *
1990  * Returns: #GST_FLOW_OK if the preroll completed and processing can
1991  * continue. Any other return value should be returned from the render vmethod.
1992  */
1993 GstFlowReturn
1994 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
1995 {
1996   GstFlowReturn ret;
1997
1998   while (G_UNLIKELY (sink->need_preroll)) {
1999     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2000
2001     ret = gst_base_sink_preroll_object (sink, obj);
2002     if (ret != GST_FLOW_OK)
2003       goto preroll_failed;
2004
2005     /* need to recheck here because the commit state could have
2006      * made us not need the preroll anymore */
2007     if (G_LIKELY (sink->need_preroll)) {
2008       /* block until the state changes, or we get a flush, or something */
2009       ret = gst_base_sink_wait_preroll (sink);
2010       if (ret != GST_FLOW_OK) {
2011         if (ret == GST_FLOW_STEP)
2012           ret = GST_FLOW_OK;
2013         else
2014           goto preroll_failed;
2015       }
2016     }
2017   }
2018   return GST_FLOW_OK;
2019
2020   /* ERRORS */
2021 preroll_failed:
2022   {
2023     GST_DEBUG_OBJECT (sink, "preroll failed %d", ret);
2024     return ret;
2025   }
2026 }
2027
2028 /**
2029  * gst_base_sink_wait_eos:
2030  * @sink: the sink
2031  * @time: the running_time to be reached
2032  * @jitter: the jitter to be filled with time diff (can be NULL)
2033  *
2034  * This function will block until @time is reached. It is usually called by
2035  * subclasses that use their own internal synchronisation but want to let the
2036  * EOS be handled by the base class.
2037  *
2038  * This function should only be called with the PREROLL_LOCK held, like when
2039  * receiving an EOS event in the ::event vmethod.
2040  *
2041  * The @time argument should be the running_time of when the EOS should happen
2042  * and will be adjusted with any latency and offset configured in the sink.
2043  *
2044  * Since 0.10.15
2045  *
2046  * Returns: #GstFlowReturn
2047  */
2048 GstFlowReturn
2049 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2050     GstClockTimeDiff * jitter)
2051 {
2052   GstClockReturn status;
2053   GstFlowReturn ret;
2054
2055   do {
2056     GstClockTime stime;
2057
2058     GST_DEBUG_OBJECT (sink, "checking preroll");
2059
2060     /* first wait for the playing state before we can continue */
2061     if (G_UNLIKELY (sink->need_preroll)) {
2062       ret = gst_base_sink_wait_preroll (sink);
2063       if (ret != GST_FLOW_OK) {
2064         if (ret == GST_FLOW_STEP)
2065           ret = GST_FLOW_OK;
2066         else
2067           goto flushing;
2068       }
2069     }
2070
2071     /* preroll done, we can sync since we are in PLAYING now. */
2072     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2073         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2074
2075     /* compensate for latency and ts_offset. We don't adjust for render delay
2076      * because we don't interact with the device on EOS normally. */
2077     stime = gst_base_sink_adjust_time (sink, time);
2078
2079     /* wait for the clock, this can be interrupted because we got shut down or 
2080      * we PAUSED. */
2081     status = gst_base_sink_wait_clock (sink, stime, jitter);
2082
2083     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2084
2085     /* invalid time, no clock or sync disabled, just continue then */
2086     if (status == GST_CLOCK_BADTIME)
2087       break;
2088
2089     /* waiting could have been interrupted and we can be flushing now */
2090     if (G_UNLIKELY (sink->flushing))
2091       goto flushing;
2092
2093     /* retry if we got unscheduled, which means we did not reach the timeout
2094      * yet. if some other error occures, we continue. */
2095   } while (status == GST_CLOCK_UNSCHEDULED);
2096
2097   GST_DEBUG_OBJECT (sink, "end of stream");
2098
2099   return GST_FLOW_OK;
2100
2101   /* ERRORS */
2102 flushing:
2103   {
2104     GST_DEBUG_OBJECT (sink, "we are flushing");
2105     return GST_FLOW_WRONG_STATE;
2106   }
2107 }
2108
2109 /* with STREAM_LOCK, PREROLL_LOCK
2110  *
2111  * Make sure we are in PLAYING and synchronize an object to the clock.
2112  *
2113  * If we need preroll, we are not in PLAYING. We try to commit the state
2114  * if needed and then block if we still are not PLAYING.
2115  *
2116  * We start waiting on the clock in PLAYING. If we got interrupted, we
2117  * immediatly try to re-preroll.
2118  *
2119  * Some objects do not need synchronisation (most events) and so this function
2120  * immediatly returns GST_FLOW_OK.
2121  *
2122  * for objects that arrive later than max-lateness to be synchronized to the 
2123  * clock have the @late boolean set to TRUE.
2124  *
2125  * This function keeps a running average of the jitter (the diff between the
2126  * clock time and the requested sync time). The jitter is negative for
2127  * objects that arrive in time and positive for late buffers.
2128  *
2129  * does not take ownership of obj.
2130  */
2131 static GstFlowReturn
2132 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2133     GstMiniObject * obj, gboolean * late)
2134 {
2135   GstClockTimeDiff jitter;
2136   gboolean syncable;
2137   GstClockReturn status = GST_CLOCK_OK;
2138   GstClockTime rstart, rstop, sstart, sstop, stime;
2139   gboolean do_sync;
2140   GstBaseSinkPrivate *priv;
2141   GstFlowReturn ret;
2142   GstStepInfo *current, *pending;
2143   gboolean stepped;
2144
2145   priv = basesink->priv;
2146
2147 do_step:
2148   sstart = sstop = rstart = rstop = -1;
2149   do_sync = TRUE;
2150   stepped = FALSE;
2151
2152   priv->current_rstart = -1;
2153
2154   /* get stepping info */
2155   current = &priv->current_step;
2156   pending = &priv->pending_step;
2157
2158   /* get timing information for this object against the render segment */
2159   syncable = gst_base_sink_get_sync_times (basesink, obj,
2160       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2161       current);
2162
2163   if (G_UNLIKELY (stepped))
2164     goto step_skipped;
2165
2166   /* a syncable object needs to participate in preroll and
2167    * clocking. All buffers and EOS are syncable. */
2168   if (G_UNLIKELY (!syncable))
2169     goto not_syncable;
2170
2171   /* store timing info for current object */
2172   priv->current_rstart = rstart;
2173   priv->current_rstop = (rstop != -1 ? rstop : rstart);
2174
2175   /* save sync time for eos when the previous object needed sync */
2176   priv->eos_rtime = (do_sync ? priv->current_rstop : -1);
2177
2178 again:
2179   /* first do preroll, this makes sure we commit our state
2180    * to PAUSED and can continue to PLAYING. We cannot perform
2181    * any clock sync in PAUSED because there is no clock. */
2182   ret = gst_base_sink_do_preroll (basesink, obj);
2183   if (G_UNLIKELY (ret != GST_FLOW_OK))
2184     goto preroll_failed;
2185
2186   /* After rendering we store the position of the last buffer so that we can use
2187    * it to report the position. We need to take the lock here. */
2188   GST_OBJECT_LOCK (basesink);
2189   priv->current_sstart = sstart;
2190   priv->current_sstop = (sstop != -1 ? sstop : sstart);
2191   GST_OBJECT_UNLOCK (basesink);
2192
2193   /* update the segment with a pending step if the current one is invalid and we
2194    * have a new pending one. We only accept new step updates after a preroll */
2195   if (G_UNLIKELY (pending->valid && !current->valid)) {
2196     start_stepping (basesink, &basesink->segment, pending, current);
2197     goto do_step;
2198   }
2199
2200   if (!do_sync)
2201     goto done;
2202
2203   /* adjust for latency */
2204   stime = gst_base_sink_adjust_time (basesink, rstart);
2205
2206   /* adjust for render-delay, avoid underflows */
2207   if (stime != -1) {
2208     if (stime > priv->render_delay)
2209       stime -= priv->render_delay;
2210     else
2211       stime = 0;
2212   }
2213
2214   /* preroll done, we can sync since we are in PLAYING now. */
2215   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2216       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2217       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2218
2219   /* This function will return immediatly if start == -1, no clock
2220    * or sync is disabled with GST_CLOCK_BADTIME. */
2221   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2222
2223   GST_DEBUG_OBJECT (basesink, "clock returned %d", status);
2224
2225   /* invalid time, no clock or sync disabled, just render */
2226   if (status == GST_CLOCK_BADTIME)
2227     goto done;
2228
2229   /* waiting could have been interrupted and we can be flushing now */
2230   if (G_UNLIKELY (basesink->flushing))
2231     goto flushing;
2232
2233   /* check for unlocked by a state change, we are not flushing so
2234    * we can try to preroll on the current buffer. */
2235   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2236     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2237     priv->call_preroll = TRUE;
2238     goto again;
2239   }
2240
2241   /* successful syncing done, record observation */
2242   priv->current_jitter = jitter;
2243
2244   /* check if the object should be dropped */
2245   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2246       status, jitter);
2247
2248 done:
2249   return GST_FLOW_OK;
2250
2251   /* ERRORS */
2252 step_skipped:
2253   {
2254     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2255     *late = TRUE;
2256     return GST_FLOW_OK;
2257   }
2258 not_syncable:
2259   {
2260     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2261     return GST_FLOW_OK;
2262   }
2263 flushing:
2264   {
2265     GST_DEBUG_OBJECT (basesink, "we are flushing");
2266     return GST_FLOW_WRONG_STATE;
2267   }
2268 preroll_failed:
2269   {
2270     GST_DEBUG_OBJECT (basesink, "preroll failed");
2271     return ret;
2272   }
2273 }
2274
2275 static gboolean
2276 gst_base_sink_send_qos (GstBaseSink * basesink,
2277     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2278 {
2279   GstEvent *event;
2280   gboolean res;
2281
2282   /* generate Quality-of-Service event */
2283   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2284       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2285       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (time));
2286
2287   event = gst_event_new_qos (proportion, diff, time);
2288
2289   /* send upstream */
2290   res = gst_pad_push_event (basesink->sinkpad, event);
2291
2292   return res;
2293 }
2294
2295 static void
2296 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2297 {
2298   GstBaseSinkPrivate *priv;
2299   GstClockTime start, stop;
2300   GstClockTimeDiff jitter;
2301   GstClockTime pt, entered, left;
2302   GstClockTime duration;
2303   gdouble rate;
2304
2305   priv = sink->priv;
2306
2307   start = priv->current_rstart;
2308
2309   /* if Quality-of-Service disabled, do nothing */
2310   if (!g_atomic_int_get (&priv->qos_enabled) || start == -1)
2311     return;
2312
2313   stop = priv->current_rstop;
2314   jitter = priv->current_jitter;
2315
2316   if (jitter < 0) {
2317     /* this is the time the buffer entered the sink */
2318     if (start < -jitter)
2319       entered = 0;
2320     else
2321       entered = start + jitter;
2322     left = start;
2323   } else {
2324     /* this is the time the buffer entered the sink */
2325     entered = start + jitter;
2326     /* this is the time the buffer left the sink */
2327     left = start + jitter;
2328   }
2329
2330   /* calculate duration of the buffer */
2331   if (stop != -1)
2332     duration = stop - start;
2333   else
2334     duration = -1;
2335
2336   /* if we have the time when the last buffer left us, calculate
2337    * processing time */
2338   if (priv->last_left != -1) {
2339     if (entered > priv->last_left) {
2340       pt = entered - priv->last_left;
2341     } else {
2342       pt = 0;
2343     }
2344   } else {
2345     pt = priv->avg_pt;
2346   }
2347
2348   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2349       ", entered %" GST_TIME_FORMAT ", left %" GST_TIME_FORMAT ", pt: %"
2350       GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT ",jitter %"
2351       G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (entered),
2352       GST_TIME_ARGS (left), GST_TIME_ARGS (pt), GST_TIME_ARGS (duration),
2353       jitter);
2354
2355   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2356       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2357       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2358       priv->avg_rate);
2359
2360   /* collect running averages. for first observations, we copy the
2361    * values */
2362   if (priv->avg_duration == -1)
2363     priv->avg_duration = duration;
2364   else
2365     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2366
2367   if (priv->avg_pt == -1)
2368     priv->avg_pt = pt;
2369   else
2370     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2371
2372   if (priv->avg_duration != 0)
2373     rate =
2374         gst_guint64_to_gdouble (priv->avg_pt) /
2375         gst_guint64_to_gdouble (priv->avg_duration);
2376   else
2377     rate = 0.0;
2378
2379   if (priv->last_left != -1) {
2380     if (dropped || priv->avg_rate < 0.0) {
2381       priv->avg_rate = rate;
2382     } else {
2383       if (rate > 1.0)
2384         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2385       else
2386         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2387     }
2388   }
2389
2390   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2391       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2392       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2393       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2394
2395
2396   if (priv->avg_rate >= 0.0) {
2397     /* if we have a valid rate, start sending QoS messages */
2398     if (priv->current_jitter < 0) {
2399       /* make sure we never go below 0 when adding the jitter to the
2400        * timestamp. */
2401       if (priv->current_rstart < -priv->current_jitter)
2402         priv->current_jitter = -priv->current_rstart;
2403     }
2404     gst_base_sink_send_qos (sink, priv->avg_rate, priv->current_rstart,
2405         priv->current_jitter);
2406   }
2407
2408   /* record when this buffer will leave us */
2409   priv->last_left = left;
2410 }
2411
2412 /* reset all qos measuring */
2413 static void
2414 gst_base_sink_reset_qos (GstBaseSink * sink)
2415 {
2416   GstBaseSinkPrivate *priv;
2417
2418   priv = sink->priv;
2419
2420   priv->last_in_time = -1;
2421   priv->last_left = -1;
2422   priv->avg_duration = -1;
2423   priv->avg_pt = -1;
2424   priv->avg_rate = -1.0;
2425   priv->avg_render = -1;
2426   priv->rendered = 0;
2427   priv->dropped = 0;
2428
2429 }
2430
2431 /* Checks if the object was scheduled too late.
2432  *
2433  * start/stop contain the raw timestamp start and stop values
2434  * of the object.
2435  *
2436  * status and jitter contain the return values from the clock wait.
2437  *
2438  * returns TRUE if the buffer was too late.
2439  */
2440 static gboolean
2441 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2442     GstClockTime start, GstClockTime stop,
2443     GstClockReturn status, GstClockTimeDiff jitter)
2444 {
2445   gboolean late;
2446   gint64 max_lateness;
2447   GstBaseSinkPrivate *priv;
2448
2449   priv = basesink->priv;
2450
2451   late = FALSE;
2452
2453   /* only for objects that were too late */
2454   if (G_LIKELY (status != GST_CLOCK_EARLY))
2455     goto in_time;
2456
2457   max_lateness = basesink->abidata.ABI.max_lateness;
2458
2459   /* check if frame dropping is enabled */
2460   if (max_lateness == -1)
2461     goto no_drop;
2462
2463   /* only check for buffers */
2464   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2465     goto not_buffer;
2466
2467   /* can't do check if we don't have a timestamp */
2468   if (G_UNLIKELY (start == -1))
2469     goto no_timestamp;
2470
2471   /* we can add a valid stop time */
2472   if (stop != -1)
2473     max_lateness += stop;
2474   else
2475     max_lateness += start;
2476
2477   /* if the jitter bigger than duration and lateness we are too late */
2478   if ((late = start + jitter > max_lateness)) {
2479     GST_DEBUG_OBJECT (basesink, "buffer is too late %" GST_TIME_FORMAT
2480         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (start + jitter),
2481         GST_TIME_ARGS (max_lateness));
2482     /* !!emergency!!, if we did not receive anything valid for more than a 
2483      * second, render it anyway so the user sees something */
2484     if (priv->last_in_time != -1 && start - priv->last_in_time > GST_SECOND) {
2485       late = FALSE;
2486       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2487           (_("A lot of buffers are dropped.")),
2488           ("Maybe there is a timestamp problem or this computer is too slow."));
2489       GST_DEBUG_OBJECT (basesink,
2490           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2491           GST_TIME_ARGS (priv->last_in_time));
2492     }
2493   }
2494
2495 done:
2496   if (!late) {
2497     priv->last_in_time = start;
2498   }
2499   return late;
2500
2501   /* all is fine */
2502 in_time:
2503   {
2504     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2505     goto done;
2506   }
2507 no_drop:
2508   {
2509     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2510     goto done;
2511   }
2512 not_buffer:
2513   {
2514     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2515     return FALSE;
2516   }
2517 no_timestamp:
2518   {
2519     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2520     return FALSE;
2521   }
2522 }
2523
2524 /* called before and after calling the render vmethod. It keeps track of how
2525  * much time was spent in the render method and is used to check if we are
2526  * flooded */
2527 static void
2528 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2529 {
2530   GstBaseSinkPrivate *priv;
2531
2532   priv = basesink->priv;
2533
2534   if (start) {
2535     priv->start = gst_util_get_timestamp ();
2536   } else {
2537     GstClockTime elapsed;
2538
2539     priv->stop = gst_util_get_timestamp ();
2540
2541     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2542
2543     if (priv->avg_render == -1)
2544       priv->avg_render = elapsed;
2545     else
2546       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2547
2548     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2549         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2550   }
2551 }
2552
2553 /* with STREAM_LOCK, PREROLL_LOCK,
2554  *
2555  * Synchronize the object on the clock and then render it.
2556  *
2557  * takes ownership of obj.
2558  */
2559 static GstFlowReturn
2560 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2561     GstMiniObject * obj)
2562 {
2563   GstFlowReturn ret;
2564   GstBaseSinkClass *bclass;
2565   gboolean late;
2566
2567   GstBaseSinkPrivate *priv;
2568
2569   priv = basesink->priv;
2570
2571 again:
2572   late = FALSE;
2573   ret = GST_FLOW_OK;
2574
2575   /* synchronize this object, non syncable objects return OK
2576    * immediatly. */
2577   ret = gst_base_sink_do_sync (basesink, pad, obj, &late);
2578   if (G_UNLIKELY (ret != GST_FLOW_OK))
2579     goto sync_failed;
2580
2581   /* and now render, event or buffer. */
2582   if (G_LIKELY (GST_IS_BUFFER (obj))) {
2583     GstBuffer *buf;
2584
2585     /* drop late buffers unconditionally, let's hope it's unlikely */
2586     if (G_UNLIKELY (late))
2587       goto dropped;
2588
2589     buf = GST_BUFFER_CAST (obj);
2590
2591     gst_base_sink_set_last_buffer (basesink, buf);
2592
2593     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2594
2595     if (G_LIKELY (bclass->render)) {
2596       gint do_qos;
2597
2598       /* read once, to get same value before and after */
2599       do_qos = g_atomic_int_get (&priv->qos_enabled);
2600
2601       GST_DEBUG_OBJECT (basesink, "rendering buffer %p", obj);
2602
2603       /* record rendering time for QoS and stats */
2604       if (do_qos)
2605         gst_base_sink_do_render_stats (basesink, TRUE);
2606
2607       ret = bclass->render (basesink, buf);
2608
2609       if (do_qos)
2610         gst_base_sink_do_render_stats (basesink, FALSE);
2611
2612       if (ret == GST_FLOW_STEP)
2613         goto again;
2614
2615       priv->rendered++;
2616     }
2617   } else {
2618     GstEvent *event = GST_EVENT_CAST (obj);
2619     gboolean event_res = TRUE;
2620     GstEventType type;
2621
2622     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2623
2624     type = GST_EVENT_TYPE (event);
2625
2626     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
2627         gst_event_type_get_name (type));
2628
2629     if (bclass->event)
2630       event_res = bclass->event (basesink, event);
2631
2632     /* when we get here we could be flushing again when the event handler calls
2633      * _wait_eos(). We have to ignore this object in that case. */
2634     if (G_UNLIKELY (basesink->flushing))
2635       goto flushing;
2636
2637     if (G_LIKELY (event_res)) {
2638       guint32 seqnum;
2639
2640       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
2641       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
2642
2643       switch (type) {
2644         case GST_EVENT_EOS:
2645         {
2646           GstMessage *message;
2647
2648           /* the EOS event is completely handled so we mark
2649            * ourselves as being in the EOS state. eos is also 
2650            * protected by the object lock so we can read it when 
2651            * answering the POSITION query. */
2652           GST_OBJECT_LOCK (basesink);
2653           basesink->eos = TRUE;
2654           GST_OBJECT_UNLOCK (basesink);
2655
2656           /* ok, now we can post the message */
2657           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
2658
2659           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
2660           gst_message_set_seqnum (message, seqnum);
2661           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
2662           break;
2663         }
2664         case GST_EVENT_NEWSEGMENT:
2665           /* configure the segment */
2666           gst_base_sink_configure_segment (basesink, pad, event,
2667               &basesink->segment);
2668           break;
2669         default:
2670           break;
2671       }
2672     }
2673   }
2674
2675 done:
2676   gst_base_sink_perform_qos (basesink, late);
2677
2678   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
2679   gst_mini_object_unref (obj);
2680
2681   return ret;
2682
2683   /* ERRORS */
2684 sync_failed:
2685   {
2686     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
2687     goto done;
2688   }
2689 dropped:
2690   {
2691     priv->dropped++;
2692     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
2693     goto done;
2694   }
2695 flushing:
2696   {
2697     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
2698     gst_mini_object_unref (obj);
2699     return GST_FLOW_WRONG_STATE;
2700   }
2701 }
2702
2703 /* with STREAM_LOCK, PREROLL_LOCK
2704  *
2705  * Perform preroll on the given object. For buffers this means 
2706  * calling the preroll subclass method. 
2707  * If that succeeds, the state will be commited.
2708  *
2709  * function does not take ownership of obj.
2710  */
2711 static GstFlowReturn
2712 gst_base_sink_preroll_object (GstBaseSink * basesink, GstMiniObject * obj)
2713 {
2714   GstFlowReturn ret;
2715
2716   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
2717
2718   /* if it's a buffer, we need to call the preroll method */
2719   if (G_LIKELY (GST_IS_BUFFER (obj)) && basesink->priv->call_preroll) {
2720     GstBaseSinkClass *bclass;
2721     GstBuffer *buf;
2722     GstClockTime timestamp;
2723
2724     buf = GST_BUFFER_CAST (obj);
2725     timestamp = GST_BUFFER_TIMESTAMP (buf);
2726
2727     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
2728         GST_TIME_ARGS (timestamp));
2729
2730     gst_base_sink_set_last_buffer (basesink, buf);
2731
2732     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2733     if (bclass->preroll)
2734       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
2735         goto preroll_failed;
2736
2737     basesink->priv->call_preroll = FALSE;
2738   }
2739
2740   /* commit state */
2741   if (G_LIKELY (basesink->playing_async)) {
2742     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
2743       goto stopping;
2744   }
2745
2746   return GST_FLOW_OK;
2747
2748   /* ERRORS */
2749 preroll_failed:
2750   {
2751     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
2752     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
2753     return ret;
2754   }
2755 stopping:
2756   {
2757     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
2758     return GST_FLOW_WRONG_STATE;
2759   }
2760 }
2761
2762 /* with STREAM_LOCK, PREROLL_LOCK 
2763  *
2764  * Queue an object for rendering.
2765  * The first prerollable object queued will complete the preroll. If the
2766  * preroll queue if filled, we render all the objects in the queue.
2767  *
2768  * This function takes ownership of the object.
2769  */
2770 static GstFlowReturn
2771 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
2772     GstMiniObject * obj, gboolean prerollable)
2773 {
2774   GstFlowReturn ret = GST_FLOW_OK;
2775   gint length;
2776   GQueue *q;
2777
2778   if (G_UNLIKELY (basesink->need_preroll)) {
2779     if (G_LIKELY (prerollable))
2780       basesink->preroll_queued++;
2781
2782     length = basesink->preroll_queued;
2783
2784     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
2785
2786     /* first prerollable item needs to finish the preroll */
2787     if (length == 1) {
2788       ret = gst_base_sink_preroll_object (basesink, obj);
2789       if (G_UNLIKELY (ret != GST_FLOW_OK))
2790         goto preroll_failed;
2791     }
2792     /* need to recheck if we need preroll, commmit state during preroll 
2793      * could have made us not need more preroll. */
2794     if (G_UNLIKELY (basesink->need_preroll)) {
2795       /* see if we can render now, if we can't add the object to the preroll
2796        * queue. */
2797       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
2798         goto more_preroll;
2799     }
2800   }
2801
2802   /* we can start rendering (or blocking) the queued object
2803    * if any. */
2804   q = basesink->preroll_queue;
2805   while (G_UNLIKELY (!g_queue_is_empty (q))) {
2806     GstMiniObject *o;
2807
2808     o = g_queue_pop_head (q);
2809     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
2810
2811     /* do something with the return value */
2812     ret = gst_base_sink_render_object (basesink, pad, o);
2813     if (ret != GST_FLOW_OK)
2814       goto dequeue_failed;
2815   }
2816
2817   /* now render the object */
2818   ret = gst_base_sink_render_object (basesink, pad, obj);
2819   basesink->preroll_queued = 0;
2820
2821   return ret;
2822
2823   /* special cases */
2824 preroll_failed:
2825   {
2826     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
2827         gst_flow_get_name (ret));
2828     gst_mini_object_unref (obj);
2829     return ret;
2830   }
2831 more_preroll:
2832   {
2833     /* add object to the queue and return */
2834     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
2835         length, basesink->preroll_queue_max_len);
2836     g_queue_push_tail (basesink->preroll_queue, obj);
2837     return GST_FLOW_OK;
2838   }
2839 dequeue_failed:
2840   {
2841     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
2842         gst_flow_get_name (ret));
2843     gst_mini_object_unref (obj);
2844     return ret;
2845   }
2846 }
2847
2848 /* with STREAM_LOCK
2849  *
2850  * This function grabs the PREROLL_LOCK and adds the object to
2851  * the queue.
2852  *
2853  * This function takes ownership of obj.
2854  */
2855 static GstFlowReturn
2856 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
2857     GstMiniObject * obj, gboolean prerollable)
2858 {
2859   GstFlowReturn ret;
2860
2861   GST_PAD_PREROLL_LOCK (pad);
2862   if (G_UNLIKELY (basesink->flushing))
2863     goto flushing;
2864
2865   if (G_UNLIKELY (basesink->priv->received_eos))
2866     goto was_eos;
2867
2868   ret = gst_base_sink_queue_object_unlocked (basesink, pad, obj, prerollable);
2869   GST_PAD_PREROLL_UNLOCK (pad);
2870
2871   return ret;
2872
2873   /* ERRORS */
2874 flushing:
2875   {
2876     GST_DEBUG_OBJECT (basesink, "sink is flushing");
2877     GST_PAD_PREROLL_UNLOCK (pad);
2878     gst_mini_object_unref (obj);
2879     return GST_FLOW_WRONG_STATE;
2880   }
2881 was_eos:
2882   {
2883     GST_DEBUG_OBJECT (basesink,
2884         "we are EOS, dropping object, return UNEXPECTED");
2885     GST_PAD_PREROLL_UNLOCK (pad);
2886     gst_mini_object_unref (obj);
2887     return GST_FLOW_UNEXPECTED;
2888   }
2889 }
2890
2891 static void
2892 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
2893 {
2894   /* make sure we are not blocked on the clock also clear any pending
2895    * eos state. */
2896   gst_base_sink_set_flushing (basesink, pad, TRUE);
2897
2898   /* we grab the stream lock but that is not needed since setting the
2899    * sink to flushing would make sure no state commit is being done
2900    * anymore */
2901   GST_PAD_STREAM_LOCK (pad);
2902   gst_base_sink_reset_qos (basesink);
2903   if (basesink->priv->async_enabled) {
2904     /* and we need to commit our state again on the next
2905      * prerolled buffer */
2906     basesink->playing_async = TRUE;
2907     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
2908   } else {
2909     basesink->priv->have_latency = TRUE;
2910     basesink->need_preroll = FALSE;
2911   }
2912   gst_base_sink_set_last_buffer (basesink, NULL);
2913   GST_PAD_STREAM_UNLOCK (pad);
2914 }
2915
2916 static void
2917 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
2918 {
2919   /* unset flushing so we can accept new data, this also flushes out any EOS
2920    * event. */
2921   gst_base_sink_set_flushing (basesink, pad, FALSE);
2922
2923   /* for position reporting */
2924   GST_OBJECT_LOCK (basesink);
2925   basesink->priv->current_sstart = -1;
2926   basesink->priv->current_sstop = -1;
2927   basesink->priv->eos_rtime = -1;
2928   basesink->priv->call_preroll = TRUE;
2929   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
2930     /* we need new segment info after the flush. */
2931     basesink->have_newsegment = FALSE;
2932     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
2933     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
2934   }
2935   GST_OBJECT_UNLOCK (basesink);
2936 }
2937
2938 static gboolean
2939 gst_base_sink_event (GstPad * pad, GstEvent * event)
2940 {
2941   GstBaseSink *basesink;
2942   gboolean result = TRUE;
2943   GstBaseSinkClass *bclass;
2944
2945   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2946
2947   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2948
2949   GST_DEBUG_OBJECT (basesink, "event %p (%s)", event,
2950       GST_EVENT_TYPE_NAME (event));
2951
2952   switch (GST_EVENT_TYPE (event)) {
2953     case GST_EVENT_EOS:
2954     {
2955       GstFlowReturn ret;
2956
2957       GST_PAD_PREROLL_LOCK (pad);
2958       if (G_UNLIKELY (basesink->flushing))
2959         goto flushing;
2960
2961       if (G_UNLIKELY (basesink->priv->received_eos)) {
2962         /* we can't accept anything when we are EOS */
2963         result = FALSE;
2964         gst_event_unref (event);
2965       } else {
2966         /* we set the received EOS flag here so that we can use it when testing if
2967          * we are prerolled and to refuse more buffers. */
2968         basesink->priv->received_eos = TRUE;
2969
2970         /* EOS is a prerollable object, we call the unlocked version because it
2971          * does not check the received_eos flag. */
2972         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
2973             GST_MINI_OBJECT_CAST (event), TRUE);
2974         if (G_UNLIKELY (ret != GST_FLOW_OK))
2975           result = FALSE;
2976       }
2977       GST_PAD_PREROLL_UNLOCK (pad);
2978       break;
2979     }
2980     case GST_EVENT_NEWSEGMENT:
2981     {
2982       GstFlowReturn ret;
2983
2984       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
2985
2986       GST_PAD_PREROLL_LOCK (pad);
2987       if (G_UNLIKELY (basesink->flushing))
2988         goto flushing;
2989
2990       if (G_UNLIKELY (basesink->priv->received_eos)) {
2991         /* we can't accept anything when we are EOS */
2992         result = FALSE;
2993         gst_event_unref (event);
2994       } else {
2995         /* the new segment is a non prerollable item and does not block anything,
2996          * we need to configure the current clipping segment and insert the event 
2997          * in the queue to serialize it with the buffers for rendering. */
2998         gst_base_sink_configure_segment (basesink, pad, event,
2999             basesink->abidata.ABI.clip_segment);
3000
3001         ret =
3002             gst_base_sink_queue_object_unlocked (basesink, pad,
3003             GST_MINI_OBJECT_CAST (event), FALSE);
3004         if (G_UNLIKELY (ret != GST_FLOW_OK))
3005           result = FALSE;
3006         else {
3007           GST_OBJECT_LOCK (basesink);
3008           basesink->have_newsegment = TRUE;
3009           GST_OBJECT_UNLOCK (basesink);
3010         }
3011       }
3012       GST_PAD_PREROLL_UNLOCK (pad);
3013       break;
3014     }
3015     case GST_EVENT_FLUSH_START:
3016       if (bclass->event)
3017         bclass->event (basesink, event);
3018
3019       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3020
3021       gst_base_sink_flush_start (basesink, pad);
3022
3023       gst_event_unref (event);
3024       break;
3025     case GST_EVENT_FLUSH_STOP:
3026       if (bclass->event)
3027         bclass->event (basesink, event);
3028
3029       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3030
3031       gst_base_sink_flush_stop (basesink, pad);
3032
3033       gst_event_unref (event);
3034       break;
3035     default:
3036       /* other events are sent to queue or subclass depending on if they
3037        * are serialized. */
3038       if (GST_EVENT_IS_SERIALIZED (event)) {
3039         gst_base_sink_queue_object (basesink, pad,
3040             GST_MINI_OBJECT_CAST (event), FALSE);
3041       } else {
3042         if (bclass->event)
3043           bclass->event (basesink, event);
3044         gst_event_unref (event);
3045       }
3046       break;
3047   }
3048 done:
3049   gst_object_unref (basesink);
3050
3051   return result;
3052
3053   /* ERRORS */
3054 flushing:
3055   {
3056     GST_DEBUG_OBJECT (basesink, "we are flushing");
3057     GST_PAD_PREROLL_UNLOCK (pad);
3058     result = FALSE;
3059     gst_event_unref (event);
3060     goto done;
3061   }
3062 }
3063
3064 /* default implementation to calculate the start and end
3065  * timestamps on a buffer, subclasses can override
3066  */
3067 static void
3068 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3069     GstClockTime * start, GstClockTime * end)
3070 {
3071   GstClockTime timestamp, duration;
3072
3073   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3074   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3075
3076     /* get duration to calculate end time */
3077     duration = GST_BUFFER_DURATION (buffer);
3078     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3079       *end = timestamp + duration;
3080     }
3081     *start = timestamp;
3082   }
3083 }
3084
3085 /* must be called with PREROLL_LOCK */
3086 static gboolean
3087 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3088 {
3089   gboolean is_prerolled, res;
3090
3091   /* we have 2 cases where the PREROLL_LOCK is released:
3092    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3093    *  2) we are syncing on the clock
3094    */
3095   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3096   res = !is_prerolled;
3097
3098   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3099       basesink->have_preroll, basesink->priv->received_eos, res);
3100
3101   return res;
3102 }
3103
3104 /* with STREAM_LOCK, PREROLL_LOCK 
3105  *
3106  * Takes a buffer and compare the timestamps with the last segment.
3107  * If the buffer falls outside of the segment boundaries, drop it.
3108  * Else queue the buffer for preroll and rendering.
3109  *
3110  * This function takes ownership of the buffer.
3111  */
3112 static GstFlowReturn
3113 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3114     GstBuffer * buf)
3115 {
3116   GstBaseSinkClass *bclass;
3117   GstFlowReturn result;
3118   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3119   GstSegment *clip_segment;
3120
3121   if (G_UNLIKELY (basesink->flushing))
3122     goto flushing;
3123
3124   if (G_UNLIKELY (basesink->priv->received_eos))
3125     goto was_eos;
3126
3127   /* for code clarity */
3128   clip_segment = basesink->abidata.ABI.clip_segment;
3129
3130   if (G_UNLIKELY (!basesink->have_newsegment)) {
3131     gboolean sync;
3132
3133     sync = gst_base_sink_get_sync (basesink);
3134     if (sync) {
3135       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3136           (_("Internal data flow problem.")),
3137           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3138     }
3139
3140     /* this means this sink will assume timestamps start from 0 */
3141     GST_OBJECT_LOCK (basesink);
3142     clip_segment->start = 0;
3143     clip_segment->stop = -1;
3144     basesink->segment.start = 0;
3145     basesink->segment.stop = -1;
3146     basesink->have_newsegment = TRUE;
3147     GST_OBJECT_UNLOCK (basesink);
3148   }
3149
3150   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3151
3152   /* check if the buffer needs to be dropped, we first ask the subclass for the
3153    * start and end */
3154   if (bclass->get_times)
3155     bclass->get_times (basesink, buf, &start, &end);
3156
3157   if (start == -1) {
3158     /* if the subclass does not want sync, we use our own values so that we at
3159      * least clip the buffer to the segment */
3160     gst_base_sink_get_times (basesink, buf, &start, &end);
3161   }
3162
3163   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3164       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3165
3166   /* a dropped buffer does not participate in anything */
3167   if (GST_CLOCK_TIME_IS_VALID (start) &&
3168       (clip_segment->format == GST_FORMAT_TIME)) {
3169     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3170                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3171       goto out_of_segment;
3172   }
3173
3174   /* now we can process the buffer in the queue, this function takes ownership
3175    * of the buffer */
3176   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3177       GST_MINI_OBJECT_CAST (buf), TRUE);
3178
3179   return result;
3180
3181   /* ERRORS */
3182 flushing:
3183   {
3184     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3185     gst_buffer_unref (buf);
3186     return GST_FLOW_WRONG_STATE;
3187   }
3188 was_eos:
3189   {
3190     GST_DEBUG_OBJECT (basesink,
3191         "we are EOS, dropping object, return UNEXPECTED");
3192     gst_buffer_unref (buf);
3193     return GST_FLOW_UNEXPECTED;
3194   }
3195 out_of_segment:
3196   {
3197     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3198     gst_buffer_unref (buf);
3199     return GST_FLOW_OK;
3200   }
3201 }
3202
3203 /* with STREAM_LOCK
3204  */
3205 static GstFlowReturn
3206 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3207 {
3208   GstBaseSink *basesink;
3209   GstFlowReturn result;
3210
3211   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3212
3213   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3214     goto wrong_mode;
3215
3216   GST_PAD_PREROLL_LOCK (pad);
3217   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
3218   GST_PAD_PREROLL_UNLOCK (pad);
3219
3220 done:
3221   return result;
3222
3223   /* ERRORS */
3224 wrong_mode:
3225   {
3226     GST_OBJECT_LOCK (pad);
3227     GST_WARNING_OBJECT (basesink,
3228         "Push on pad %s:%s, but it was not activated in push mode",
3229         GST_DEBUG_PAD_NAME (pad));
3230     GST_OBJECT_UNLOCK (pad);
3231     gst_buffer_unref (buf);
3232     /* we don't post an error message this will signal to the peer
3233      * pushing that EOS is reached. */
3234     result = GST_FLOW_UNEXPECTED;
3235     goto done;
3236   }
3237 }
3238
3239 static gboolean
3240 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3241 {
3242   gboolean res = TRUE;
3243
3244   /* update our offset if the start/stop position was updated */
3245   if (segment->format == GST_FORMAT_BYTES) {
3246     segment->time = segment->start;
3247   } else if (segment->start == 0) {
3248     /* seek to start, we can implement a default for this. */
3249     segment->time = 0;
3250   } else {
3251     res = FALSE;
3252     GST_INFO_OBJECT (sink, "Can't do a default seek");
3253   }
3254
3255   return res;
3256 }
3257
3258 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3259
3260 static gboolean
3261 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3262     GstEvent * event, GstSegment * segment)
3263 {
3264   /* By default, we try one of 2 things:
3265    *   - For absolute seek positions, convert the requested position to our 
3266    *     configured processing format and place it in the output segment \
3267    *   - For relative seek positions, convert our current (input) values to the
3268    *     seek format, adjust by the relative seek offset and then convert back to
3269    *     the processing format
3270    */
3271   GstSeekType cur_type, stop_type;
3272   gint64 cur, stop;
3273   GstSeekFlags flags;
3274   GstFormat seek_format, dest_format;
3275   gdouble rate;
3276   gboolean update;
3277   gboolean res = TRUE;
3278
3279   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3280       &cur_type, &cur, &stop_type, &stop);
3281   dest_format = segment->format;
3282
3283   if (seek_format == dest_format) {
3284     gst_segment_set_seek (segment, rate, seek_format, flags,
3285         cur_type, cur, stop_type, stop, &update);
3286     return TRUE;
3287   }
3288
3289   if (cur_type != GST_SEEK_TYPE_NONE) {
3290     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3291     res =
3292         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3293         &cur);
3294     cur_type = GST_SEEK_TYPE_SET;
3295   }
3296
3297   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3298     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3299     res =
3300         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3301         &stop);
3302     stop_type = GST_SEEK_TYPE_SET;
3303   }
3304
3305   /* And finally, configure our output segment in the desired format */
3306   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3307       stop_type, stop, &update);
3308
3309   if (!res)
3310     goto no_format;
3311
3312   return res;
3313
3314 no_format:
3315   {
3316     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3317     return FALSE;
3318   }
3319 }
3320
3321 /* perform a seek, only executed in pull mode */
3322 static gboolean
3323 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3324 {
3325   gboolean flush;
3326   gdouble rate;
3327   GstFormat seek_format, dest_format;
3328   GstSeekFlags flags;
3329   GstSeekType cur_type, stop_type;
3330   gboolean seekseg_configured = FALSE;
3331   gint64 cur, stop;
3332   gboolean update, res = TRUE;
3333   GstSegment seeksegment;
3334
3335   dest_format = sink->segment.format;
3336
3337   if (event) {
3338     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3339     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3340         &cur_type, &cur, &stop_type, &stop);
3341
3342     flush = flags & GST_SEEK_FLAG_FLUSH;
3343   } else {
3344     GST_DEBUG_OBJECT (sink, "performing seek without event");
3345     flush = FALSE;
3346   }
3347
3348   if (flush) {
3349     GST_DEBUG_OBJECT (sink, "flushing upstream");
3350     gst_pad_push_event (pad, gst_event_new_flush_start ());
3351     gst_base_sink_flush_start (sink, pad);
3352   } else {
3353     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3354   }
3355
3356   GST_PAD_STREAM_LOCK (pad);
3357
3358   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3359    * copy the current segment info into the temp segment that we can actually
3360    * attempt the seek with. We only update the real segment if the seek suceeds. */
3361   if (!seekseg_configured) {
3362     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3363
3364     /* now configure the final seek segment */
3365     if (event) {
3366       if (sink->segment.format != seek_format) {
3367         /* OK, here's where we give the subclass a chance to convert the relative
3368          * seek into an absolute one in the processing format. We set up any
3369          * absolute seek above, before taking the stream lock. */
3370         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3371                 &seeksegment)) {
3372           GST_DEBUG_OBJECT (sink,
3373               "Preparing the seek failed after flushing. " "Aborting seek");
3374           res = FALSE;
3375         }
3376       } else {
3377         /* The seek format matches our processing format, no need to ask the
3378          * the subclass to configure the segment. */
3379         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3380             cur_type, cur, stop_type, stop, &update);
3381       }
3382     }
3383     /* Else, no seek event passed, so we're just (re)starting the 
3384        current segment. */
3385   }
3386
3387   if (res) {
3388     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3389         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3390         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3391
3392     /* do the seek, segment.last_stop contains the new position. */
3393     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3394   }
3395
3396
3397   if (flush) {
3398     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3399     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3400     gst_base_sink_flush_stop (sink, pad);
3401   } else if (res && sink->abidata.ABI.running) {
3402     /* we are running the current segment and doing a non-flushing seek, 
3403      * close the segment first based on the last_stop. */
3404     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3405         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3406   }
3407
3408   /* The subclass must have converted the segment to the processing format 
3409    * by now */
3410   if (res && seeksegment.format != dest_format) {
3411     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3412         "in the correct format. Aborting seek.");
3413     res = FALSE;
3414   }
3415
3416   /* if successfull seek, we update our real segment and push
3417    * out the new segment. */
3418   if (res) {
3419     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3420
3421     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3422       gst_element_post_message (GST_ELEMENT (sink),
3423           gst_message_new_segment_start (GST_OBJECT (sink),
3424               sink->segment.format, sink->segment.last_stop));
3425     }
3426   }
3427
3428   sink->priv->discont = TRUE;
3429   sink->abidata.ABI.running = TRUE;
3430
3431   GST_PAD_STREAM_UNLOCK (pad);
3432
3433   return res;
3434 }
3435
3436 static gboolean
3437 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3438 {
3439   GstBaseSinkPrivate *priv;
3440   GstBaseSinkClass *bclass;
3441   gboolean flush, intermediate;
3442   gdouble rate;
3443   GstFormat format;
3444   guint64 amount;
3445   guint seqnum;
3446   GstStepInfo *pending;
3447
3448   bclass = GST_BASE_SINK_GET_CLASS (sink);
3449   priv = sink->priv;
3450
3451   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
3452
3453   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
3454   seqnum = gst_event_get_seqnum (event);
3455
3456   pending = &priv->pending_step;
3457
3458   if (flush) {
3459     /* we need to call ::unlock before locking PREROLL_LOCK
3460      * since we lock it before going into ::render */
3461     if (bclass->unlock)
3462       bclass->unlock (sink);
3463
3464     GST_PAD_PREROLL_LOCK (sink->sinkpad);
3465     /* now that we have the PREROLL lock, clear our unlock request */
3466     if (bclass->unlock_stop)
3467       bclass->unlock_stop (sink);
3468
3469     /* update the stepinfo and make it valid */
3470     pending->seqnum = seqnum;
3471     pending->format = format;
3472     pending->amount = amount;
3473     pending->position = 0;
3474     pending->rate = rate;
3475     pending->flush = flush;
3476     pending->intermediate = intermediate;
3477     pending->valid = TRUE;
3478
3479     if (sink->priv->async_enabled) {
3480       /* and we need to commit our state again on the next
3481        * prerolled buffer */
3482       sink->playing_async = TRUE;
3483       priv->pending_step.need_preroll = TRUE;
3484       sink->need_preroll = FALSE;
3485       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
3486     } else {
3487       sink->priv->have_latency = TRUE;
3488       sink->need_preroll = FALSE;
3489     }
3490     priv->current_sstart = -1;
3491     priv->current_sstop = -1;
3492     priv->eos_rtime = -1;
3493     priv->call_preroll = TRUE;
3494     gst_base_sink_set_last_buffer (sink, NULL);
3495     gst_base_sink_reset_qos (sink);
3496
3497     if (sink->clock_id) {
3498       gst_clock_id_unschedule (sink->clock_id);
3499     }
3500
3501     if (sink->have_preroll) {
3502       GST_DEBUG_OBJECT (sink, "signal waiter");
3503       priv->step_unlock = TRUE;
3504       GST_PAD_PREROLL_SIGNAL (sink->sinkpad);
3505     }
3506     GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
3507   }
3508
3509   return TRUE;
3510 }
3511
3512 /* with STREAM_LOCK
3513  */
3514 static void
3515 gst_base_sink_loop (GstPad * pad)
3516 {
3517   GstBaseSink *basesink;
3518   GstBuffer *buf = NULL;
3519   GstFlowReturn result;
3520   guint blocksize;
3521   guint64 offset;
3522
3523   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3524
3525   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
3526
3527   if ((blocksize = basesink->priv->blocksize) == 0)
3528     blocksize = -1;
3529
3530   offset = basesink->segment.last_stop;
3531
3532   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
3533       offset, blocksize);
3534
3535   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
3536   if (G_UNLIKELY (result != GST_FLOW_OK))
3537     goto paused;
3538
3539   if (G_UNLIKELY (buf == NULL))
3540     goto no_buffer;
3541
3542   offset += GST_BUFFER_SIZE (buf);
3543
3544   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
3545
3546   GST_PAD_PREROLL_LOCK (pad);
3547   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
3548   GST_PAD_PREROLL_UNLOCK (pad);
3549   if (G_UNLIKELY (result != GST_FLOW_OK))
3550     goto paused;
3551
3552   return;
3553
3554   /* ERRORS */
3555 paused:
3556   {
3557     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
3558         gst_flow_get_name (result));
3559     gst_pad_pause_task (pad);
3560     /* fatal errors and NOT_LINKED cause EOS */
3561     if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) {
3562       if (result == GST_FLOW_UNEXPECTED) {
3563         /* perform EOS logic */
3564         if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3565           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3566               gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
3567                   basesink->segment.format, basesink->segment.last_stop));
3568         } else {
3569           gst_base_sink_event (pad, gst_event_new_eos ());
3570         }
3571       } else {
3572         /* for fatal errors we post an error message, post the error
3573          * first so the app knows about the error first. */
3574         GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3575             (_("Internal data stream error.")),
3576             ("stream stopped, reason %s", gst_flow_get_name (result)));
3577         gst_base_sink_event (pad, gst_event_new_eos ());
3578       }
3579     }
3580     return;
3581   }
3582 no_buffer:
3583   {
3584     GST_LOG_OBJECT (basesink, "no buffer, pausing");
3585     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3586         (_("Internal data flow error.")), ("element returned NULL buffer"));
3587     result = GST_FLOW_ERROR;
3588     goto paused;
3589   }
3590 }
3591
3592 static gboolean
3593 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
3594     gboolean flushing)
3595 {
3596   GstBaseSinkClass *bclass;
3597
3598   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3599
3600   if (flushing) {
3601     /* unlock any subclasses, we need to do this before grabbing the
3602      * PREROLL_LOCK since we hold this lock before going into ::render. */
3603     if (bclass->unlock)
3604       bclass->unlock (basesink);
3605   }
3606
3607   GST_PAD_PREROLL_LOCK (pad);
3608   basesink->flushing = flushing;
3609   if (flushing) {
3610     /* step 1, now that we have the PREROLL lock, clear our unlock request */
3611     if (bclass->unlock_stop)
3612       bclass->unlock_stop (basesink);
3613
3614     /* set need_preroll before we unblock the clock. If the clock is unblocked
3615      * before timing out, we can reuse the buffer for preroll. */
3616     basesink->need_preroll = TRUE;
3617
3618     /* step 2, unblock clock sync (if any) or any other blocking thing */
3619     if (basesink->clock_id) {
3620       gst_clock_id_unschedule (basesink->clock_id);
3621     }
3622
3623     /* flush out the data thread if it's locked in finish_preroll, this will
3624      * also flush out the EOS state */
3625     GST_DEBUG_OBJECT (basesink,
3626         "flushing out data thread, need preroll to TRUE");
3627     gst_base_sink_preroll_queue_flush (basesink, pad);
3628   }
3629   GST_PAD_PREROLL_UNLOCK (pad);
3630
3631   return TRUE;
3632 }
3633
3634 static gboolean
3635 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
3636 {
3637   gboolean result;
3638
3639   if (active) {
3640     /* start task */
3641     result = gst_pad_start_task (basesink->sinkpad,
3642         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
3643   } else {
3644     /* step 2, make sure streaming finishes */
3645     result = gst_pad_stop_task (basesink->sinkpad);
3646   }
3647
3648   return result;
3649 }
3650
3651 static gboolean
3652 gst_base_sink_pad_activate (GstPad * pad)
3653 {
3654   gboolean result = FALSE;
3655   GstBaseSink *basesink;
3656
3657   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3658
3659   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
3660
3661   gst_base_sink_set_flushing (basesink, pad, FALSE);
3662
3663   /* we need to have the pull mode enabled */
3664   if (!basesink->can_activate_pull) {
3665     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
3666     goto fallback;
3667   }
3668
3669   /* check if downstreams supports pull mode at all */
3670   if (!gst_pad_check_pull_range (pad)) {
3671     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
3672     goto fallback;
3673   }
3674
3675   /* set the pad mode before starting the task so that it's in the
3676    * correct state for the new thread. also the sink set_caps and get_caps
3677    * function checks this */
3678   basesink->pad_mode = GST_ACTIVATE_PULL;
3679
3680   /* we first try to negotiate a format so that when we try to activate
3681    * downstream, it knows about our format */
3682   if (!gst_base_sink_negotiate_pull (basesink)) {
3683     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
3684     goto fallback;
3685   }
3686
3687   /* ok activate now */
3688   if (!gst_pad_activate_pull (pad, TRUE)) {
3689     /* clear any pending caps */
3690     GST_OBJECT_LOCK (basesink);
3691     gst_caps_replace (&basesink->priv->pull_caps, NULL);
3692     GST_OBJECT_UNLOCK (basesink);
3693     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
3694     goto fallback;
3695   }
3696
3697   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
3698   result = TRUE;
3699   goto done;
3700
3701   /* push mode fallback */
3702 fallback:
3703   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
3704   if ((result = gst_pad_activate_push (pad, TRUE))) {
3705     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
3706   }
3707
3708 done:
3709   if (!result) {
3710     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
3711     gst_base_sink_set_flushing (basesink, pad, TRUE);
3712   }
3713
3714   gst_object_unref (basesink);
3715
3716   return result;
3717 }
3718
3719 static gboolean
3720 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
3721 {
3722   gboolean result;
3723   GstBaseSink *basesink;
3724
3725   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3726
3727   if (active) {
3728     if (!basesink->can_activate_push) {
3729       result = FALSE;
3730       basesink->pad_mode = GST_ACTIVATE_NONE;
3731     } else {
3732       result = TRUE;
3733       basesink->pad_mode = GST_ACTIVATE_PUSH;
3734     }
3735   } else {
3736     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
3737       g_warning ("Internal GStreamer activation error!!!");
3738       result = FALSE;
3739     } else {
3740       gst_base_sink_set_flushing (basesink, pad, TRUE);
3741       result = TRUE;
3742       basesink->pad_mode = GST_ACTIVATE_NONE;
3743     }
3744   }
3745
3746   gst_object_unref (basesink);
3747
3748   return result;
3749 }
3750
3751 static gboolean
3752 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
3753 {
3754   GstCaps *caps;
3755   gboolean result;
3756
3757   result = FALSE;
3758
3759   /* this returns the intersection between our caps and the peer caps. If there
3760    * is no peer, it returns NULL and we can't operate in pull mode so we can
3761    * fail the negotiation. */
3762   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
3763   if (caps == NULL || gst_caps_is_empty (caps))
3764     goto no_caps_possible;
3765
3766   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
3767
3768   caps = gst_caps_make_writable (caps);
3769   /* get the first (prefered) format */
3770   gst_caps_truncate (caps);
3771   /* try to fixate */
3772   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
3773
3774   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
3775
3776   if (gst_caps_is_any (caps)) {
3777     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
3778         "allowing pull()");
3779     /* neither side has template caps in this case, so they are prepared for
3780        pull() without setcaps() */
3781     result = TRUE;
3782   } else if (gst_caps_is_fixed (caps)) {
3783     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
3784       goto could_not_set_caps;
3785
3786     GST_OBJECT_LOCK (basesink);
3787     gst_caps_replace (&basesink->priv->pull_caps, caps);
3788     GST_OBJECT_UNLOCK (basesink);
3789
3790     result = TRUE;
3791   }
3792
3793   gst_caps_unref (caps);
3794
3795   return result;
3796
3797 no_caps_possible:
3798   {
3799     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
3800     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
3801     if (caps)
3802       gst_caps_unref (caps);
3803     return FALSE;
3804   }
3805 could_not_set_caps:
3806   {
3807     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
3808     gst_caps_unref (caps);
3809     return FALSE;
3810   }
3811 }
3812
3813 /* this won't get called until we implement an activate function */
3814 static gboolean
3815 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
3816 {
3817   gboolean result = FALSE;
3818   GstBaseSink *basesink;
3819   GstBaseSinkClass *bclass;
3820
3821   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3822   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3823
3824   if (active) {
3825     GstFormat format;
3826     gint64 duration;
3827
3828     /* we mark we have a newsegment here because pull based
3829      * mode works just fine without having a newsegment before the
3830      * first buffer */
3831     format = GST_FORMAT_BYTES;
3832
3833     gst_segment_init (&basesink->segment, format);
3834     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
3835     GST_OBJECT_LOCK (basesink);
3836     basesink->have_newsegment = TRUE;
3837     GST_OBJECT_UNLOCK (basesink);
3838
3839     /* get the peer duration in bytes */
3840     result = gst_pad_query_peer_duration (pad, &format, &duration);
3841     if (result) {
3842       GST_DEBUG_OBJECT (basesink,
3843           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
3844       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
3845           duration);
3846       gst_segment_set_duration (&basesink->segment, format, duration);
3847     } else {
3848       GST_DEBUG_OBJECT (basesink, "unknown duration");
3849     }
3850
3851     if (bclass->activate_pull)
3852       result = bclass->activate_pull (basesink, TRUE);
3853     else
3854       result = FALSE;
3855
3856     if (!result)
3857       goto activate_failed;
3858
3859   } else {
3860     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
3861       g_warning ("Internal GStreamer activation error!!!");
3862       result = FALSE;
3863     } else {
3864       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
3865       if (bclass->activate_pull)
3866         result &= bclass->activate_pull (basesink, FALSE);
3867       basesink->pad_mode = GST_ACTIVATE_NONE;
3868       /* clear any pending caps */
3869       GST_OBJECT_LOCK (basesink);
3870       gst_caps_replace (&basesink->priv->pull_caps, NULL);
3871       GST_OBJECT_UNLOCK (basesink);
3872     }
3873   }
3874   gst_object_unref (basesink);
3875
3876   return result;
3877
3878   /* ERRORS */
3879 activate_failed:
3880   {
3881     /* reset, as starting the thread failed */
3882     basesink->pad_mode = GST_ACTIVATE_NONE;
3883
3884     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
3885     return FALSE;
3886   }
3887 }
3888
3889 /* send an event to our sinkpad peer. */
3890 static gboolean
3891 gst_base_sink_send_event (GstElement * element, GstEvent * event)
3892 {
3893   GstPad *pad;
3894   GstBaseSink *basesink = GST_BASE_SINK (element);
3895   gboolean forward, result = TRUE;
3896   GstActivateMode mode;
3897
3898   GST_OBJECT_LOCK (element);
3899   /* get the pad and the scheduling mode */
3900   pad = gst_object_ref (basesink->sinkpad);
3901   mode = basesink->pad_mode;
3902   GST_OBJECT_UNLOCK (element);
3903
3904   /* only push UPSTREAM events upstream */
3905   forward = GST_EVENT_IS_UPSTREAM (event);
3906
3907   switch (GST_EVENT_TYPE (event)) {
3908     case GST_EVENT_LATENCY:
3909     {
3910       GstClockTime latency;
3911
3912       gst_event_parse_latency (event, &latency);
3913
3914       /* store the latency. We use this to adjust the running_time before syncing
3915        * it to the clock. */
3916       GST_OBJECT_LOCK (element);
3917       basesink->priv->latency = latency;
3918       if (!basesink->priv->have_latency)
3919         forward = FALSE;
3920       GST_OBJECT_UNLOCK (element);
3921       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
3922           GST_TIME_ARGS (latency));
3923
3924       /* We forward this event so that all elements know about the global pipeline
3925        * latency. This is interesting for an element when it wants to figure out
3926        * when a particular piece of data will be rendered. */
3927       break;
3928     }
3929     case GST_EVENT_SEEK:
3930       /* in pull mode we will execute the seek */
3931       if (mode == GST_ACTIVATE_PULL)
3932         result = gst_base_sink_perform_seek (basesink, pad, event);
3933       break;
3934     case GST_EVENT_STEP:
3935       result = gst_base_sink_perform_step (basesink, pad, event);
3936       forward = FALSE;
3937       break;
3938     default:
3939       break;
3940   }
3941
3942   if (forward) {
3943     result = gst_pad_push_event (pad, event);
3944   } else {
3945     /* not forwarded, unref the event */
3946     gst_event_unref (event);
3947   }
3948
3949   gst_object_unref (pad);
3950   return result;
3951 }
3952
3953 static gboolean
3954 gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query)
3955 {
3956   GstPad *peer;
3957   gboolean res = FALSE;
3958
3959   if ((peer = gst_pad_get_peer (sink->sinkpad))) {
3960     res = gst_pad_query (peer, query);
3961     gst_object_unref (peer);
3962   }
3963   return res;
3964 }
3965
3966 /* get the end position of the last seen object, this is used
3967  * for EOS and for making sure that we don't report a position we
3968  * have not reached yet. With LOCK. */
3969 static gboolean
3970 gst_base_sink_get_position_last (GstBaseSink * basesink, GstFormat format,
3971     gint64 * cur)
3972 {
3973   GstFormat oformat;
3974   GstSegment *segment;
3975   gboolean ret = TRUE;
3976
3977   segment = &basesink->segment;
3978   oformat = segment->format;
3979
3980   if (oformat == GST_FORMAT_TIME) {
3981     /* return last observed stream time, we keep the stream time around in the
3982      * time format. */
3983     *cur = basesink->priv->current_sstop;
3984   } else {
3985     /* convert last stop to stream time */
3986     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
3987   }
3988
3989   if (*cur != -1 && oformat != format) {
3990     GST_OBJECT_UNLOCK (basesink);
3991     /* convert to the target format if we need to, release lock first */
3992     ret =
3993         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
3994     if (!ret)
3995       *cur = -1;
3996     GST_OBJECT_LOCK (basesink);
3997   }
3998
3999   GST_DEBUG_OBJECT (basesink, "POSITION: %" GST_TIME_FORMAT,
4000       GST_TIME_ARGS (*cur));
4001
4002   return ret;
4003 }
4004
4005 /* get the position when we are PAUSED, this is the stream time of the buffer
4006  * that prerolled. If no buffer is prerolled (we are still flushing), this
4007  * value will be -1. With LOCK. */
4008 static gboolean
4009 gst_base_sink_get_position_paused (GstBaseSink * basesink, GstFormat format,
4010     gint64 * cur)
4011 {
4012   gboolean res;
4013   gint64 time;
4014   GstSegment *segment;
4015   GstFormat oformat;
4016
4017   /* we don't use the clip segment in pull mode, when seeking we update the
4018    * main segment directly with the new segment values without it having to be
4019    * activated by the rendering after preroll */
4020   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4021     segment = basesink->abidata.ABI.clip_segment;
4022   else
4023     segment = &basesink->segment;
4024   oformat = segment->format;
4025
4026   if (oformat == GST_FORMAT_TIME) {
4027     *cur = basesink->priv->current_sstart;
4028     if (segment->rate < 0.0 && basesink->priv->current_sstop != -1) {
4029       /* for reverse playback we prefer the stream time stop position if we have
4030        * one */
4031       *cur = basesink->priv->current_sstop;
4032     }
4033   } else {
4034     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4035   }
4036
4037   time = segment->time;
4038
4039   if (*cur != -1) {
4040     *cur = MAX (*cur, time);
4041     GST_DEBUG_OBJECT (basesink, "POSITION as max: %" GST_TIME_FORMAT
4042         ", time %" GST_TIME_FORMAT, GST_TIME_ARGS (*cur), GST_TIME_ARGS (time));
4043   } else {
4044     /* we have no buffer, use the segment times. */
4045     if (segment->rate >= 0.0) {
4046       /* forward, next position is always the time of the segment */
4047       *cur = time;
4048       GST_DEBUG_OBJECT (basesink, "POSITION as time: %" GST_TIME_FORMAT,
4049           GST_TIME_ARGS (*cur));
4050     } else {
4051       /* reverse, next expected timestamp is segment->stop. We use the function
4052        * to get things right for negative applied_rates. */
4053       *cur = gst_segment_to_stream_time (segment, oformat, segment->stop);
4054       GST_DEBUG_OBJECT (basesink, "reverse POSITION: %" GST_TIME_FORMAT,
4055           GST_TIME_ARGS (*cur));
4056     }
4057   }
4058
4059   res = (*cur != -1);
4060   if (res && oformat != format) {
4061     GST_OBJECT_UNLOCK (basesink);
4062     res =
4063         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
4064     if (!res)
4065       *cur = -1;
4066     GST_OBJECT_LOCK (basesink);
4067   }
4068
4069   return res;
4070 }
4071
4072 static gboolean
4073 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4074     gint64 * cur, gboolean * upstream)
4075 {
4076   GstClock *clock;
4077   gboolean res = FALSE;
4078   GstFormat oformat, tformat;
4079   GstClockTime now, base, latency;
4080   gint64 time, accum, duration;
4081   gdouble rate;
4082   gint64 last;
4083
4084   GST_OBJECT_LOCK (basesink);
4085   /* our intermediate time format */
4086   tformat = GST_FORMAT_TIME;
4087   /* get the format in the segment */
4088   oformat = basesink->segment.format;
4089
4090   /* can only give answer based on the clock if not EOS */
4091   if (G_UNLIKELY (basesink->eos))
4092     goto in_eos;
4093
4094   /* we can only get the segment when we are not NULL or READY */
4095   if (!basesink->have_newsegment)
4096     goto wrong_state;
4097
4098   /* when not in PLAYING or when we're busy with a state change, we
4099    * cannot read from the clock so we report time based on the
4100    * last seen timestamp. */
4101   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4102       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING)
4103     goto in_pause;
4104
4105   /* we need to sync on the clock. */
4106   if (basesink->sync == FALSE)
4107     goto no_sync;
4108
4109   /* and we need a clock */
4110   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4111     goto no_sync;
4112
4113   /* collect all data we need holding the lock */
4114   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time))
4115     time = basesink->segment.time;
4116   else
4117     time = 0;
4118
4119   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.stop))
4120     duration = basesink->segment.stop - basesink->segment.start;
4121   else
4122     duration = 0;
4123
4124   base = GST_ELEMENT_CAST (basesink)->base_time;
4125   accum = basesink->segment.accum;
4126   rate = basesink->segment.rate * basesink->segment.applied_rate;
4127   latency = basesink->priv->latency;
4128
4129   gst_object_ref (clock);
4130
4131   /* this function might release the LOCK */
4132   gst_base_sink_get_position_last (basesink, format, &last);
4133
4134   /* need to release the object lock before we can get the time, 
4135    * a clock might take the LOCK of the provider, which could be
4136    * a basesink subclass. */
4137   GST_OBJECT_UNLOCK (basesink);
4138
4139   now = gst_clock_get_time (clock);
4140
4141   if (oformat != tformat) {
4142     /* convert accum, time and duration to time */
4143     if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4144             &accum))
4145       goto convert_failed;
4146     if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration, &tformat,
4147             &duration))
4148       goto convert_failed;
4149     if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4150             &time))
4151       goto convert_failed;
4152   }
4153
4154   /* subtract base time and accumulated time from the clock time. 
4155    * Make sure we don't go negative. This is the current time in
4156    * the segment which we need to scale with the combined 
4157    * rate and applied rate. */
4158   base += accum;
4159   base += latency;
4160   base = MIN (now, base);
4161
4162   /* for negative rates we need to count back from from the segment
4163    * duration. */
4164   if (rate < 0.0)
4165     time += duration;
4166
4167   *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4168
4169   /* never report more than last seen position */
4170   if (last != -1)
4171     *cur = MIN (last, *cur);
4172
4173   gst_object_unref (clock);
4174
4175   GST_DEBUG_OBJECT (basesink,
4176       "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4177       GST_TIME_FORMAT " + time %" GST_TIME_FORMAT,
4178       GST_TIME_ARGS (now), GST_TIME_ARGS (base),
4179       GST_TIME_ARGS (accum), GST_TIME_ARGS (time));
4180
4181   if (oformat != format) {
4182     /* convert time to final format */
4183     if (!gst_pad_query_convert (basesink->sinkpad, tformat, *cur, &format, cur))
4184       goto convert_failed;
4185   }
4186
4187   res = TRUE;
4188
4189 done:
4190   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4191       res, GST_TIME_ARGS (*cur));
4192   return res;
4193
4194   /* special cases */
4195 in_eos:
4196   {
4197     GST_DEBUG_OBJECT (basesink, "position in EOS");
4198     res = gst_base_sink_get_position_last (basesink, format, cur);
4199     GST_OBJECT_UNLOCK (basesink);
4200     goto done;
4201   }
4202 in_pause:
4203   {
4204     GST_DEBUG_OBJECT (basesink, "position in PAUSED");
4205     res = gst_base_sink_get_position_paused (basesink, format, cur);
4206     GST_OBJECT_UNLOCK (basesink);
4207     goto done;
4208   }
4209 wrong_state:
4210   {
4211     /* in NULL or READY we always return FALSE and -1 */
4212     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4213     res = FALSE;
4214     *cur = -1;
4215     GST_OBJECT_UNLOCK (basesink);
4216     goto done;
4217   }
4218 no_sync:
4219   {
4220     /* report last seen timestamp if any, else ask upstream to answer */
4221     if ((*cur = basesink->priv->current_sstart) != -1)
4222       res = TRUE;
4223     else
4224       *upstream = TRUE;
4225
4226     GST_DEBUG_OBJECT (basesink, "no sync, res %d, POSITION %" GST_TIME_FORMAT,
4227         res, GST_TIME_ARGS (*cur));
4228     GST_OBJECT_UNLOCK (basesink);
4229     return res;
4230   }
4231 convert_failed:
4232   {
4233     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4234     *upstream = TRUE;
4235     return FALSE;
4236   }
4237 }
4238
4239 static gboolean
4240 gst_base_sink_query (GstElement * element, GstQuery * query)
4241 {
4242   gboolean res = FALSE;
4243
4244   GstBaseSink *basesink = GST_BASE_SINK (element);
4245
4246   switch (GST_QUERY_TYPE (query)) {
4247     case GST_QUERY_POSITION:
4248     {
4249       gint64 cur = 0;
4250       GstFormat format;
4251       gboolean upstream = FALSE;
4252
4253       gst_query_parse_position (query, &format, NULL);
4254
4255       GST_DEBUG_OBJECT (basesink, "position format %d", format);
4256
4257       /* first try to get the position based on the clock */
4258       if ((res =
4259               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4260         gst_query_set_position (query, format, cur);
4261       } else if (upstream) {
4262         /* fallback to peer query */
4263         res = gst_base_sink_peer_query (basesink, query);
4264       }
4265       break;
4266     }
4267     case GST_QUERY_DURATION:
4268     {
4269       GstFormat format, uformat;
4270       gint64 duration, uduration;
4271
4272       gst_query_parse_duration (query, &format, NULL);
4273
4274       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4275           gst_format_get_name (format));
4276
4277       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4278         uformat = GST_FORMAT_BYTES;
4279
4280         /* get the duration in bytes, in pull mode that's all we are sure to
4281          * know. We have to explicitly get this value from upstream instead of
4282          * using our cached value because it might change. Duration caching
4283          * should be done at a higher level. */
4284         res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4285             &uduration);
4286         if (res) {
4287           gst_segment_set_duration (&basesink->segment, uformat, uduration);
4288           if (format != uformat) {
4289             /* convert to the requested format */
4290             res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4291                 &format, &duration);
4292           } else {
4293             duration = uduration;
4294           }
4295           if (res) {
4296             /* set the result */
4297             gst_query_set_duration (query, format, duration);
4298           }
4299         }
4300       } else {
4301         /* in push mode we simply forward upstream */
4302         res = gst_base_sink_peer_query (basesink, query);
4303       }
4304       break;
4305     }
4306     case GST_QUERY_LATENCY:
4307     {
4308       gboolean live, us_live;
4309       GstClockTime min, max;
4310
4311       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4312                   &max))) {
4313         gst_query_set_latency (query, live, min, max);
4314       }
4315       break;
4316     }
4317     case GST_QUERY_JITTER:
4318       break;
4319     case GST_QUERY_RATE:
4320       /* gst_query_set_rate (query, basesink->segment_rate); */
4321       res = TRUE;
4322       break;
4323     case GST_QUERY_SEGMENT:
4324     {
4325       /* FIXME, bring start/stop to stream time */
4326       gst_query_set_segment (query, basesink->segment.rate,
4327           GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4328       break;
4329     }
4330     case GST_QUERY_SEEKING:
4331     case GST_QUERY_CONVERT:
4332     case GST_QUERY_FORMATS:
4333     default:
4334       res = gst_base_sink_peer_query (basesink, query);
4335       break;
4336   }
4337   return res;
4338 }
4339
4340 static GstStateChangeReturn
4341 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4342 {
4343   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4344   GstBaseSink *basesink = GST_BASE_SINK (element);
4345   GstBaseSinkClass *bclass;
4346   GstBaseSinkPrivate *priv;
4347
4348   priv = basesink->priv;
4349
4350   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4351
4352   switch (transition) {
4353     case GST_STATE_CHANGE_NULL_TO_READY:
4354       if (bclass->start)
4355         if (!bclass->start (basesink))
4356           goto start_failed;
4357       break;
4358     case GST_STATE_CHANGE_READY_TO_PAUSED:
4359       /* need to complete preroll before this state change completes, there
4360        * is no data flow in READY so we can safely assume we need to preroll. */
4361       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4362       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4363       basesink->have_newsegment = FALSE;
4364       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4365       gst_segment_init (basesink->abidata.ABI.clip_segment,
4366           GST_FORMAT_UNDEFINED);
4367       basesink->offset = 0;
4368       basesink->have_preroll = FALSE;
4369       priv->step_unlock = FALSE;
4370       basesink->need_preroll = TRUE;
4371       basesink->playing_async = TRUE;
4372       priv->current_sstart = -1;
4373       priv->current_sstop = -1;
4374       priv->eos_rtime = -1;
4375       priv->latency = 0;
4376       basesink->eos = FALSE;
4377       priv->received_eos = FALSE;
4378       gst_base_sink_reset_qos (basesink);
4379       priv->commited = FALSE;
4380       priv->call_preroll = TRUE;
4381       priv->current_step.valid = FALSE;
4382       priv->pending_step.valid = FALSE;
4383       if (priv->async_enabled) {
4384         GST_DEBUG_OBJECT (basesink, "doing async state change");
4385         /* when async enabled, post async-start message and return ASYNC from
4386          * the state change function */
4387         ret = GST_STATE_CHANGE_ASYNC;
4388         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4389             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4390       } else {
4391         priv->have_latency = TRUE;
4392       }
4393       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4394       break;
4395     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4396       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4397       if (!gst_base_sink_needs_preroll (basesink)) {
4398         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4399         /* no preroll needed anymore now. */
4400         basesink->playing_async = FALSE;
4401         basesink->need_preroll = FALSE;
4402         if (basesink->eos) {
4403           GstMessage *message;
4404
4405           /* need to post EOS message here */
4406           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4407           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
4408           gst_message_set_seqnum (message, basesink->priv->seqnum);
4409           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
4410         } else {
4411           GST_DEBUG_OBJECT (basesink, "signal preroll");
4412           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
4413         }
4414       } else {
4415         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
4416         basesink->need_preroll = TRUE;
4417         basesink->playing_async = TRUE;
4418         priv->call_preroll = TRUE;
4419         priv->commited = FALSE;
4420         if (priv->async_enabled) {
4421           GST_DEBUG_OBJECT (basesink, "doing async state change");
4422           ret = GST_STATE_CHANGE_ASYNC;
4423           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4424               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4425         }
4426       }
4427       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4428       break;
4429     default:
4430       break;
4431   }
4432
4433   {
4434     GstStateChangeReturn bret;
4435
4436     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4437     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
4438       goto activate_failed;
4439   }
4440
4441   switch (transition) {
4442     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4443       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
4444       /* FIXME, make sure we cannot enter _render first */
4445
4446       /* we need to call ::unlock before locking PREROLL_LOCK
4447        * since we lock it before going into ::render */
4448       if (bclass->unlock)
4449         bclass->unlock (basesink);
4450
4451       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4452       /* now that we have the PREROLL lock, clear our unlock request */
4453       if (bclass->unlock_stop)
4454         bclass->unlock_stop (basesink);
4455
4456       /* we need preroll again and we set the flag before unlocking the clockid
4457        * because if the clockid is unlocked before a current buffer expired, we
4458        * can use that buffer to preroll with */
4459       basesink->need_preroll = TRUE;
4460
4461       if (basesink->clock_id) {
4462         gst_clock_id_unschedule (basesink->clock_id);
4463       }
4464
4465       /* if we don't have a preroll buffer we need to wait for a preroll and
4466        * return ASYNC. */
4467       if (!gst_base_sink_needs_preroll (basesink)) {
4468         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
4469         basesink->playing_async = FALSE;
4470       } else {
4471         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
4472           ret = GST_STATE_CHANGE_SUCCESS;
4473         } else {
4474           GST_DEBUG_OBJECT (basesink,
4475               "PLAYING to PAUSED, we are not prerolled");
4476           basesink->playing_async = TRUE;
4477           priv->commited = FALSE;
4478           priv->call_preroll = TRUE;
4479           if (priv->async_enabled) {
4480             GST_DEBUG_OBJECT (basesink, "doing async state change");
4481             ret = GST_STATE_CHANGE_ASYNC;
4482             gst_element_post_message (GST_ELEMENT_CAST (basesink),
4483                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
4484                     FALSE));
4485           }
4486         }
4487       }
4488       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
4489           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
4490
4491       gst_base_sink_reset_qos (basesink);
4492       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4493       break;
4494     case GST_STATE_CHANGE_PAUSED_TO_READY:
4495       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4496       /* start by reseting our position state with the object lock so that the
4497        * position query gets the right idea. We do this before we post the
4498        * messages so that the message handlers pick this up. */
4499       GST_OBJECT_LOCK (basesink);
4500       basesink->have_newsegment = FALSE;
4501       priv->current_sstart = -1;
4502       priv->current_sstop = -1;
4503       priv->have_latency = FALSE;
4504       GST_OBJECT_UNLOCK (basesink);
4505
4506       gst_base_sink_set_last_buffer (basesink, NULL);
4507       priv->call_preroll = FALSE;
4508
4509       if (!priv->commited) {
4510         if (priv->async_enabled) {
4511           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
4512
4513           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4514               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
4515                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
4516
4517           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4518               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
4519         }
4520         priv->commited = TRUE;
4521       } else {
4522         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
4523       }
4524       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4525       break;
4526     case GST_STATE_CHANGE_READY_TO_NULL:
4527       if (bclass->stop) {
4528         if (!bclass->stop (basesink)) {
4529           GST_WARNING_OBJECT (basesink, "failed to stop");
4530         }
4531       }
4532       gst_base_sink_set_last_buffer (basesink, NULL);
4533       priv->call_preroll = FALSE;
4534       break;
4535     default:
4536       break;
4537   }
4538
4539   return ret;
4540
4541   /* ERRORS */
4542 start_failed:
4543   {
4544     GST_DEBUG_OBJECT (basesink, "failed to start");
4545     return GST_STATE_CHANGE_FAILURE;
4546   }
4547 activate_failed:
4548   {
4549     GST_DEBUG_OBJECT (basesink,
4550         "element failed to change states -- activation problem?");
4551     return GST_STATE_CHANGE_FAILURE;
4552   }
4553 }