basesink: fix clipped start/stop after step
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSource
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * <programlisting>
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *   
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * </programlisting>
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSink::preroll vmethod with this preroll buffer and will then commit
59  * the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from ::get_times. If this function returns
63  * #GST_CLOCK_TIME_NONE for the start time, no synchronisation will be done.
64  * Synchronisation can be disabled entirely by setting the object "sync"
65  * property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSink::render will be called.
68  * Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the ::render method
71  * are supported as well. These classes typically receive a buffer in the render
72  * method and can then potentially block on the clock while rendering. A typical
73  * example is an audiosink. Since 0.10.11 these subclasses can use
74  * gst_base_sink_wait_preroll() to perform the blocking wait.
75  *
76  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
77  * for the clock to reach the time indicated by the stop time of the last
78  * ::get_times call before posting an EOS message. When the element receives
79  * EOS in PAUSED, preroll completes, the event is queued and an EOS message is
80  * posted when going to PLAYING.
81  *
82  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
83  * synchronisation and clipping of buffers. Buffers that fall completely outside
84  * of the current segment are dropped. Buffers that fall partially in the
85  * segment are rendered (and prerolled). Subclasses should do any subbuffer
86  * clipping themselves when needed.
87  *
88  * #GstBaseSink will by default report the current playback position in
89  * #GST_FORMAT_TIME based on the current clock time and segment information.
90  * If no clock has been set on the element, the query will be forwarded
91  * upstream.
92  *
93  * The ::set_caps function will be called when the subclass should configure
94  * itself to process a specific media type.
95  *
96  * The ::start and ::stop virtual methods will be called when resources should
97  * be allocated. Any ::preroll, ::render  and ::set_caps function will be
98  * called between the ::start and ::stop calls.
99  *
100  * The ::event virtual method will be called when an event is received by
101  * #GstBaseSink. Normally this method should only be overriden by very specific
102  * elements (such as file sinks) which need to handle the newsegment event
103  * specially.
104  *
105  * #GstBaseSink provides an overridable ::buffer_alloc function that can be
106  * used by sinks that want to do reverse negotiation or to provide
107  * custom buffers (hardware buffers for example) to upstream elements.
108  *
109  * The ::unlock method is called when the elements should unblock any blocking
110  * operations they perform in the ::render method. This is mostly useful when
111  * the ::render method performs a blocking write on a file descriptor, for
112  * example.
113  *
114  * The max-lateness property affects how the sink deals with buffers that
115  * arrive too late in the sink. A buffer arrives too late in the sink when
116  * the presentation time (as a combination of the last segment, buffer
117  * timestamp and element base_time) plus the duration is before the current
118  * time of the clock.
119  * If the frame is later than max-lateness, the sink will drop the buffer
120  * without calling the render method.
121  * This feature is disabled if sync is disabled, the ::get-times method does
122  * not return a valid start time or max-lateness is set to -1 (the default).
123  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
124  * max-lateness value.
125  *
126  * The qos property will enable the quality-of-service features of the basesink
127  * which gather statistics about the real-time performance of the clock
128  * synchronisation. For each buffer received in the sink, statistics are
129  * gathered and a QOS event is sent upstream with these numbers. This
130  * information can then be used by upstream elements to reduce their processing
131  * rate, for example.
132  *
133  * Since 0.10.15 the async property can be used to instruct the sink to never
134  * perform an ASYNC state change. This feature is mostly usable when dealing
135  * with non-synchronized streams or sparse streams.
136  *
137  * Last reviewed on 2007-08-29 (0.10.15)
138  */
139
140 #ifdef HAVE_CONFIG_H
141 #  include "config.h"
142 #endif
143
144 #include "gstbasesink.h"
145 #include <gst/gstmarshal.h>
146 #include <gst/gst_private.h>
147 #include <gst/gst-i18n-lib.h>
148
149 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
150 #define GST_CAT_DEFAULT gst_base_sink_debug
151
152 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
153    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
154
155 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
156
157 typedef struct
158 {
159   gboolean valid;               /* if this info is valid */
160   guint32 seqnum;               /* the seqnum of the STEP event */
161   GstFormat format;             /* the format of the amount */
162   guint64 amount;               /* the total amount of data to skip */
163   guint64 position;             /* the position in the stepped data */
164   guint64 duration;             /* the duration in time of the skipped data */
165   guint64 start;                /* running_time of the start */
166   gdouble rate;                 /* rate of skipping */
167   gdouble start_rate;           /* rate before skipping */
168   gboolean flush;               /* if this was a flushing step */
169   gboolean intermediate;        /* if this is an intermediate step */
170   gboolean need_preroll;        /* if we need preroll after this step */
171 } GstStepInfo;
172
173 /* FIXME, some stuff in ABI.data and other in Private...
174  * Make up your mind please.
175  */
176 struct _GstBaseSinkPrivate
177 {
178   gint qos_enabled;             /* ATOMIC */
179   gboolean async_enabled;
180   GstClockTimeDiff ts_offset;
181   GstClockTime render_delay;
182
183   /* start, stop of current buffer, stream time, used to report position */
184   GstClockTime current_sstart;
185   GstClockTime current_sstop;
186
187   /* start, stop and jitter of current buffer, running time */
188   GstClockTime current_rstart;
189   GstClockTime current_rstop;
190   GstClockTimeDiff current_jitter;
191
192   /* EOS sync time in running time */
193   GstClockTime eos_rtime;
194
195   /* last buffer that arrived in time, running time */
196   GstClockTime last_in_time;
197   /* when the last buffer left the sink, running time */
198   GstClockTime last_left;
199
200   /* running averages go here these are done on running time */
201   GstClockTime avg_pt;
202   GstClockTime avg_duration;
203   gdouble avg_rate;
204
205   /* these are done on system time. avg_jitter and avg_render are
206    * compared to eachother to see if the rendering time takes a
207    * huge amount of the processing, If so we are flooded with
208    * buffers. */
209   GstClockTime last_left_systime;
210   GstClockTime avg_jitter;
211   GstClockTime start, stop;
212   GstClockTime avg_render;
213
214   /* number of rendered and dropped frames */
215   guint64 rendered;
216   guint64 dropped;
217
218   /* latency stuff */
219   GstClockTime latency;
220
221   /* if we already commited the state */
222   gboolean commited;
223
224   /* when we received EOS */
225   gboolean received_eos;
226
227   /* when we are prerolled and able to report latency */
228   gboolean have_latency;
229
230   /* the last buffer we prerolled or rendered. Useful for making snapshots */
231   GstBuffer *last_buffer;
232
233   /* caps for pull based scheduling */
234   GstCaps *pull_caps;
235
236   /* blocksize for pulling */
237   guint blocksize;
238
239   gboolean discont;
240
241   /* seqnum of the stream */
242   guint32 seqnum;
243
244   gboolean call_preroll;
245   gboolean step_unlock;
246
247   /* we have a pending and a current step operation */
248   GstStepInfo current_step;
249   GstStepInfo pending_step;
250 };
251
252 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
253
254 /* generic running average, this has a neutral window size */
255 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
256
257 /* the windows for these running averages are experimentally obtained.
258  * possitive values get averaged more while negative values use a small
259  * window so we can react faster to badness. */
260 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
261 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
262
263 /* BaseSink properties */
264
265 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
266 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
267
268 #define DEFAULT_PREROLL_QUEUE_LEN       0
269 #define DEFAULT_SYNC                    TRUE
270 #define DEFAULT_MAX_LATENESS            -1
271 #define DEFAULT_QOS                     FALSE
272 #define DEFAULT_ASYNC                   TRUE
273 #define DEFAULT_TS_OFFSET               0
274 #define DEFAULT_BLOCKSIZE               4096
275 #define DEFAULT_RENDER_DELAY            0
276
277 enum
278 {
279   PROP_0,
280   PROP_PREROLL_QUEUE_LEN,
281   PROP_SYNC,
282   PROP_MAX_LATENESS,
283   PROP_QOS,
284   PROP_ASYNC,
285   PROP_TS_OFFSET,
286   PROP_LAST_BUFFER,
287   PROP_BLOCKSIZE,
288   PROP_RENDER_DELAY,
289   PROP_LAST
290 };
291
292 static GstElementClass *parent_class = NULL;
293
294 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
295 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
296 static void gst_base_sink_finalize (GObject * object);
297
298 GType
299 gst_base_sink_get_type (void)
300 {
301   static volatile gsize base_sink_type = 0;
302
303   if (g_once_init_enter (&base_sink_type)) {
304     GType _type;
305     static const GTypeInfo base_sink_info = {
306       sizeof (GstBaseSinkClass),
307       NULL,
308       NULL,
309       (GClassInitFunc) gst_base_sink_class_init,
310       NULL,
311       NULL,
312       sizeof (GstBaseSink),
313       0,
314       (GInstanceInitFunc) gst_base_sink_init,
315     };
316
317     _type = g_type_register_static (GST_TYPE_ELEMENT,
318         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
319     g_once_init_leave (&base_sink_type, _type);
320   }
321   return base_sink_type;
322 }
323
324 static void gst_base_sink_set_property (GObject * object, guint prop_id,
325     const GValue * value, GParamSpec * pspec);
326 static void gst_base_sink_get_property (GObject * object, guint prop_id,
327     GValue * value, GParamSpec * pspec);
328
329 static gboolean gst_base_sink_send_event (GstElement * element,
330     GstEvent * event);
331 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
332
333 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
334 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
335 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
336     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
337 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
338     GstClockTime * start, GstClockTime * end);
339 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
340     GstPad * pad, gboolean flushing);
341 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
342     gboolean active);
343 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
344     GstSegment * segment);
345 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
346     GstEvent * event, GstSegment * segment);
347
348 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
349     GstStateChange transition);
350
351 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
352 static void gst_base_sink_loop (GstPad * pad);
353 static gboolean gst_base_sink_pad_activate (GstPad * pad);
354 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
355 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
356 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
357 static gboolean gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query);
358
359 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
360
361 /* check if an object was too late */
362 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
363     GstMiniObject * obj, GstClockTime start, GstClockTime stop,
364     GstClockReturn status, GstClockTimeDiff jitter);
365 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
366     GstMiniObject * obj);
367
368 static void
369 gst_base_sink_class_init (GstBaseSinkClass * klass)
370 {
371   GObjectClass *gobject_class;
372   GstElementClass *gstelement_class;
373
374   gobject_class = G_OBJECT_CLASS (klass);
375   gstelement_class = GST_ELEMENT_CLASS (klass);
376
377   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
378       "basesink element");
379
380   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
381
382   parent_class = g_type_class_peek_parent (klass);
383
384   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_sink_finalize);
385   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_sink_set_property);
386   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_sink_get_property);
387
388   /* FIXME, this next value should be configured using an event from the
389    * upstream element, ie, the BUFFER_SIZE event. */
390   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
391       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
392           "Number of buffers to queue during preroll", 0, G_MAXUINT,
393           DEFAULT_PREROLL_QUEUE_LEN,
394           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
395
396   g_object_class_install_property (gobject_class, PROP_SYNC,
397       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
398           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399
400   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
401       g_param_spec_int64 ("max-lateness", "Max Lateness",
402           "Maximum number of nanoseconds that a buffer can be late before it "
403           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
404           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
405
406   g_object_class_install_property (gobject_class, PROP_QOS,
407       g_param_spec_boolean ("qos", "Qos",
408           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
409           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
410   /**
411    * GstBaseSink:async
412    *
413    * If set to #TRUE, the basesink will perform asynchronous state changes.
414    * When set to #FALSE, the sink will not signal the parent when it prerolls.
415    * Use this option when dealing with sparse streams or when synchronisation is
416    * not required.
417    *
418    * Since: 0.10.15
419    */
420   g_object_class_install_property (gobject_class, PROP_ASYNC,
421       g_param_spec_boolean ("async", "Async",
422           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
423           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
424   /**
425    * GstBaseSink:ts-offset
426    *
427    * Controls the final synchronisation, a negative value will render the buffer
428    * earlier while a positive value delays playback. This property can be 
429    * used to fix synchronisation in bad files.
430    *
431    * Since: 0.10.15
432    */
433   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
434       g_param_spec_int64 ("ts-offset", "TS Offset",
435           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
436           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
437   /**
438    * GstBaseSink:last-buffer
439    *
440    * The last buffer that arrived in the sink and was used for preroll or for
441    * rendering. This property can be used to generate thumbnails. This property
442    * can be NULL when the sink has not yet received a bufer.
443    *
444    * Since: 0.10.15
445    */
446   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
447       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
448           "The last buffer received in the sink", GST_TYPE_BUFFER,
449           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
450   /**
451    * GstBaseSink:blocksize
452    *
453    * The amount of bytes to pull when operating in pull mode.
454    *
455    * Since: 0.10.22
456    */
457   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
458       g_param_spec_uint ("blocksize", "Block size",
459           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
460           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
461   /**
462    * GstBaseSink:render-delay
463    *
464    * The additional delay between synchronisation and actual rendering of the
465    * media. This property will add additional latency to the device in order to
466    * make other sinks compensate for the delay.
467    *
468    * Since: 0.10.22
469    */
470   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
471       g_param_spec_uint64 ("render-delay", "Render Delay",
472           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
473           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
474
475   gstelement_class->change_state =
476       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
477   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
478   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
479
480   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
481   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
482   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
483   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
484   klass->activate_pull =
485       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
486 }
487
488 static GstCaps *
489 gst_base_sink_pad_getcaps (GstPad * pad)
490 {
491   GstBaseSinkClass *bclass;
492   GstBaseSink *bsink;
493   GstCaps *caps = NULL;
494
495   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
496   bclass = GST_BASE_SINK_GET_CLASS (bsink);
497
498   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
499     /* if we are operating in pull mode we only accept the negotiated caps */
500     GST_OBJECT_LOCK (pad);
501     if ((caps = GST_PAD_CAPS (pad)))
502       gst_caps_ref (caps);
503     GST_OBJECT_UNLOCK (pad);
504   }
505   if (caps == NULL) {
506     if (bclass->get_caps)
507       caps = bclass->get_caps (bsink);
508
509     if (caps == NULL) {
510       GstPadTemplate *pad_template;
511
512       pad_template =
513           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
514           "sink");
515       if (pad_template != NULL) {
516         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
517       }
518     }
519   }
520   gst_object_unref (bsink);
521
522   return caps;
523 }
524
525 static gboolean
526 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
527 {
528   GstBaseSinkClass *bclass;
529   GstBaseSink *bsink;
530   gboolean res = TRUE;
531
532   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
533   bclass = GST_BASE_SINK_GET_CLASS (bsink);
534
535   if (res && bclass->set_caps)
536     res = bclass->set_caps (bsink, caps);
537
538   gst_object_unref (bsink);
539
540   return res;
541 }
542
543 static void
544 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
545 {
546   GstBaseSinkClass *bclass;
547   GstBaseSink *bsink;
548
549   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
550   bclass = GST_BASE_SINK_GET_CLASS (bsink);
551
552   if (bclass->fixate)
553     bclass->fixate (bsink, caps);
554
555   gst_object_unref (bsink);
556 }
557
558 static GstFlowReturn
559 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
560     GstCaps * caps, GstBuffer ** buf)
561 {
562   GstBaseSinkClass *bclass;
563   GstBaseSink *bsink;
564   GstFlowReturn result = GST_FLOW_OK;
565
566   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
567   bclass = GST_BASE_SINK_GET_CLASS (bsink);
568
569   if (bclass->buffer_alloc)
570     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
571   else
572     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
573
574   gst_object_unref (bsink);
575
576   return result;
577 }
578
579 static void
580 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
581 {
582   GstPadTemplate *pad_template;
583   GstBaseSinkPrivate *priv;
584
585   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
586
587   pad_template =
588       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
589   g_return_if_fail (pad_template != NULL);
590
591   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
592
593   gst_pad_set_getcaps_function (basesink->sinkpad,
594       GST_DEBUG_FUNCPTR (gst_base_sink_pad_getcaps));
595   gst_pad_set_setcaps_function (basesink->sinkpad,
596       GST_DEBUG_FUNCPTR (gst_base_sink_pad_setcaps));
597   gst_pad_set_fixatecaps_function (basesink->sinkpad,
598       GST_DEBUG_FUNCPTR (gst_base_sink_pad_fixate));
599   gst_pad_set_bufferalloc_function (basesink->sinkpad,
600       GST_DEBUG_FUNCPTR (gst_base_sink_pad_buffer_alloc));
601   gst_pad_set_activate_function (basesink->sinkpad,
602       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate));
603   gst_pad_set_activatepush_function (basesink->sinkpad,
604       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_push));
605   gst_pad_set_activatepull_function (basesink->sinkpad,
606       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_pull));
607   gst_pad_set_event_function (basesink->sinkpad,
608       GST_DEBUG_FUNCPTR (gst_base_sink_event));
609   gst_pad_set_chain_function (basesink->sinkpad,
610       GST_DEBUG_FUNCPTR (gst_base_sink_chain));
611   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
612
613   basesink->pad_mode = GST_ACTIVATE_NONE;
614   basesink->preroll_queue = g_queue_new ();
615   basesink->abidata.ABI.clip_segment = gst_segment_new ();
616   priv->have_latency = FALSE;
617
618   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
619   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
620
621   basesink->sync = DEFAULT_SYNC;
622   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
623   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
624   priv->async_enabled = DEFAULT_ASYNC;
625   priv->ts_offset = DEFAULT_TS_OFFSET;
626   priv->render_delay = DEFAULT_RENDER_DELAY;
627   priv->blocksize = DEFAULT_BLOCKSIZE;
628
629   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
630 }
631
632 static void
633 gst_base_sink_finalize (GObject * object)
634 {
635   GstBaseSink *basesink;
636
637   basesink = GST_BASE_SINK (object);
638
639   g_queue_free (basesink->preroll_queue);
640   gst_segment_free (basesink->abidata.ABI.clip_segment);
641
642   G_OBJECT_CLASS (parent_class)->finalize (object);
643 }
644
645 /**
646  * gst_base_sink_set_sync:
647  * @sink: the sink
648  * @sync: the new sync value.
649  *
650  * Configures @sink to synchronize on the clock or not. When
651  * @sync is FALSE, incomming samples will be played as fast as
652  * possible. If @sync is TRUE, the timestamps of the incomming
653  * buffers will be used to schedule the exact render time of its
654  * contents.
655  *
656  * Since: 0.10.4
657  */
658 void
659 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
660 {
661   g_return_if_fail (GST_IS_BASE_SINK (sink));
662
663   GST_OBJECT_LOCK (sink);
664   sink->sync = sync;
665   GST_OBJECT_UNLOCK (sink);
666 }
667
668 /**
669  * gst_base_sink_get_sync:
670  * @sink: the sink
671  *
672  * Checks if @sink is currently configured to synchronize against the
673  * clock.
674  *
675  * Returns: TRUE if the sink is configured to synchronize against the clock.
676  *
677  * Since: 0.10.4
678  */
679 gboolean
680 gst_base_sink_get_sync (GstBaseSink * sink)
681 {
682   gboolean res;
683
684   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
685
686   GST_OBJECT_LOCK (sink);
687   res = sink->sync;
688   GST_OBJECT_UNLOCK (sink);
689
690   return res;
691 }
692
693 /**
694  * gst_base_sink_set_max_lateness:
695  * @sink: the sink
696  * @max_lateness: the new max lateness value.
697  *
698  * Sets the new max lateness value to @max_lateness. This value is
699  * used to decide if a buffer should be dropped or not based on the
700  * buffer timestamp and the current clock time. A value of -1 means
701  * an unlimited time.
702  *
703  * Since: 0.10.4
704  */
705 void
706 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
707 {
708   g_return_if_fail (GST_IS_BASE_SINK (sink));
709
710   GST_OBJECT_LOCK (sink);
711   sink->abidata.ABI.max_lateness = max_lateness;
712   GST_OBJECT_UNLOCK (sink);
713 }
714
715 /**
716  * gst_base_sink_get_max_lateness:
717  * @sink: the sink
718  *
719  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
720  * more details.
721  *
722  * Returns: The maximum time in nanoseconds that a buffer can be late
723  * before it is dropped and not rendered. A value of -1 means an
724  * unlimited time.
725  *
726  * Since: 0.10.4
727  */
728 gint64
729 gst_base_sink_get_max_lateness (GstBaseSink * sink)
730 {
731   gint64 res;
732
733   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
734
735   GST_OBJECT_LOCK (sink);
736   res = sink->abidata.ABI.max_lateness;
737   GST_OBJECT_UNLOCK (sink);
738
739   return res;
740 }
741
742 /**
743  * gst_base_sink_set_qos_enabled:
744  * @sink: the sink
745  * @enabled: the new qos value.
746  *
747  * Configures @sink to send Quality-of-Service events upstream.
748  *
749  * Since: 0.10.5
750  */
751 void
752 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
753 {
754   g_return_if_fail (GST_IS_BASE_SINK (sink));
755
756   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
757 }
758
759 /**
760  * gst_base_sink_is_qos_enabled:
761  * @sink: the sink
762  *
763  * Checks if @sink is currently configured to send Quality-of-Service events
764  * upstream.
765  *
766  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
767  *
768  * Since: 0.10.5
769  */
770 gboolean
771 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
772 {
773   gboolean res;
774
775   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
776
777   res = g_atomic_int_get (&sink->priv->qos_enabled);
778
779   return res;
780 }
781
782 /**
783  * gst_base_sink_set_async_enabled:
784  * @sink: the sink
785  * @enabled: the new async value.
786  *
787  * Configures @sink to perform all state changes asynchronusly. When async is
788  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
789  * preroll buffer. This feature is usefull if the sink does not synchronize
790  * against the clock or when it is dealing with sparse streams.
791  *
792  * Since: 0.10.15
793  */
794 void
795 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
796 {
797   g_return_if_fail (GST_IS_BASE_SINK (sink));
798
799   GST_PAD_PREROLL_LOCK (sink->sinkpad);
800   sink->priv->async_enabled = enabled;
801   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
802   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
803 }
804
805 /**
806  * gst_base_sink_is_async_enabled:
807  * @sink: the sink
808  *
809  * Checks if @sink is currently configured to perform asynchronous state
810  * changes to PAUSED.
811  *
812  * Returns: TRUE if the sink is configured to perform asynchronous state
813  * changes.
814  *
815  * Since: 0.10.15
816  */
817 gboolean
818 gst_base_sink_is_async_enabled (GstBaseSink * sink)
819 {
820   gboolean res;
821
822   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
823
824   GST_PAD_PREROLL_LOCK (sink->sinkpad);
825   res = sink->priv->async_enabled;
826   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
827
828   return res;
829 }
830
831 /**
832  * gst_base_sink_set_ts_offset:
833  * @sink: the sink
834  * @offset: the new offset
835  *
836  * Adjust the synchronisation of @sink with @offset. A negative value will
837  * render buffers earlier than their timestamp. A positive value will delay
838  * rendering. This function can be used to fix playback of badly timestamped
839  * buffers.
840  *
841  * Since: 0.10.15
842  */
843 void
844 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
845 {
846   g_return_if_fail (GST_IS_BASE_SINK (sink));
847
848   GST_OBJECT_LOCK (sink);
849   sink->priv->ts_offset = offset;
850   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
851   GST_OBJECT_UNLOCK (sink);
852 }
853
854 /**
855  * gst_base_sink_get_ts_offset:
856  * @sink: the sink
857  *
858  * Get the synchronisation offset of @sink.
859  *
860  * Returns: The synchronisation offset.
861  *
862  * Since: 0.10.15
863  */
864 GstClockTimeDiff
865 gst_base_sink_get_ts_offset (GstBaseSink * sink)
866 {
867   GstClockTimeDiff res;
868
869   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
870
871   GST_OBJECT_LOCK (sink);
872   res = sink->priv->ts_offset;
873   GST_OBJECT_UNLOCK (sink);
874
875   return res;
876 }
877
878 /**
879  * gst_base_sink_get_last_buffer:
880  * @sink: the sink
881  *
882  * Get the last buffer that arrived in the sink and was used for preroll or for
883  * rendering. This property can be used to generate thumbnails.
884  *
885  * The #GstCaps on the buffer can be used to determine the type of the buffer.
886  * 
887  * Returns: a #GstBuffer. gst_buffer_unref() after usage. This function returns
888  * NULL when no buffer has arrived in the sink yet or when the sink is not in
889  * PAUSED or PLAYING.
890  *
891  * Since: 0.10.15
892  */
893 GstBuffer *
894 gst_base_sink_get_last_buffer (GstBaseSink * sink)
895 {
896   GstBuffer *res;
897
898   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
899
900   GST_OBJECT_LOCK (sink);
901   if ((res = sink->priv->last_buffer))
902     gst_buffer_ref (res);
903   GST_OBJECT_UNLOCK (sink);
904
905   return res;
906 }
907
908 static void
909 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
910 {
911   GstBuffer *old;
912
913   GST_OBJECT_LOCK (sink);
914   old = sink->priv->last_buffer;
915   if (G_LIKELY (old != buffer)) {
916     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
917     if (G_LIKELY (buffer))
918       gst_buffer_ref (buffer);
919     sink->priv->last_buffer = buffer;
920   } else {
921     old = NULL;
922   }
923   GST_OBJECT_UNLOCK (sink);
924
925   /* avoid unreffing with the lock because cleanup code might want to take the
926    * lock too */
927   if (G_LIKELY (old))
928     gst_buffer_unref (old);
929 }
930
931 /**
932  * gst_base_sink_get_latency:
933  * @sink: the sink
934  *
935  * Get the currently configured latency.
936  *
937  * Returns: The configured latency.
938  *
939  * Since: 0.10.12
940  */
941 GstClockTime
942 gst_base_sink_get_latency (GstBaseSink * sink)
943 {
944   GstClockTime res;
945
946   GST_OBJECT_LOCK (sink);
947   res = sink->priv->latency;
948   GST_OBJECT_UNLOCK (sink);
949
950   return res;
951 }
952
953 /**
954  * gst_base_sink_query_latency:
955  * @sink: the sink
956  * @live: if the sink is live
957  * @upstream_live: if an upstream element is live
958  * @min_latency: the min latency of the upstream elements
959  * @max_latency: the max latency of the upstream elements
960  *
961  * Query the sink for the latency parameters. The latency will be queried from
962  * the upstream elements. @live will be TRUE if @sink is configured to
963  * synchronize against the clock. @upstream_live will be TRUE if an upstream
964  * element is live. 
965  *
966  * If both @live and @upstream_live are TRUE, the sink will want to compensate
967  * for the latency introduced by the upstream elements by setting the
968  * @min_latency to a strictly possitive value.
969  *
970  * This function is mostly used by subclasses. 
971  *
972  * Returns: TRUE if the query succeeded.
973  *
974  * Since: 0.10.12
975  */
976 gboolean
977 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
978     gboolean * upstream_live, GstClockTime * min_latency,
979     GstClockTime * max_latency)
980 {
981   gboolean l, us_live, res, have_latency;
982   GstClockTime min, max, render_delay;
983   GstQuery *query;
984   GstClockTime us_min, us_max;
985
986   /* we are live when we sync to the clock */
987   GST_OBJECT_LOCK (sink);
988   l = sink->sync;
989   have_latency = sink->priv->have_latency;
990   render_delay = sink->priv->render_delay;
991   GST_OBJECT_UNLOCK (sink);
992
993   /* assume no latency */
994   min = 0;
995   max = -1;
996   us_live = FALSE;
997
998   if (have_latency) {
999     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1000     /* we are ready for a latency query this is when we preroll or when we are
1001      * not async. */
1002     query = gst_query_new_latency ();
1003
1004     /* ask the peer for the latency */
1005     if ((res = gst_base_sink_peer_query (sink, query))) {
1006       /* get upstream min and max latency */
1007       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1008
1009       if (us_live) {
1010         /* upstream live, use its latency, subclasses should use these
1011          * values to create the complete latency. */
1012         min = us_min;
1013         max = us_max;
1014       }
1015       if (l) {
1016         /* we need to add the render delay if we are live */
1017         if (min != -1)
1018           min += render_delay;
1019         if (max != -1)
1020           max += render_delay;
1021       }
1022     }
1023     gst_query_unref (query);
1024   } else {
1025     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1026     res = FALSE;
1027   }
1028
1029   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1030   if (!res) {
1031     if (!l) {
1032       res = TRUE;
1033       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1034     } else {
1035       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1036     }
1037   }
1038
1039   if (res) {
1040     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1041         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1042         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1043
1044     if (live)
1045       *live = l;
1046     if (upstream_live)
1047       *upstream_live = us_live;
1048     if (min_latency)
1049       *min_latency = min;
1050     if (max_latency)
1051       *max_latency = max;
1052   }
1053   return res;
1054 }
1055
1056 /**
1057  * gst_base_sink_set_render_delay:
1058  * @sink: a #GstBaseSink
1059  * @delay: the new delay
1060  *
1061  * Set the render delay in @sink to @delay. The render delay is the time 
1062  * between actual rendering of a buffer and its synchronisation time. Some
1063  * devices might delay media rendering which can be compensated for with this
1064  * function. 
1065  *
1066  * After calling this function, this sink will report additional latency and
1067  * other sinks will adjust their latency to delay the rendering of their media.
1068  *
1069  * This function is usually called by subclasses.
1070  *
1071  * Since: 0.10.21
1072  */
1073 void
1074 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1075 {
1076   GstClockTime old_render_delay;
1077
1078   g_return_if_fail (GST_IS_BASE_SINK (sink));
1079
1080   GST_OBJECT_LOCK (sink);
1081   old_render_delay = sink->priv->render_delay;
1082   sink->priv->render_delay = delay;
1083   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1084       GST_TIME_ARGS (delay));
1085   GST_OBJECT_UNLOCK (sink);
1086
1087   if (delay != old_render_delay) {
1088     GST_DEBUG_OBJECT (sink, "posting latency changed");
1089     gst_element_post_message (GST_ELEMENT_CAST (sink),
1090         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1091   }
1092 }
1093
1094 /**
1095  * gst_base_sink_get_render_delay:
1096  * @sink: a #GstBaseSink
1097  *
1098  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1099  * information about the render delay.
1100  *
1101  * Returns: the render delay of @sink.
1102  *
1103  * Since: 0.10.21
1104  */
1105 GstClockTime
1106 gst_base_sink_get_render_delay (GstBaseSink * sink)
1107 {
1108   GstClockTimeDiff res;
1109
1110   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1111
1112   GST_OBJECT_LOCK (sink);
1113   res = sink->priv->render_delay;
1114   GST_OBJECT_UNLOCK (sink);
1115
1116   return res;
1117 }
1118
1119 /**
1120  * gst_base_sink_set_blocksize:
1121  * @sink: a #GstBaseSink
1122  * @blocksize: the blocksize in bytes
1123  *
1124  * Set the number of bytes that the sink will pull when it is operating in pull
1125  * mode.
1126  *
1127  * Since: 0.10.22
1128  */
1129 void
1130 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1131 {
1132   g_return_if_fail (GST_IS_BASE_SINK (sink));
1133
1134   GST_OBJECT_LOCK (sink);
1135   sink->priv->blocksize = blocksize;
1136   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1137   GST_OBJECT_UNLOCK (sink);
1138 }
1139
1140 /**
1141  * gst_base_sink_get_blocksize:
1142  * @sink: a #GstBaseSink
1143  *
1144  * Get the number of bytes that the sink will pull when it is operating in pull
1145  * mode.
1146  *
1147  * Returns: the number of bytes @sink will pull in pull mode.
1148  *
1149  * Since: 0.10.22
1150  */
1151 guint
1152 gst_base_sink_get_blocksize (GstBaseSink * sink)
1153 {
1154   guint res;
1155
1156   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1157
1158   GST_OBJECT_LOCK (sink);
1159   res = sink->priv->blocksize;
1160   GST_OBJECT_UNLOCK (sink);
1161
1162   return res;
1163 }
1164
1165 static void
1166 gst_base_sink_set_property (GObject * object, guint prop_id,
1167     const GValue * value, GParamSpec * pspec)
1168 {
1169   GstBaseSink *sink = GST_BASE_SINK (object);
1170
1171   switch (prop_id) {
1172     case PROP_PREROLL_QUEUE_LEN:
1173       /* preroll lock necessary to serialize with finish_preroll */
1174       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1175       sink->preroll_queue_max_len = g_value_get_uint (value);
1176       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1177       break;
1178     case PROP_SYNC:
1179       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1180       break;
1181     case PROP_MAX_LATENESS:
1182       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1183       break;
1184     case PROP_QOS:
1185       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1186       break;
1187     case PROP_ASYNC:
1188       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1189       break;
1190     case PROP_TS_OFFSET:
1191       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1192       break;
1193     case PROP_BLOCKSIZE:
1194       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1195       break;
1196     case PROP_RENDER_DELAY:
1197       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1198       break;
1199     default:
1200       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1201       break;
1202   }
1203 }
1204
1205 static void
1206 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1207     GParamSpec * pspec)
1208 {
1209   GstBaseSink *sink = GST_BASE_SINK (object);
1210
1211   switch (prop_id) {
1212     case PROP_PREROLL_QUEUE_LEN:
1213       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1214       g_value_set_uint (value, sink->preroll_queue_max_len);
1215       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1216       break;
1217     case PROP_SYNC:
1218       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1219       break;
1220     case PROP_MAX_LATENESS:
1221       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1222       break;
1223     case PROP_QOS:
1224       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1225       break;
1226     case PROP_ASYNC:
1227       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1228       break;
1229     case PROP_TS_OFFSET:
1230       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1231       break;
1232     case PROP_LAST_BUFFER:
1233       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1234       break;
1235     case PROP_BLOCKSIZE:
1236       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1237       break;
1238     case PROP_RENDER_DELAY:
1239       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1240       break;
1241     default:
1242       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1243       break;
1244   }
1245 }
1246
1247
1248 static GstCaps *
1249 gst_base_sink_get_caps (GstBaseSink * sink)
1250 {
1251   return NULL;
1252 }
1253
1254 static gboolean
1255 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1256 {
1257   return TRUE;
1258 }
1259
1260 static GstFlowReturn
1261 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1262     GstCaps * caps, GstBuffer ** buf)
1263 {
1264   *buf = NULL;
1265   return GST_FLOW_OK;
1266 }
1267
1268 /* with PREROLL_LOCK, STREAM_LOCK */
1269 static void
1270 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1271 {
1272   GstMiniObject *obj;
1273
1274   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1275   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1276     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1277     gst_mini_object_unref (obj);
1278   }
1279   /* we can't have EOS anymore now */
1280   basesink->eos = FALSE;
1281   basesink->priv->received_eos = FALSE;
1282   basesink->have_preroll = FALSE;
1283   basesink->priv->step_unlock = FALSE;
1284   basesink->eos_queued = FALSE;
1285   basesink->preroll_queued = 0;
1286   basesink->buffers_queued = 0;
1287   basesink->events_queued = 0;
1288   /* can't report latency anymore until we preroll again */
1289   if (basesink->priv->async_enabled) {
1290     GST_OBJECT_LOCK (basesink);
1291     basesink->priv->have_latency = FALSE;
1292     GST_OBJECT_UNLOCK (basesink);
1293   }
1294   /* and signal any waiters now */
1295   GST_PAD_PREROLL_SIGNAL (pad);
1296 }
1297
1298 /* with STREAM_LOCK, configures given segment with the event information. */
1299 static void
1300 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1301     GstEvent * event, GstSegment * segment)
1302 {
1303   gboolean update;
1304   gdouble rate, arate;
1305   GstFormat format;
1306   gint64 start;
1307   gint64 stop;
1308   gint64 time;
1309
1310   /* the newsegment event is needed to bring the buffer timestamps to the
1311    * stream time and to drop samples outside of the playback segment. */
1312   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1313       &start, &stop, &time);
1314
1315   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1316    * We protect with the OBJECT_LOCK so that we can use the values to
1317    * safely answer a POSITION query. */
1318   GST_OBJECT_LOCK (basesink);
1319   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1320       stop, time);
1321
1322   if (format == GST_FORMAT_TIME) {
1323     GST_DEBUG_OBJECT (basesink,
1324         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1325         "format GST_FORMAT_TIME, "
1326         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1327         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1328         update, rate, arate, GST_TIME_ARGS (segment->start),
1329         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1330         GST_TIME_ARGS (segment->accum));
1331   } else {
1332     GST_DEBUG_OBJECT (basesink,
1333         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1334         "format %d, "
1335         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1336         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1337         segment->format, segment->start, segment->stop, segment->time,
1338         segment->accum);
1339   }
1340   GST_OBJECT_UNLOCK (basesink);
1341 }
1342
1343 /* with PREROLL_LOCK, STREAM_LOCK */
1344 static gboolean
1345 gst_base_sink_commit_state (GstBaseSink * basesink)
1346 {
1347   /* commit state and proceed to next pending state */
1348   GstState current, next, pending, post_pending;
1349   gboolean post_paused = FALSE;
1350   gboolean post_async_done = FALSE;
1351   gboolean post_playing = FALSE;
1352
1353   /* we are certainly not playing async anymore now */
1354   basesink->playing_async = FALSE;
1355
1356   GST_OBJECT_LOCK (basesink);
1357   current = GST_STATE (basesink);
1358   next = GST_STATE_NEXT (basesink);
1359   pending = GST_STATE_PENDING (basesink);
1360   post_pending = pending;
1361
1362   switch (pending) {
1363     case GST_STATE_PLAYING:
1364     {
1365       GstBaseSinkClass *bclass;
1366       GstStateChangeReturn ret;
1367
1368       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1369
1370       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1371
1372       basesink->need_preroll = FALSE;
1373       post_async_done = TRUE;
1374       basesink->priv->commited = TRUE;
1375       post_playing = TRUE;
1376       /* post PAUSED too when we were READY */
1377       if (current == GST_STATE_READY) {
1378         post_paused = TRUE;
1379       }
1380
1381       /* make sure we notify the subclass of async playing */
1382       if (bclass->async_play) {
1383         GST_WARNING_OBJECT (basesink, "deprecated async_play");
1384         ret = bclass->async_play (basesink);
1385         if (ret == GST_STATE_CHANGE_FAILURE)
1386           goto async_failed;
1387       }
1388       break;
1389     }
1390     case GST_STATE_PAUSED:
1391       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1392       post_paused = TRUE;
1393       post_async_done = TRUE;
1394       basesink->priv->commited = TRUE;
1395       post_pending = GST_STATE_VOID_PENDING;
1396       break;
1397     case GST_STATE_READY:
1398     case GST_STATE_NULL:
1399       goto stopping;
1400     case GST_STATE_VOID_PENDING:
1401       goto nothing_pending;
1402     default:
1403       break;
1404   }
1405
1406   /* we can report latency queries now */
1407   basesink->priv->have_latency = TRUE;
1408
1409   GST_STATE (basesink) = pending;
1410   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1411   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1412   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1413   GST_OBJECT_UNLOCK (basesink);
1414
1415   if (post_paused) {
1416     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1417     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1418         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1419             current, next, post_pending));
1420   }
1421   if (post_async_done) {
1422     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1423     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1424         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1425   }
1426   if (post_playing) {
1427     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1428     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1429         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1430             next, pending, GST_STATE_VOID_PENDING));
1431   }
1432
1433   GST_STATE_BROADCAST (basesink);
1434
1435   return TRUE;
1436
1437 nothing_pending:
1438   {
1439     /* Depending on the state, set our vars. We get in this situation when the
1440      * state change function got a change to update the state vars before the
1441      * streaming thread did. This is fine but we need to make sure that we
1442      * update the need_preroll var since it was TRUE when we got here and might
1443      * become FALSE if we got to PLAYING. */
1444     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1445         gst_element_state_get_name (current));
1446     switch (current) {
1447       case GST_STATE_PLAYING:
1448         basesink->need_preroll = FALSE;
1449         break;
1450       case GST_STATE_PAUSED:
1451         basesink->need_preroll = TRUE;
1452         break;
1453       default:
1454         basesink->need_preroll = FALSE;
1455         basesink->flushing = TRUE;
1456         break;
1457     }
1458     /* we can report latency queries now */
1459     basesink->priv->have_latency = TRUE;
1460     GST_OBJECT_UNLOCK (basesink);
1461     return TRUE;
1462   }
1463 stopping:
1464   {
1465     /* app is going to READY */
1466     GST_DEBUG_OBJECT (basesink, "stopping");
1467     basesink->need_preroll = FALSE;
1468     basesink->flushing = TRUE;
1469     GST_OBJECT_UNLOCK (basesink);
1470     return FALSE;
1471   }
1472 async_failed:
1473   {
1474     GST_DEBUG_OBJECT (basesink, "async commit failed");
1475     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1476     GST_OBJECT_UNLOCK (basesink);
1477     return FALSE;
1478   }
1479 }
1480
1481 static void
1482 start_stepping (GstBaseSink * sink, GstSegment * segment,
1483     GstStepInfo * pending, GstStepInfo * current)
1484 {
1485   GST_DEBUG_OBJECT (sink, "update pending step");
1486   memcpy (current, pending, sizeof (GstStepInfo));
1487   pending->valid = FALSE;
1488
1489   /* get the running time of where we paused and remember it */
1490   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1491   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1492
1493   /* set the new rate for the remainder of the segment */
1494   current->start_rate = segment->rate;
1495   /* no rate yet, it only works in playing */
1496   /* segment->rate *= current->rate; 
1497    * segment->abs_rate = ABS (segment->rate);
1498    * */
1499
1500   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1501       GST_TIME_ARGS (current->start));
1502
1503   if (current->amount == -1) {
1504     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1505     current->valid = FALSE;
1506   } else {
1507     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1508         "rate: %f", current->amount, gst_format_get_name (current->format),
1509         current->rate);
1510   }
1511 }
1512
1513 static void
1514 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1515     GstStepInfo * current, guint64 cstart, guint64 cstop, gint64 * rstart,
1516     gint64 * rstop)
1517 {
1518   gint64 stop;
1519   GstMessage *message;
1520
1521   GST_DEBUG_OBJECT (sink, "step complete");
1522
1523   if (segment->rate > 0.0)
1524     stop = *rstart;
1525   else
1526     stop = *rstop;
1527
1528   GST_DEBUG_OBJECT (sink,
1529       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1530
1531   if (stop == -1)
1532     current->duration = current->position;
1533   else
1534     current->duration = stop - current->start;
1535
1536   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1537       GST_TIME_ARGS (current->duration));
1538
1539   /* now move the segment to the new running time */
1540   gst_segment_set_running_time (segment, GST_FORMAT_TIME,
1541       current->start + current->duration);
1542   /* restore the previous rate */
1543   segment->rate = current->start_rate;
1544   segment->abs_rate = ABS (segment->rate);
1545   /* and remove the accumulated time we stepped */
1546   segment->accum = current->start;
1547
1548   /* the clip segment is used for position report in paused... */
1549   memcpy (sink->abidata.ABI.clip_segment, segment, sizeof (GstSegment));
1550
1551   message =
1552       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1553       current->amount, current->rate, current->flush, current->intermediate,
1554       current->duration);
1555   gst_message_set_seqnum (message, current->seqnum);
1556   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1557
1558   if (!current->intermediate)
1559     sink->need_preroll = current->need_preroll;
1560
1561   /* and the current step info finished and becomes invalid */
1562   current->valid = FALSE;
1563 }
1564
1565 static gboolean
1566 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1567     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1568     gint64 * rstop)
1569 {
1570   GstBaseSinkPrivate *priv;
1571   gboolean step_end = FALSE;
1572
1573   priv = sink->priv;
1574
1575   /* see if we need to skip this buffer because of stepping */
1576   switch (current->format) {
1577     case GST_FORMAT_TIME:
1578     {
1579       guint64 end;
1580       gint64 first, last;
1581
1582       if (segment->rate > 0.0) {
1583         first = *rstart;
1584         last = *rstop;
1585       } else {
1586         first = *rstop;
1587         last = *rstart;
1588       }
1589
1590       end = current->start + current->amount;
1591       current->position = first - current->start;
1592
1593       GST_DEBUG_OBJECT (sink,
1594           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1595           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1596       GST_DEBUG_OBJECT (sink,
1597           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1598           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1599           GST_TIME_ARGS (last - current->start),
1600           GST_TIME_ARGS (current->amount));
1601
1602       if (current->position >= current->amount || last >= end) {
1603         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1604         step_end = TRUE;
1605         if (segment->rate > 0.0) {
1606           *rstart = end;
1607           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1608         } else {
1609           *rstop = end;
1610           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1611         }
1612       }
1613       GST_DEBUG_OBJECT (sink,
1614           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1615           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1616       GST_DEBUG_OBJECT (sink,
1617           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1618           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1619       break;
1620     }
1621     case GST_FORMAT_BUFFERS:
1622       GST_DEBUG_OBJECT (sink,
1623           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1624           current->position, current->amount);
1625
1626       if (current->position < current->amount) {
1627         current->position++;
1628       } else {
1629         step_end = TRUE;
1630       }
1631       break;
1632     case GST_FORMAT_DEFAULT:
1633     default:
1634       GST_DEBUG_OBJECT (sink,
1635           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1636           current->position, current->amount);
1637       break;
1638   }
1639   return step_end;
1640 }
1641
1642 /* with STREAM_LOCK, PREROLL_LOCK
1643  *
1644  * Returns TRUE if the object needs synchronisation and takes therefore
1645  * part in prerolling.
1646  *
1647  * rsstart/rsstop contain the start/stop in stream time.
1648  * rrstart/rrstop contain the start/stop in running time.
1649  */
1650 static gboolean
1651 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1652     GstClockTime * rsstart, GstClockTime * rsstop,
1653     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1654     gboolean * stepped, GstSegment * segment, GstStepInfo * step)
1655 {
1656   GstBaseSinkClass *bclass;
1657   GstBuffer *buffer;
1658   GstClockTime start, stop;     /* raw start/stop timestamps */
1659   gint64 cstart, cstop;         /* clipped raw timestamps */
1660   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1661   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1662   GstFormat format;
1663   GstBaseSinkPrivate *priv;
1664   gboolean step_end;
1665
1666   priv = basesink->priv;
1667
1668   /* start with nothing */
1669   start = stop = -1;
1670
1671   step_end = FALSE;
1672
1673   if (G_UNLIKELY (GST_IS_EVENT (obj))) {
1674     GstEvent *event = GST_EVENT_CAST (obj);
1675
1676     switch (GST_EVENT_TYPE (event)) {
1677         /* EOS event needs syncing */
1678       case GST_EVENT_EOS:
1679       {
1680         if (basesink->segment.rate >= 0.0) {
1681           sstart = sstop = priv->current_sstop;
1682           if (sstart == -1) {
1683             /* we have not seen a buffer yet, use the segment values */
1684             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1685                 basesink->segment.format, basesink->segment.stop);
1686           }
1687         } else {
1688           sstart = sstop = priv->current_sstart;
1689           if (sstart == -1) {
1690             /* we have not seen a buffer yet, use the segment values */
1691             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1692                 basesink->segment.format, basesink->segment.start);
1693           }
1694         }
1695
1696         rstart = rstop = priv->eos_rtime;
1697         *do_sync = rstart != -1;
1698         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1699             GST_TIME_ARGS (rstart));
1700         /* if we are stepping, we end now */
1701         step_end = step->valid;
1702         goto eos_done;
1703       }
1704       default:
1705         /* other events do not need syncing */
1706         /* FIXME, maybe NEWSEGMENT might need synchronisation
1707          * since the POSITION query depends on accumulated times and
1708          * we cannot accumulate the current segment before the previous
1709          * one completed.
1710          */
1711         return FALSE;
1712     }
1713   }
1714
1715   /* else do buffer sync code */
1716   buffer = GST_BUFFER_CAST (obj);
1717
1718   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1719
1720   /* just get the times to see if we need syncing, if the start returns -1 we
1721    * don't sync. */
1722   if (bclass->get_times)
1723     bclass->get_times (basesink, buffer, &start, &stop);
1724
1725   if (start == -1) {
1726     /* we don't need to sync but we still want to get the timestamps for
1727      * tracking the position */
1728     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1729     *do_sync = FALSE;
1730   } else {
1731     *do_sync = TRUE;
1732   }
1733
1734   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1735       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1736       GST_TIME_ARGS (stop), *do_sync);
1737
1738   /* collect segment and format for code clarity */
1739   format = segment->format;
1740
1741   /* no timestamp clipping if we did not get a TIME segment format */
1742   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
1743     cstart = start;
1744     cstop = stop;
1745     /* do running and stream time in TIME format */
1746     format = GST_FORMAT_TIME;
1747     GST_LOG_OBJECT (basesink, "not time format, don't clip");
1748     goto do_times;
1749   }
1750
1751   /* clip, only when we know about time */
1752   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
1753               (gint64) start, (gint64) stop, &cstart, &cstop)))
1754     goto out_of_segment;
1755
1756   if (G_UNLIKELY (start != cstart || stop != cstop)) {
1757     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
1758         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
1759         GST_TIME_ARGS (cstop));
1760   }
1761
1762   /* set last stop position */
1763   if (G_LIKELY (cstop != GST_CLOCK_TIME_NONE))
1764     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
1765   else
1766     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
1767
1768 do_times:
1769   rstart = gst_segment_to_running_time (segment, format, cstart);
1770   rstop = gst_segment_to_running_time (segment, format, cstop);
1771
1772   if (G_UNLIKELY (step->valid)) {
1773     if (!(step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
1774                 &rstart, &rstop)))
1775       *stepped = TRUE;
1776   }
1777   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
1778    * upstream is behaving very badly */
1779   sstart = gst_segment_to_stream_time (segment, format, cstart);
1780   sstop = gst_segment_to_stream_time (segment, format, cstop);
1781
1782 eos_done:
1783   /* done label only called when doing EOS, we also stop stepping then */
1784   if (step_end)
1785     stop_stepping (basesink, segment, step, cstart, cstop, &rstart, &rstop);
1786
1787   /* save times */
1788   *rsstart = sstart;
1789   *rsstop = sstop;
1790   *rrstart = rstart;
1791   *rrstop = rstop;
1792
1793   /* buffers and EOS always need syncing and preroll */
1794   return TRUE;
1795
1796   /* special cases */
1797 out_of_segment:
1798   {
1799     /* we usually clip in the chain function already but stepping could cause
1800      * the segment to be updated later. we return FALSE so that we don't try
1801      * to sync on it. */
1802     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
1803     return FALSE;
1804   }
1805 }
1806
1807 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
1808  * adjust a timestamp with the latency and timestamp offset */
1809 static GstClockTime
1810 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
1811 {
1812   GstClockTimeDiff ts_offset;
1813
1814   /* don't do anything funny with invalid timestamps */
1815   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1816     return time;
1817
1818   time += basesink->priv->latency;
1819
1820   /* apply offset, be carefull for underflows */
1821   ts_offset = basesink->priv->ts_offset;
1822   if (ts_offset < 0) {
1823     ts_offset = -ts_offset;
1824     if (ts_offset < time)
1825       time -= ts_offset;
1826     else
1827       time = 0;
1828   } else
1829     time += ts_offset;
1830
1831   return time;
1832 }
1833
1834 /**
1835  * gst_base_sink_wait_clock:
1836  * @sink: the sink
1837  * @time: the running_time to be reached
1838  * @jitter: the jitter to be filled with time diff (can be NULL)
1839  *
1840  * This function will block until @time is reached. It is usually called by
1841  * subclasses that use their own internal synchronisation.
1842  *
1843  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
1844  * returned. Likewise, if synchronisation is disabled in the element or there
1845  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
1846  *
1847  * This function should only be called with the PREROLL_LOCK held, like when
1848  * receiving an EOS event in the ::event vmethod or when receiving a buffer in
1849  * the ::render vmethod.
1850  *
1851  * The @time argument should be the running_time of when this method should
1852  * return and is not adjusted with any latency or offset configured in the
1853  * sink.
1854  *
1855  * Since 0.10.20
1856  *
1857  * Returns: #GstClockReturn
1858  */
1859 GstClockReturn
1860 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
1861     GstClockTimeDiff * jitter)
1862 {
1863   GstClockID id;
1864   GstClockReturn ret;
1865   GstClock *clock;
1866
1867   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1868     goto invalid_time;
1869
1870   GST_OBJECT_LOCK (sink);
1871   if (G_UNLIKELY (!sink->sync))
1872     goto no_sync;
1873
1874   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
1875     goto no_clock;
1876
1877   /* add base_time to running_time to get the time against the clock */
1878   time += GST_ELEMENT_CAST (sink)->base_time;
1879
1880   id = gst_clock_new_single_shot_id (clock, time);
1881   GST_OBJECT_UNLOCK (sink);
1882
1883   /* A blocking wait is performed on the clock. We save the ClockID
1884    * so we can unlock the entry at any time. While we are blocking, we 
1885    * release the PREROLL_LOCK so that other threads can interrupt the
1886    * entry. */
1887   sink->clock_id = id;
1888   /* release the preroll lock while waiting */
1889   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1890
1891   ret = gst_clock_id_wait (id, jitter);
1892
1893   GST_PAD_PREROLL_LOCK (sink->sinkpad);
1894   gst_clock_id_unref (id);
1895   sink->clock_id = NULL;
1896
1897   return ret;
1898
1899   /* no syncing needed */
1900 invalid_time:
1901   {
1902     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
1903     return GST_CLOCK_BADTIME;
1904   }
1905 no_sync:
1906   {
1907     GST_DEBUG_OBJECT (sink, "sync disabled");
1908     GST_OBJECT_UNLOCK (sink);
1909     return GST_CLOCK_BADTIME;
1910   }
1911 no_clock:
1912   {
1913     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
1914     GST_OBJECT_UNLOCK (sink);
1915     return GST_CLOCK_BADTIME;
1916   }
1917 }
1918
1919 /**
1920  * gst_base_sink_wait_preroll:
1921  * @sink: the sink
1922  *
1923  * If the #GstBaseSinkClass::render method performs its own synchronisation against
1924  * the clock it must unblock when going from PLAYING to the PAUSED state and call
1925  * this method before continuing to render the remaining data.
1926  *
1927  * This function will block until a state change to PLAYING happens (in which
1928  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
1929  * to a state change to READY or a FLUSH event (in which case this function
1930  * returns #GST_FLOW_WRONG_STATE).
1931  *
1932  * This function should only be called with the PREROLL_LOCK held, like in the
1933  * render function.
1934  *
1935  * Since: 0.10.11
1936  *
1937  * Returns: #GST_FLOW_OK if the preroll completed and processing can
1938  * continue. Any other return value should be returned from the render vmethod.
1939  */
1940 GstFlowReturn
1941 gst_base_sink_wait_preroll (GstBaseSink * sink)
1942 {
1943   sink->have_preroll = TRUE;
1944   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
1945   /* block until the state changes, or we get a flush, or something */
1946   GST_PAD_PREROLL_WAIT (sink->sinkpad);
1947   sink->have_preroll = FALSE;
1948   if (G_UNLIKELY (sink->flushing))
1949     goto stopping;
1950   if (G_UNLIKELY (sink->priv->step_unlock))
1951     goto step_unlocked;
1952   GST_DEBUG_OBJECT (sink, "continue after preroll");
1953
1954   return GST_FLOW_OK;
1955
1956   /* ERRORS */
1957 stopping:
1958   {
1959     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
1960     return GST_FLOW_WRONG_STATE;
1961   }
1962 step_unlocked:
1963   {
1964     sink->priv->step_unlock = FALSE;
1965     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
1966     return GST_FLOW_STEP;
1967   }
1968 }
1969
1970 /**
1971  * gst_base_sink_do_preroll:
1972  * @sink: the sink
1973  * @obj: the object that caused the preroll
1974  *
1975  * If the @sink spawns its own thread for pulling buffers from upstream it
1976  * should call this method after it has pulled a buffer. If the element needed
1977  * to preroll, this function will perform the preroll and will then block
1978  * until the element state is changed.
1979  *
1980  * This function should be called with the PREROLL_LOCK held.
1981  *
1982  * Since 0.10.22
1983  *
1984  * Returns: #GST_FLOW_OK if the preroll completed and processing can
1985  * continue. Any other return value should be returned from the render vmethod.
1986  */
1987 GstFlowReturn
1988 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
1989 {
1990   GstFlowReturn ret;
1991
1992   while (G_UNLIKELY (sink->need_preroll)) {
1993     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
1994
1995     ret = gst_base_sink_preroll_object (sink, obj);
1996     if (ret != GST_FLOW_OK)
1997       goto preroll_failed;
1998
1999     /* need to recheck here because the commit state could have
2000      * made us not need the preroll anymore */
2001     if (G_LIKELY (sink->need_preroll)) {
2002       /* block until the state changes, or we get a flush, or something */
2003       ret = gst_base_sink_wait_preroll (sink);
2004       if (ret != GST_FLOW_OK) {
2005         if (ret == GST_FLOW_STEP)
2006           ret = GST_FLOW_OK;
2007         else
2008           goto preroll_failed;
2009       }
2010     }
2011   }
2012   return GST_FLOW_OK;
2013
2014   /* ERRORS */
2015 preroll_failed:
2016   {
2017     GST_DEBUG_OBJECT (sink, "preroll failed %d", ret);
2018     return ret;
2019   }
2020 }
2021
2022 /**
2023  * gst_base_sink_wait_eos:
2024  * @sink: the sink
2025  * @time: the running_time to be reached
2026  * @jitter: the jitter to be filled with time diff (can be NULL)
2027  *
2028  * This function will block until @time is reached. It is usually called by
2029  * subclasses that use their own internal synchronisation but want to let the
2030  * EOS be handled by the base class.
2031  *
2032  * This function should only be called with the PREROLL_LOCK held, like when
2033  * receiving an EOS event in the ::event vmethod.
2034  *
2035  * The @time argument should be the running_time of when the EOS should happen
2036  * and will be adjusted with any latency and offset configured in the sink.
2037  *
2038  * Since 0.10.15
2039  *
2040  * Returns: #GstFlowReturn
2041  */
2042 GstFlowReturn
2043 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2044     GstClockTimeDiff * jitter)
2045 {
2046   GstClockReturn status;
2047   GstFlowReturn ret;
2048
2049   do {
2050     GstClockTime stime;
2051
2052     GST_DEBUG_OBJECT (sink, "checking preroll");
2053
2054     /* first wait for the playing state before we can continue */
2055     if (G_UNLIKELY (sink->need_preroll)) {
2056       ret = gst_base_sink_wait_preroll (sink);
2057       if (ret != GST_FLOW_OK) {
2058         if (ret == GST_FLOW_STEP)
2059           ret = GST_FLOW_OK;
2060         else
2061           goto flushing;
2062       }
2063     }
2064
2065     /* preroll done, we can sync since we are in PLAYING now. */
2066     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2067         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2068
2069     /* compensate for latency and ts_offset. We don't adjust for render delay
2070      * because we don't interact with the device on EOS normally. */
2071     stime = gst_base_sink_adjust_time (sink, time);
2072
2073     /* wait for the clock, this can be interrupted because we got shut down or 
2074      * we PAUSED. */
2075     status = gst_base_sink_wait_clock (sink, stime, jitter);
2076
2077     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2078
2079     /* invalid time, no clock or sync disabled, just continue then */
2080     if (status == GST_CLOCK_BADTIME)
2081       break;
2082
2083     /* waiting could have been interrupted and we can be flushing now */
2084     if (G_UNLIKELY (sink->flushing))
2085       goto flushing;
2086
2087     /* retry if we got unscheduled, which means we did not reach the timeout
2088      * yet. if some other error occures, we continue. */
2089   } while (status == GST_CLOCK_UNSCHEDULED);
2090
2091   GST_DEBUG_OBJECT (sink, "end of stream");
2092
2093   return GST_FLOW_OK;
2094
2095   /* ERRORS */
2096 flushing:
2097   {
2098     GST_DEBUG_OBJECT (sink, "we are flushing");
2099     return GST_FLOW_WRONG_STATE;
2100   }
2101 }
2102
2103 /* with STREAM_LOCK, PREROLL_LOCK
2104  *
2105  * Make sure we are in PLAYING and synchronize an object to the clock.
2106  *
2107  * If we need preroll, we are not in PLAYING. We try to commit the state
2108  * if needed and then block if we still are not PLAYING.
2109  *
2110  * We start waiting on the clock in PLAYING. If we got interrupted, we
2111  * immediatly try to re-preroll.
2112  *
2113  * Some objects do not need synchronisation (most events) and so this function
2114  * immediatly returns GST_FLOW_OK.
2115  *
2116  * for objects that arrive later than max-lateness to be synchronized to the 
2117  * clock have the @late boolean set to TRUE.
2118  *
2119  * This function keeps a running average of the jitter (the diff between the
2120  * clock time and the requested sync time). The jitter is negative for
2121  * objects that arrive in time and positive for late buffers.
2122  *
2123  * does not take ownership of obj.
2124  */
2125 static GstFlowReturn
2126 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2127     GstMiniObject * obj, gboolean * late)
2128 {
2129   GstClockTimeDiff jitter;
2130   gboolean syncable;
2131   GstClockReturn status = GST_CLOCK_OK;
2132   GstClockTime rstart, rstop, sstart, sstop, stime;
2133   gboolean do_sync;
2134   GstBaseSinkPrivate *priv;
2135   GstFlowReturn ret;
2136   GstStepInfo *current, *pending;
2137   gboolean stepped;
2138
2139   priv = basesink->priv;
2140
2141 do_step:
2142   sstart = sstop = rstart = rstop = -1;
2143   do_sync = TRUE;
2144   stepped = FALSE;
2145
2146   priv->current_rstart = -1;
2147
2148   /* get stepping info */
2149   current = &priv->current_step;
2150   pending = &priv->pending_step;
2151
2152   /* get timing information for this object against the render segment */
2153   syncable = gst_base_sink_get_sync_times (basesink, obj,
2154       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2155       current);
2156
2157   if (G_UNLIKELY (stepped))
2158     goto step_skipped;
2159
2160   /* a syncable object needs to participate in preroll and
2161    * clocking. All buffers and EOS are syncable. */
2162   if (G_UNLIKELY (!syncable))
2163     goto not_syncable;
2164
2165   /* store timing info for current object */
2166   priv->current_rstart = rstart;
2167   priv->current_rstop = (rstop != -1 ? rstop : rstart);
2168
2169   /* save sync time for eos when the previous object needed sync */
2170   priv->eos_rtime = (do_sync ? priv->current_rstop : -1);
2171
2172 again:
2173   /* first do preroll, this makes sure we commit our state
2174    * to PAUSED and can continue to PLAYING. We cannot perform
2175    * any clock sync in PAUSED because there is no clock. */
2176   ret = gst_base_sink_do_preroll (basesink, obj);
2177   if (G_UNLIKELY (ret != GST_FLOW_OK))
2178     goto preroll_failed;
2179
2180   /* After rendering we store the position of the last buffer so that we can use
2181    * it to report the position. We need to take the lock here. */
2182   GST_OBJECT_LOCK (basesink);
2183   priv->current_sstart = sstart;
2184   priv->current_sstop = (sstop != -1 ? sstop : sstart);
2185   GST_OBJECT_UNLOCK (basesink);
2186
2187   /* update the segment with a pending step if the current one is invalid and we
2188    * have a new pending one. We only accept new step updates after a preroll */
2189   if (G_UNLIKELY (pending->valid && !current->valid)) {
2190     start_stepping (basesink, &basesink->segment, pending, current);
2191     goto do_step;
2192   }
2193
2194   if (!do_sync)
2195     goto done;
2196
2197   /* adjust for latency */
2198   stime = gst_base_sink_adjust_time (basesink, rstart);
2199
2200   /* adjust for render-delay, avoid underflows */
2201   if (stime != -1) {
2202     if (stime > priv->render_delay)
2203       stime -= priv->render_delay;
2204     else
2205       stime = 0;
2206   }
2207
2208   /* preroll done, we can sync since we are in PLAYING now. */
2209   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2210       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2211       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2212
2213   /* This function will return immediatly if start == -1, no clock
2214    * or sync is disabled with GST_CLOCK_BADTIME. */
2215   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2216
2217   GST_DEBUG_OBJECT (basesink, "clock returned %d", status);
2218
2219   /* invalid time, no clock or sync disabled, just render */
2220   if (status == GST_CLOCK_BADTIME)
2221     goto done;
2222
2223   /* waiting could have been interrupted and we can be flushing now */
2224   if (G_UNLIKELY (basesink->flushing))
2225     goto flushing;
2226
2227   /* check for unlocked by a state change, we are not flushing so
2228    * we can try to preroll on the current buffer. */
2229   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2230     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2231     priv->call_preroll = TRUE;
2232     goto again;
2233   }
2234
2235   /* successful syncing done, record observation */
2236   priv->current_jitter = jitter;
2237
2238   /* check if the object should be dropped */
2239   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2240       status, jitter);
2241
2242 done:
2243   return GST_FLOW_OK;
2244
2245   /* ERRORS */
2246 step_skipped:
2247   {
2248     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2249     *late = TRUE;
2250     return GST_FLOW_OK;
2251   }
2252 not_syncable:
2253   {
2254     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2255     return GST_FLOW_OK;
2256   }
2257 flushing:
2258   {
2259     GST_DEBUG_OBJECT (basesink, "we are flushing");
2260     return GST_FLOW_WRONG_STATE;
2261   }
2262 preroll_failed:
2263   {
2264     GST_DEBUG_OBJECT (basesink, "preroll failed");
2265     return ret;
2266   }
2267 }
2268
2269 static gboolean
2270 gst_base_sink_send_qos (GstBaseSink * basesink,
2271     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2272 {
2273   GstEvent *event;
2274   gboolean res;
2275
2276   /* generate Quality-of-Service event */
2277   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2278       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2279       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (time));
2280
2281   event = gst_event_new_qos (proportion, diff, time);
2282
2283   /* send upstream */
2284   res = gst_pad_push_event (basesink->sinkpad, event);
2285
2286   return res;
2287 }
2288
2289 static void
2290 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2291 {
2292   GstBaseSinkPrivate *priv;
2293   GstClockTime start, stop;
2294   GstClockTimeDiff jitter;
2295   GstClockTime pt, entered, left;
2296   GstClockTime duration;
2297   gdouble rate;
2298
2299   priv = sink->priv;
2300
2301   start = priv->current_rstart;
2302
2303   /* if Quality-of-Service disabled, do nothing */
2304   if (!g_atomic_int_get (&priv->qos_enabled) || start == -1)
2305     return;
2306
2307   stop = priv->current_rstop;
2308   jitter = priv->current_jitter;
2309
2310   if (jitter < 0) {
2311     /* this is the time the buffer entered the sink */
2312     if (start < -jitter)
2313       entered = 0;
2314     else
2315       entered = start + jitter;
2316     left = start;
2317   } else {
2318     /* this is the time the buffer entered the sink */
2319     entered = start + jitter;
2320     /* this is the time the buffer left the sink */
2321     left = start + jitter;
2322   }
2323
2324   /* calculate duration of the buffer */
2325   if (stop != -1)
2326     duration = stop - start;
2327   else
2328     duration = -1;
2329
2330   /* if we have the time when the last buffer left us, calculate
2331    * processing time */
2332   if (priv->last_left != -1) {
2333     if (entered > priv->last_left) {
2334       pt = entered - priv->last_left;
2335     } else {
2336       pt = 0;
2337     }
2338   } else {
2339     pt = priv->avg_pt;
2340   }
2341
2342   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2343       ", entered %" GST_TIME_FORMAT ", left %" GST_TIME_FORMAT ", pt: %"
2344       GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT ",jitter %"
2345       G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (entered),
2346       GST_TIME_ARGS (left), GST_TIME_ARGS (pt), GST_TIME_ARGS (duration),
2347       jitter);
2348
2349   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2350       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2351       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2352       priv->avg_rate);
2353
2354   /* collect running averages. for first observations, we copy the
2355    * values */
2356   if (priv->avg_duration == -1)
2357     priv->avg_duration = duration;
2358   else
2359     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2360
2361   if (priv->avg_pt == -1)
2362     priv->avg_pt = pt;
2363   else
2364     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2365
2366   if (priv->avg_duration != 0)
2367     rate =
2368         gst_guint64_to_gdouble (priv->avg_pt) /
2369         gst_guint64_to_gdouble (priv->avg_duration);
2370   else
2371     rate = 0.0;
2372
2373   if (priv->last_left != -1) {
2374     if (dropped || priv->avg_rate < 0.0) {
2375       priv->avg_rate = rate;
2376     } else {
2377       if (rate > 1.0)
2378         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2379       else
2380         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2381     }
2382   }
2383
2384   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2385       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2386       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2387       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2388
2389
2390   if (priv->avg_rate >= 0.0) {
2391     /* if we have a valid rate, start sending QoS messages */
2392     if (priv->current_jitter < 0) {
2393       /* make sure we never go below 0 when adding the jitter to the
2394        * timestamp. */
2395       if (priv->current_rstart < -priv->current_jitter)
2396         priv->current_jitter = -priv->current_rstart;
2397     }
2398     gst_base_sink_send_qos (sink, priv->avg_rate, priv->current_rstart,
2399         priv->current_jitter);
2400   }
2401
2402   /* record when this buffer will leave us */
2403   priv->last_left = left;
2404 }
2405
2406 /* reset all qos measuring */
2407 static void
2408 gst_base_sink_reset_qos (GstBaseSink * sink)
2409 {
2410   GstBaseSinkPrivate *priv;
2411
2412   priv = sink->priv;
2413
2414   priv->last_in_time = -1;
2415   priv->last_left = -1;
2416   priv->avg_duration = -1;
2417   priv->avg_pt = -1;
2418   priv->avg_rate = -1.0;
2419   priv->avg_render = -1;
2420   priv->rendered = 0;
2421   priv->dropped = 0;
2422
2423 }
2424
2425 /* Checks if the object was scheduled too late.
2426  *
2427  * start/stop contain the raw timestamp start and stop values
2428  * of the object.
2429  *
2430  * status and jitter contain the return values from the clock wait.
2431  *
2432  * returns TRUE if the buffer was too late.
2433  */
2434 static gboolean
2435 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2436     GstClockTime start, GstClockTime stop,
2437     GstClockReturn status, GstClockTimeDiff jitter)
2438 {
2439   gboolean late;
2440   gint64 max_lateness;
2441   GstBaseSinkPrivate *priv;
2442
2443   priv = basesink->priv;
2444
2445   late = FALSE;
2446
2447   /* only for objects that were too late */
2448   if (G_LIKELY (status != GST_CLOCK_EARLY))
2449     goto in_time;
2450
2451   max_lateness = basesink->abidata.ABI.max_lateness;
2452
2453   /* check if frame dropping is enabled */
2454   if (max_lateness == -1)
2455     goto no_drop;
2456
2457   /* only check for buffers */
2458   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2459     goto not_buffer;
2460
2461   /* can't do check if we don't have a timestamp */
2462   if (G_UNLIKELY (start == -1))
2463     goto no_timestamp;
2464
2465   /* we can add a valid stop time */
2466   if (stop != -1)
2467     max_lateness += stop;
2468   else
2469     max_lateness += start;
2470
2471   /* if the jitter bigger than duration and lateness we are too late */
2472   if ((late = start + jitter > max_lateness)) {
2473     GST_DEBUG_OBJECT (basesink, "buffer is too late %" GST_TIME_FORMAT
2474         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (start + jitter),
2475         GST_TIME_ARGS (max_lateness));
2476     /* !!emergency!!, if we did not receive anything valid for more than a 
2477      * second, render it anyway so the user sees something */
2478     if (priv->last_in_time != -1 && start - priv->last_in_time > GST_SECOND) {
2479       late = FALSE;
2480       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2481           (_("A lot of buffers are dropped.")),
2482           ("Maybe there is a timestamp problem or this computer is too slow."));
2483       GST_DEBUG_OBJECT (basesink,
2484           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2485           GST_TIME_ARGS (priv->last_in_time));
2486     }
2487   }
2488
2489 done:
2490   if (!late) {
2491     priv->last_in_time = start;
2492   }
2493   return late;
2494
2495   /* all is fine */
2496 in_time:
2497   {
2498     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2499     goto done;
2500   }
2501 no_drop:
2502   {
2503     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2504     goto done;
2505   }
2506 not_buffer:
2507   {
2508     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2509     return FALSE;
2510   }
2511 no_timestamp:
2512   {
2513     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2514     return FALSE;
2515   }
2516 }
2517
2518 /* called before and after calling the render vmethod. It keeps track of how
2519  * much time was spent in the render method and is used to check if we are
2520  * flooded */
2521 static void
2522 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2523 {
2524   GstBaseSinkPrivate *priv;
2525
2526   priv = basesink->priv;
2527
2528   if (start) {
2529     priv->start = gst_util_get_timestamp ();
2530   } else {
2531     GstClockTime elapsed;
2532
2533     priv->stop = gst_util_get_timestamp ();
2534
2535     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2536
2537     if (priv->avg_render == -1)
2538       priv->avg_render = elapsed;
2539     else
2540       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2541
2542     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2543         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2544   }
2545 }
2546
2547 /* with STREAM_LOCK, PREROLL_LOCK,
2548  *
2549  * Synchronize the object on the clock and then render it.
2550  *
2551  * takes ownership of obj.
2552  */
2553 static GstFlowReturn
2554 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2555     GstMiniObject * obj)
2556 {
2557   GstFlowReturn ret;
2558   GstBaseSinkClass *bclass;
2559   gboolean late;
2560
2561   GstBaseSinkPrivate *priv;
2562
2563   priv = basesink->priv;
2564
2565 again:
2566   late = FALSE;
2567   ret = GST_FLOW_OK;
2568
2569   /* synchronize this object, non syncable objects return OK
2570    * immediatly. */
2571   ret = gst_base_sink_do_sync (basesink, pad, obj, &late);
2572   if (G_UNLIKELY (ret != GST_FLOW_OK))
2573     goto sync_failed;
2574
2575   /* and now render, event or buffer. */
2576   if (G_LIKELY (GST_IS_BUFFER (obj))) {
2577     GstBuffer *buf;
2578
2579     /* drop late buffers unconditionally, let's hope it's unlikely */
2580     if (G_UNLIKELY (late))
2581       goto dropped;
2582
2583     buf = GST_BUFFER_CAST (obj);
2584
2585     gst_base_sink_set_last_buffer (basesink, buf);
2586
2587     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2588
2589     if (G_LIKELY (bclass->render)) {
2590       gint do_qos;
2591
2592       /* read once, to get same value before and after */
2593       do_qos = g_atomic_int_get (&priv->qos_enabled);
2594
2595       GST_DEBUG_OBJECT (basesink, "rendering buffer %p", obj);
2596
2597       /* record rendering time for QoS and stats */
2598       if (do_qos)
2599         gst_base_sink_do_render_stats (basesink, TRUE);
2600
2601       ret = bclass->render (basesink, buf);
2602
2603       if (do_qos)
2604         gst_base_sink_do_render_stats (basesink, FALSE);
2605
2606       if (ret == GST_FLOW_STEP)
2607         goto again;
2608
2609       priv->rendered++;
2610     }
2611   } else {
2612     GstEvent *event = GST_EVENT_CAST (obj);
2613     gboolean event_res = TRUE;
2614     GstEventType type;
2615
2616     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2617
2618     type = GST_EVENT_TYPE (event);
2619
2620     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
2621         gst_event_type_get_name (type));
2622
2623     if (bclass->event)
2624       event_res = bclass->event (basesink, event);
2625
2626     /* when we get here we could be flushing again when the event handler calls
2627      * _wait_eos(). We have to ignore this object in that case. */
2628     if (G_UNLIKELY (basesink->flushing))
2629       goto flushing;
2630
2631     if (G_LIKELY (event_res)) {
2632       guint32 seqnum;
2633
2634       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
2635       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
2636
2637       switch (type) {
2638         case GST_EVENT_EOS:
2639         {
2640           GstMessage *message;
2641
2642           /* the EOS event is completely handled so we mark
2643            * ourselves as being in the EOS state. eos is also 
2644            * protected by the object lock so we can read it when 
2645            * answering the POSITION query. */
2646           GST_OBJECT_LOCK (basesink);
2647           basesink->eos = TRUE;
2648           GST_OBJECT_UNLOCK (basesink);
2649
2650           /* ok, now we can post the message */
2651           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
2652
2653           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
2654           gst_message_set_seqnum (message, seqnum);
2655           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
2656           break;
2657         }
2658         case GST_EVENT_NEWSEGMENT:
2659           /* configure the segment */
2660           gst_base_sink_configure_segment (basesink, pad, event,
2661               &basesink->segment);
2662           break;
2663         default:
2664           break;
2665       }
2666     }
2667   }
2668
2669 done:
2670   gst_base_sink_perform_qos (basesink, late);
2671
2672   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
2673   gst_mini_object_unref (obj);
2674
2675   return ret;
2676
2677   /* ERRORS */
2678 sync_failed:
2679   {
2680     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
2681     goto done;
2682   }
2683 dropped:
2684   {
2685     priv->dropped++;
2686     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
2687     goto done;
2688   }
2689 flushing:
2690   {
2691     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
2692     gst_mini_object_unref (obj);
2693     return GST_FLOW_WRONG_STATE;
2694   }
2695 }
2696
2697 /* with STREAM_LOCK, PREROLL_LOCK
2698  *
2699  * Perform preroll on the given object. For buffers this means 
2700  * calling the preroll subclass method. 
2701  * If that succeeds, the state will be commited.
2702  *
2703  * function does not take ownership of obj.
2704  */
2705 static GstFlowReturn
2706 gst_base_sink_preroll_object (GstBaseSink * basesink, GstMiniObject * obj)
2707 {
2708   GstFlowReturn ret;
2709
2710   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
2711
2712   /* if it's a buffer, we need to call the preroll method */
2713   if (G_LIKELY (GST_IS_BUFFER (obj)) && basesink->priv->call_preroll) {
2714     GstBaseSinkClass *bclass;
2715     GstBuffer *buf;
2716     GstClockTime timestamp;
2717
2718     buf = GST_BUFFER_CAST (obj);
2719     timestamp = GST_BUFFER_TIMESTAMP (buf);
2720
2721     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
2722         GST_TIME_ARGS (timestamp));
2723
2724     gst_base_sink_set_last_buffer (basesink, buf);
2725
2726     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2727     if (bclass->preroll)
2728       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
2729         goto preroll_failed;
2730
2731     basesink->priv->call_preroll = FALSE;
2732   }
2733
2734   /* commit state */
2735   if (G_LIKELY (basesink->playing_async)) {
2736     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
2737       goto stopping;
2738   }
2739
2740   return GST_FLOW_OK;
2741
2742   /* ERRORS */
2743 preroll_failed:
2744   {
2745     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
2746     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
2747     return ret;
2748   }
2749 stopping:
2750   {
2751     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
2752     return GST_FLOW_WRONG_STATE;
2753   }
2754 }
2755
2756 /* with STREAM_LOCK, PREROLL_LOCK 
2757  *
2758  * Queue an object for rendering.
2759  * The first prerollable object queued will complete the preroll. If the
2760  * preroll queue if filled, we render all the objects in the queue.
2761  *
2762  * This function takes ownership of the object.
2763  */
2764 static GstFlowReturn
2765 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
2766     GstMiniObject * obj, gboolean prerollable)
2767 {
2768   GstFlowReturn ret = GST_FLOW_OK;
2769   gint length;
2770   GQueue *q;
2771
2772   if (G_UNLIKELY (basesink->need_preroll)) {
2773     if (G_LIKELY (prerollable))
2774       basesink->preroll_queued++;
2775
2776     length = basesink->preroll_queued;
2777
2778     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
2779
2780     /* first prerollable item needs to finish the preroll */
2781     if (length == 1) {
2782       ret = gst_base_sink_preroll_object (basesink, obj);
2783       if (G_UNLIKELY (ret != GST_FLOW_OK))
2784         goto preroll_failed;
2785     }
2786     /* need to recheck if we need preroll, commmit state during preroll 
2787      * could have made us not need more preroll. */
2788     if (G_UNLIKELY (basesink->need_preroll)) {
2789       /* see if we can render now, if we can't add the object to the preroll
2790        * queue. */
2791       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
2792         goto more_preroll;
2793     }
2794   }
2795
2796   /* we can start rendering (or blocking) the queued object
2797    * if any. */
2798   q = basesink->preroll_queue;
2799   while (G_UNLIKELY (!g_queue_is_empty (q))) {
2800     GstMiniObject *o;
2801
2802     o = g_queue_pop_head (q);
2803     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
2804
2805     /* do something with the return value */
2806     ret = gst_base_sink_render_object (basesink, pad, o);
2807     if (ret != GST_FLOW_OK)
2808       goto dequeue_failed;
2809   }
2810
2811   /* now render the object */
2812   ret = gst_base_sink_render_object (basesink, pad, obj);
2813   basesink->preroll_queued = 0;
2814
2815   return ret;
2816
2817   /* special cases */
2818 preroll_failed:
2819   {
2820     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
2821         gst_flow_get_name (ret));
2822     gst_mini_object_unref (obj);
2823     return ret;
2824   }
2825 more_preroll:
2826   {
2827     /* add object to the queue and return */
2828     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
2829         length, basesink->preroll_queue_max_len);
2830     g_queue_push_tail (basesink->preroll_queue, obj);
2831     return GST_FLOW_OK;
2832   }
2833 dequeue_failed:
2834   {
2835     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
2836         gst_flow_get_name (ret));
2837     gst_mini_object_unref (obj);
2838     return ret;
2839   }
2840 }
2841
2842 /* with STREAM_LOCK
2843  *
2844  * This function grabs the PREROLL_LOCK and adds the object to
2845  * the queue.
2846  *
2847  * This function takes ownership of obj.
2848  */
2849 static GstFlowReturn
2850 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
2851     GstMiniObject * obj, gboolean prerollable)
2852 {
2853   GstFlowReturn ret;
2854
2855   GST_PAD_PREROLL_LOCK (pad);
2856   if (G_UNLIKELY (basesink->flushing))
2857     goto flushing;
2858
2859   if (G_UNLIKELY (basesink->priv->received_eos))
2860     goto was_eos;
2861
2862   ret = gst_base_sink_queue_object_unlocked (basesink, pad, obj, prerollable);
2863   GST_PAD_PREROLL_UNLOCK (pad);
2864
2865   return ret;
2866
2867   /* ERRORS */
2868 flushing:
2869   {
2870     GST_DEBUG_OBJECT (basesink, "sink is flushing");
2871     GST_PAD_PREROLL_UNLOCK (pad);
2872     gst_mini_object_unref (obj);
2873     return GST_FLOW_WRONG_STATE;
2874   }
2875 was_eos:
2876   {
2877     GST_DEBUG_OBJECT (basesink,
2878         "we are EOS, dropping object, return UNEXPECTED");
2879     GST_PAD_PREROLL_UNLOCK (pad);
2880     gst_mini_object_unref (obj);
2881     return GST_FLOW_UNEXPECTED;
2882   }
2883 }
2884
2885 static void
2886 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
2887 {
2888   /* make sure we are not blocked on the clock also clear any pending
2889    * eos state. */
2890   gst_base_sink_set_flushing (basesink, pad, TRUE);
2891
2892   /* we grab the stream lock but that is not needed since setting the
2893    * sink to flushing would make sure no state commit is being done
2894    * anymore */
2895   GST_PAD_STREAM_LOCK (pad);
2896   gst_base_sink_reset_qos (basesink);
2897   if (basesink->priv->async_enabled) {
2898     /* and we need to commit our state again on the next
2899      * prerolled buffer */
2900     basesink->playing_async = TRUE;
2901     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
2902   } else {
2903     basesink->priv->have_latency = TRUE;
2904     basesink->need_preroll = FALSE;
2905   }
2906   gst_base_sink_set_last_buffer (basesink, NULL);
2907   GST_PAD_STREAM_UNLOCK (pad);
2908 }
2909
2910 static void
2911 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
2912 {
2913   /* unset flushing so we can accept new data, this also flushes out any EOS
2914    * event. */
2915   gst_base_sink_set_flushing (basesink, pad, FALSE);
2916
2917   /* for position reporting */
2918   GST_OBJECT_LOCK (basesink);
2919   basesink->priv->current_sstart = -1;
2920   basesink->priv->current_sstop = -1;
2921   basesink->priv->eos_rtime = -1;
2922   basesink->priv->call_preroll = TRUE;
2923   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
2924     /* we need new segment info after the flush. */
2925     basesink->have_newsegment = FALSE;
2926     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
2927     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
2928   }
2929   GST_OBJECT_UNLOCK (basesink);
2930 }
2931
2932 static gboolean
2933 gst_base_sink_event (GstPad * pad, GstEvent * event)
2934 {
2935   GstBaseSink *basesink;
2936   gboolean result = TRUE;
2937   GstBaseSinkClass *bclass;
2938
2939   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2940
2941   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2942
2943   GST_DEBUG_OBJECT (basesink, "event %p (%s)", event,
2944       GST_EVENT_TYPE_NAME (event));
2945
2946   switch (GST_EVENT_TYPE (event)) {
2947     case GST_EVENT_EOS:
2948     {
2949       GstFlowReturn ret;
2950
2951       GST_PAD_PREROLL_LOCK (pad);
2952       if (G_UNLIKELY (basesink->flushing))
2953         goto flushing;
2954
2955       if (G_UNLIKELY (basesink->priv->received_eos)) {
2956         /* we can't accept anything when we are EOS */
2957         result = FALSE;
2958         gst_event_unref (event);
2959       } else {
2960         /* we set the received EOS flag here so that we can use it when testing if
2961          * we are prerolled and to refuse more buffers. */
2962         basesink->priv->received_eos = TRUE;
2963
2964         /* EOS is a prerollable object, we call the unlocked version because it
2965          * does not check the received_eos flag. */
2966         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
2967             GST_MINI_OBJECT_CAST (event), TRUE);
2968         if (G_UNLIKELY (ret != GST_FLOW_OK))
2969           result = FALSE;
2970       }
2971       GST_PAD_PREROLL_UNLOCK (pad);
2972       break;
2973     }
2974     case GST_EVENT_NEWSEGMENT:
2975     {
2976       GstFlowReturn ret;
2977
2978       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
2979
2980       GST_PAD_PREROLL_LOCK (pad);
2981       if (G_UNLIKELY (basesink->flushing))
2982         goto flushing;
2983
2984       if (G_UNLIKELY (basesink->priv->received_eos)) {
2985         /* we can't accept anything when we are EOS */
2986         result = FALSE;
2987         gst_event_unref (event);
2988       } else {
2989         /* the new segment is a non prerollable item and does not block anything,
2990          * we need to configure the current clipping segment and insert the event 
2991          * in the queue to serialize it with the buffers for rendering. */
2992         gst_base_sink_configure_segment (basesink, pad, event,
2993             basesink->abidata.ABI.clip_segment);
2994
2995         ret =
2996             gst_base_sink_queue_object_unlocked (basesink, pad,
2997             GST_MINI_OBJECT_CAST (event), FALSE);
2998         if (G_UNLIKELY (ret != GST_FLOW_OK))
2999           result = FALSE;
3000         else {
3001           GST_OBJECT_LOCK (basesink);
3002           basesink->have_newsegment = TRUE;
3003           GST_OBJECT_UNLOCK (basesink);
3004         }
3005       }
3006       GST_PAD_PREROLL_UNLOCK (pad);
3007       break;
3008     }
3009     case GST_EVENT_FLUSH_START:
3010       if (bclass->event)
3011         bclass->event (basesink, event);
3012
3013       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3014
3015       gst_base_sink_flush_start (basesink, pad);
3016
3017       gst_event_unref (event);
3018       break;
3019     case GST_EVENT_FLUSH_STOP:
3020       if (bclass->event)
3021         bclass->event (basesink, event);
3022
3023       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3024
3025       gst_base_sink_flush_stop (basesink, pad);
3026
3027       gst_event_unref (event);
3028       break;
3029     default:
3030       /* other events are sent to queue or subclass depending on if they
3031        * are serialized. */
3032       if (GST_EVENT_IS_SERIALIZED (event)) {
3033         gst_base_sink_queue_object (basesink, pad,
3034             GST_MINI_OBJECT_CAST (event), FALSE);
3035       } else {
3036         if (bclass->event)
3037           bclass->event (basesink, event);
3038         gst_event_unref (event);
3039       }
3040       break;
3041   }
3042 done:
3043   gst_object_unref (basesink);
3044
3045   return result;
3046
3047   /* ERRORS */
3048 flushing:
3049   {
3050     GST_DEBUG_OBJECT (basesink, "we are flushing");
3051     GST_PAD_PREROLL_UNLOCK (pad);
3052     result = FALSE;
3053     gst_event_unref (event);
3054     goto done;
3055   }
3056 }
3057
3058 /* default implementation to calculate the start and end
3059  * timestamps on a buffer, subclasses can override
3060  */
3061 static void
3062 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3063     GstClockTime * start, GstClockTime * end)
3064 {
3065   GstClockTime timestamp, duration;
3066
3067   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3068   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3069
3070     /* get duration to calculate end time */
3071     duration = GST_BUFFER_DURATION (buffer);
3072     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3073       *end = timestamp + duration;
3074     }
3075     *start = timestamp;
3076   }
3077 }
3078
3079 /* must be called with PREROLL_LOCK */
3080 static gboolean
3081 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3082 {
3083   gboolean is_prerolled, res;
3084
3085   /* we have 2 cases where the PREROLL_LOCK is released:
3086    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3087    *  2) we are syncing on the clock
3088    */
3089   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3090   res = !is_prerolled;
3091
3092   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3093       basesink->have_preroll, basesink->priv->received_eos, res);
3094
3095   return res;
3096 }
3097
3098 /* with STREAM_LOCK, PREROLL_LOCK 
3099  *
3100  * Takes a buffer and compare the timestamps with the last segment.
3101  * If the buffer falls outside of the segment boundaries, drop it.
3102  * Else queue the buffer for preroll and rendering.
3103  *
3104  * This function takes ownership of the buffer.
3105  */
3106 static GstFlowReturn
3107 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3108     GstBuffer * buf)
3109 {
3110   GstBaseSinkClass *bclass;
3111   GstFlowReturn result;
3112   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3113   GstSegment *clip_segment;
3114
3115   if (G_UNLIKELY (basesink->flushing))
3116     goto flushing;
3117
3118   if (G_UNLIKELY (basesink->priv->received_eos))
3119     goto was_eos;
3120
3121   /* for code clarity */
3122   clip_segment = basesink->abidata.ABI.clip_segment;
3123
3124   if (G_UNLIKELY (!basesink->have_newsegment)) {
3125     gboolean sync;
3126
3127     sync = gst_base_sink_get_sync (basesink);
3128     if (sync) {
3129       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3130           (_("Internal data flow problem.")),
3131           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3132     }
3133
3134     /* this means this sink will assume timestamps start from 0 */
3135     GST_OBJECT_LOCK (basesink);
3136     clip_segment->start = 0;
3137     clip_segment->stop = -1;
3138     basesink->segment.start = 0;
3139     basesink->segment.stop = -1;
3140     basesink->have_newsegment = TRUE;
3141     GST_OBJECT_UNLOCK (basesink);
3142   }
3143
3144   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3145
3146   /* check if the buffer needs to be dropped, we first ask the subclass for the
3147    * start and end */
3148   if (bclass->get_times)
3149     bclass->get_times (basesink, buf, &start, &end);
3150
3151   if (start == -1) {
3152     /* if the subclass does not want sync, we use our own values so that we at
3153      * least clip the buffer to the segment */
3154     gst_base_sink_get_times (basesink, buf, &start, &end);
3155   }
3156
3157   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3158       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3159
3160   /* a dropped buffer does not participate in anything */
3161   if (GST_CLOCK_TIME_IS_VALID (start) &&
3162       (clip_segment->format == GST_FORMAT_TIME)) {
3163     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3164                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3165       goto out_of_segment;
3166   }
3167
3168   /* now we can process the buffer in the queue, this function takes ownership
3169    * of the buffer */
3170   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3171       GST_MINI_OBJECT_CAST (buf), TRUE);
3172
3173   return result;
3174
3175   /* ERRORS */
3176 flushing:
3177   {
3178     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3179     gst_buffer_unref (buf);
3180     return GST_FLOW_WRONG_STATE;
3181   }
3182 was_eos:
3183   {
3184     GST_DEBUG_OBJECT (basesink,
3185         "we are EOS, dropping object, return UNEXPECTED");
3186     gst_buffer_unref (buf);
3187     return GST_FLOW_UNEXPECTED;
3188   }
3189 out_of_segment:
3190   {
3191     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3192     gst_buffer_unref (buf);
3193     return GST_FLOW_OK;
3194   }
3195 }
3196
3197 /* with STREAM_LOCK
3198  */
3199 static GstFlowReturn
3200 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3201 {
3202   GstBaseSink *basesink;
3203   GstFlowReturn result;
3204
3205   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3206
3207   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3208     goto wrong_mode;
3209
3210   GST_PAD_PREROLL_LOCK (pad);
3211   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
3212   GST_PAD_PREROLL_UNLOCK (pad);
3213
3214 done:
3215   return result;
3216
3217   /* ERRORS */
3218 wrong_mode:
3219   {
3220     GST_OBJECT_LOCK (pad);
3221     GST_WARNING_OBJECT (basesink,
3222         "Push on pad %s:%s, but it was not activated in push mode",
3223         GST_DEBUG_PAD_NAME (pad));
3224     GST_OBJECT_UNLOCK (pad);
3225     gst_buffer_unref (buf);
3226     /* we don't post an error message this will signal to the peer
3227      * pushing that EOS is reached. */
3228     result = GST_FLOW_UNEXPECTED;
3229     goto done;
3230   }
3231 }
3232
3233 static gboolean
3234 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3235 {
3236   gboolean res = TRUE;
3237
3238   /* update our offset if the start/stop position was updated */
3239   if (segment->format == GST_FORMAT_BYTES) {
3240     segment->time = segment->start;
3241   } else if (segment->start == 0) {
3242     /* seek to start, we can implement a default for this. */
3243     segment->time = 0;
3244   } else {
3245     res = FALSE;
3246     GST_INFO_OBJECT (sink, "Can't do a default seek");
3247   }
3248
3249   return res;
3250 }
3251
3252 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3253
3254 static gboolean
3255 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3256     GstEvent * event, GstSegment * segment)
3257 {
3258   /* By default, we try one of 2 things:
3259    *   - For absolute seek positions, convert the requested position to our 
3260    *     configured processing format and place it in the output segment \
3261    *   - For relative seek positions, convert our current (input) values to the
3262    *     seek format, adjust by the relative seek offset and then convert back to
3263    *     the processing format
3264    */
3265   GstSeekType cur_type, stop_type;
3266   gint64 cur, stop;
3267   GstSeekFlags flags;
3268   GstFormat seek_format, dest_format;
3269   gdouble rate;
3270   gboolean update;
3271   gboolean res = TRUE;
3272
3273   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3274       &cur_type, &cur, &stop_type, &stop);
3275   dest_format = segment->format;
3276
3277   if (seek_format == dest_format) {
3278     gst_segment_set_seek (segment, rate, seek_format, flags,
3279         cur_type, cur, stop_type, stop, &update);
3280     return TRUE;
3281   }
3282
3283   if (cur_type != GST_SEEK_TYPE_NONE) {
3284     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3285     res =
3286         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3287         &cur);
3288     cur_type = GST_SEEK_TYPE_SET;
3289   }
3290
3291   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3292     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3293     res =
3294         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3295         &stop);
3296     stop_type = GST_SEEK_TYPE_SET;
3297   }
3298
3299   /* And finally, configure our output segment in the desired format */
3300   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3301       stop_type, stop, &update);
3302
3303   if (!res)
3304     goto no_format;
3305
3306   return res;
3307
3308 no_format:
3309   {
3310     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3311     return FALSE;
3312   }
3313 }
3314
3315 /* perform a seek, only executed in pull mode */
3316 static gboolean
3317 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3318 {
3319   gboolean flush;
3320   gdouble rate;
3321   GstFormat seek_format, dest_format;
3322   GstSeekFlags flags;
3323   GstSeekType cur_type, stop_type;
3324   gboolean seekseg_configured = FALSE;
3325   gint64 cur, stop;
3326   gboolean update, res = TRUE;
3327   GstSegment seeksegment;
3328
3329   dest_format = sink->segment.format;
3330
3331   if (event) {
3332     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3333     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3334         &cur_type, &cur, &stop_type, &stop);
3335
3336     flush = flags & GST_SEEK_FLAG_FLUSH;
3337   } else {
3338     GST_DEBUG_OBJECT (sink, "performing seek without event");
3339     flush = FALSE;
3340   }
3341
3342   if (flush) {
3343     GST_DEBUG_OBJECT (sink, "flushing upstream");
3344     gst_pad_push_event (pad, gst_event_new_flush_start ());
3345     gst_base_sink_flush_start (sink, pad);
3346   } else {
3347     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3348   }
3349
3350   GST_PAD_STREAM_LOCK (pad);
3351
3352   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3353    * copy the current segment info into the temp segment that we can actually
3354    * attempt the seek with. We only update the real segment if the seek suceeds. */
3355   if (!seekseg_configured) {
3356     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3357
3358     /* now configure the final seek segment */
3359     if (event) {
3360       if (sink->segment.format != seek_format) {
3361         /* OK, here's where we give the subclass a chance to convert the relative
3362          * seek into an absolute one in the processing format. We set up any
3363          * absolute seek above, before taking the stream lock. */
3364         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3365                 &seeksegment)) {
3366           GST_DEBUG_OBJECT (sink,
3367               "Preparing the seek failed after flushing. " "Aborting seek");
3368           res = FALSE;
3369         }
3370       } else {
3371         /* The seek format matches our processing format, no need to ask the
3372          * the subclass to configure the segment. */
3373         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3374             cur_type, cur, stop_type, stop, &update);
3375       }
3376     }
3377     /* Else, no seek event passed, so we're just (re)starting the 
3378        current segment. */
3379   }
3380
3381   if (res) {
3382     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3383         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3384         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3385
3386     /* do the seek, segment.last_stop contains the new position. */
3387     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3388   }
3389
3390
3391   if (flush) {
3392     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3393     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3394     gst_base_sink_flush_stop (sink, pad);
3395   } else if (res && sink->abidata.ABI.running) {
3396     /* we are running the current segment and doing a non-flushing seek, 
3397      * close the segment first based on the last_stop. */
3398     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3399         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3400   }
3401
3402   /* The subclass must have converted the segment to the processing format 
3403    * by now */
3404   if (res && seeksegment.format != dest_format) {
3405     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3406         "in the correct format. Aborting seek.");
3407     res = FALSE;
3408   }
3409
3410   /* if successfull seek, we update our real segment and push
3411    * out the new segment. */
3412   if (res) {
3413     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3414
3415     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3416       gst_element_post_message (GST_ELEMENT (sink),
3417           gst_message_new_segment_start (GST_OBJECT (sink),
3418               sink->segment.format, sink->segment.last_stop));
3419     }
3420   }
3421
3422   sink->priv->discont = TRUE;
3423   sink->abidata.ABI.running = TRUE;
3424
3425   GST_PAD_STREAM_UNLOCK (pad);
3426
3427   return res;
3428 }
3429
3430 static gboolean
3431 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3432 {
3433   GstBaseSinkPrivate *priv;
3434   GstBaseSinkClass *bclass;
3435   gboolean flush, intermediate;
3436   gdouble rate;
3437   GstFormat format;
3438   guint64 amount;
3439   guint seqnum;
3440   GstStepInfo *pending;
3441
3442   bclass = GST_BASE_SINK_GET_CLASS (sink);
3443   priv = sink->priv;
3444
3445   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
3446
3447   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
3448   seqnum = gst_event_get_seqnum (event);
3449
3450   pending = &priv->pending_step;
3451
3452   if (flush) {
3453     /* we need to call ::unlock before locking PREROLL_LOCK
3454      * since we lock it before going into ::render */
3455     if (bclass->unlock)
3456       bclass->unlock (sink);
3457
3458     GST_PAD_PREROLL_LOCK (sink->sinkpad);
3459     /* now that we have the PREROLL lock, clear our unlock request */
3460     if (bclass->unlock_stop)
3461       bclass->unlock_stop (sink);
3462
3463     /* update the stepinfo and make it valid */
3464     pending->seqnum = seqnum;
3465     pending->format = format;
3466     pending->amount = amount;
3467     pending->position = 0;
3468     pending->rate = rate;
3469     pending->flush = flush;
3470     pending->intermediate = intermediate;
3471     pending->valid = TRUE;
3472
3473     if (sink->priv->async_enabled) {
3474       /* and we need to commit our state again on the next
3475        * prerolled buffer */
3476       sink->playing_async = TRUE;
3477       priv->pending_step.need_preroll = TRUE;
3478       sink->need_preroll = FALSE;
3479       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
3480     } else {
3481       sink->priv->have_latency = TRUE;
3482       sink->need_preroll = FALSE;
3483     }
3484     priv->current_sstart = -1;
3485     priv->current_sstop = -1;
3486     priv->eos_rtime = -1;
3487     priv->call_preroll = TRUE;
3488     gst_base_sink_set_last_buffer (sink, NULL);
3489     gst_base_sink_reset_qos (sink);
3490
3491     if (sink->clock_id) {
3492       gst_clock_id_unschedule (sink->clock_id);
3493     }
3494
3495     if (sink->have_preroll) {
3496       GST_DEBUG_OBJECT (sink, "signal waiter");
3497       priv->step_unlock = TRUE;
3498       GST_PAD_PREROLL_SIGNAL (sink->sinkpad);
3499     }
3500     GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
3501   }
3502
3503   return TRUE;
3504 }
3505
3506 /* with STREAM_LOCK
3507  */
3508 static void
3509 gst_base_sink_loop (GstPad * pad)
3510 {
3511   GstBaseSink *basesink;
3512   GstBuffer *buf = NULL;
3513   GstFlowReturn result;
3514   guint blocksize;
3515   guint64 offset;
3516
3517   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3518
3519   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
3520
3521   if ((blocksize = basesink->priv->blocksize) == 0)
3522     blocksize = -1;
3523
3524   offset = basesink->segment.last_stop;
3525
3526   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
3527       offset, blocksize);
3528
3529   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
3530   if (G_UNLIKELY (result != GST_FLOW_OK))
3531     goto paused;
3532
3533   if (G_UNLIKELY (buf == NULL))
3534     goto no_buffer;
3535
3536   offset += GST_BUFFER_SIZE (buf);
3537
3538   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
3539
3540   GST_PAD_PREROLL_LOCK (pad);
3541   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
3542   GST_PAD_PREROLL_UNLOCK (pad);
3543   if (G_UNLIKELY (result != GST_FLOW_OK))
3544     goto paused;
3545
3546   return;
3547
3548   /* ERRORS */
3549 paused:
3550   {
3551     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
3552         gst_flow_get_name (result));
3553     gst_pad_pause_task (pad);
3554     /* fatal errors and NOT_LINKED cause EOS */
3555     if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) {
3556       if (result == GST_FLOW_UNEXPECTED) {
3557         /* perform EOS logic */
3558         if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3559           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3560               gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
3561                   basesink->segment.format, basesink->segment.last_stop));
3562         } else {
3563           gst_base_sink_event (pad, gst_event_new_eos ());
3564         }
3565       } else {
3566         /* for fatal errors we post an error message, post the error
3567          * first so the app knows about the error first. */
3568         GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3569             (_("Internal data stream error.")),
3570             ("stream stopped, reason %s", gst_flow_get_name (result)));
3571         gst_base_sink_event (pad, gst_event_new_eos ());
3572       }
3573     }
3574     return;
3575   }
3576 no_buffer:
3577   {
3578     GST_LOG_OBJECT (basesink, "no buffer, pausing");
3579     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3580         (_("Internal data flow error.")), ("element returned NULL buffer"));
3581     result = GST_FLOW_ERROR;
3582     goto paused;
3583   }
3584 }
3585
3586 static gboolean
3587 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
3588     gboolean flushing)
3589 {
3590   GstBaseSinkClass *bclass;
3591
3592   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3593
3594   if (flushing) {
3595     /* unlock any subclasses, we need to do this before grabbing the
3596      * PREROLL_LOCK since we hold this lock before going into ::render. */
3597     if (bclass->unlock)
3598       bclass->unlock (basesink);
3599   }
3600
3601   GST_PAD_PREROLL_LOCK (pad);
3602   basesink->flushing = flushing;
3603   if (flushing) {
3604     /* step 1, now that we have the PREROLL lock, clear our unlock request */
3605     if (bclass->unlock_stop)
3606       bclass->unlock_stop (basesink);
3607
3608     /* set need_preroll before we unblock the clock. If the clock is unblocked
3609      * before timing out, we can reuse the buffer for preroll. */
3610     basesink->need_preroll = TRUE;
3611
3612     /* step 2, unblock clock sync (if any) or any other blocking thing */
3613     if (basesink->clock_id) {
3614       gst_clock_id_unschedule (basesink->clock_id);
3615     }
3616
3617     /* flush out the data thread if it's locked in finish_preroll, this will
3618      * also flush out the EOS state */
3619     GST_DEBUG_OBJECT (basesink,
3620         "flushing out data thread, need preroll to TRUE");
3621     gst_base_sink_preroll_queue_flush (basesink, pad);
3622   }
3623   GST_PAD_PREROLL_UNLOCK (pad);
3624
3625   return TRUE;
3626 }
3627
3628 static gboolean
3629 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
3630 {
3631   gboolean result;
3632
3633   if (active) {
3634     /* start task */
3635     result = gst_pad_start_task (basesink->sinkpad,
3636         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
3637   } else {
3638     /* step 2, make sure streaming finishes */
3639     result = gst_pad_stop_task (basesink->sinkpad);
3640   }
3641
3642   return result;
3643 }
3644
3645 static gboolean
3646 gst_base_sink_pad_activate (GstPad * pad)
3647 {
3648   gboolean result = FALSE;
3649   GstBaseSink *basesink;
3650
3651   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3652
3653   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
3654
3655   gst_base_sink_set_flushing (basesink, pad, FALSE);
3656
3657   /* we need to have the pull mode enabled */
3658   if (!basesink->can_activate_pull) {
3659     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
3660     goto fallback;
3661   }
3662
3663   /* check if downstreams supports pull mode at all */
3664   if (!gst_pad_check_pull_range (pad)) {
3665     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
3666     goto fallback;
3667   }
3668
3669   /* set the pad mode before starting the task so that it's in the
3670    * correct state for the new thread. also the sink set_caps and get_caps
3671    * function checks this */
3672   basesink->pad_mode = GST_ACTIVATE_PULL;
3673
3674   /* we first try to negotiate a format so that when we try to activate
3675    * downstream, it knows about our format */
3676   if (!gst_base_sink_negotiate_pull (basesink)) {
3677     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
3678     goto fallback;
3679   }
3680
3681   /* ok activate now */
3682   if (!gst_pad_activate_pull (pad, TRUE)) {
3683     /* clear any pending caps */
3684     GST_OBJECT_LOCK (basesink);
3685     gst_caps_replace (&basesink->priv->pull_caps, NULL);
3686     GST_OBJECT_UNLOCK (basesink);
3687     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
3688     goto fallback;
3689   }
3690
3691   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
3692   result = TRUE;
3693   goto done;
3694
3695   /* push mode fallback */
3696 fallback:
3697   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
3698   if ((result = gst_pad_activate_push (pad, TRUE))) {
3699     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
3700   }
3701
3702 done:
3703   if (!result) {
3704     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
3705     gst_base_sink_set_flushing (basesink, pad, TRUE);
3706   }
3707
3708   gst_object_unref (basesink);
3709
3710   return result;
3711 }
3712
3713 static gboolean
3714 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
3715 {
3716   gboolean result;
3717   GstBaseSink *basesink;
3718
3719   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3720
3721   if (active) {
3722     if (!basesink->can_activate_push) {
3723       result = FALSE;
3724       basesink->pad_mode = GST_ACTIVATE_NONE;
3725     } else {
3726       result = TRUE;
3727       basesink->pad_mode = GST_ACTIVATE_PUSH;
3728     }
3729   } else {
3730     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
3731       g_warning ("Internal GStreamer activation error!!!");
3732       result = FALSE;
3733     } else {
3734       gst_base_sink_set_flushing (basesink, pad, TRUE);
3735       result = TRUE;
3736       basesink->pad_mode = GST_ACTIVATE_NONE;
3737     }
3738   }
3739
3740   gst_object_unref (basesink);
3741
3742   return result;
3743 }
3744
3745 static gboolean
3746 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
3747 {
3748   GstCaps *caps;
3749   gboolean result;
3750
3751   result = FALSE;
3752
3753   /* this returns the intersection between our caps and the peer caps. If there
3754    * is no peer, it returns NULL and we can't operate in pull mode so we can
3755    * fail the negotiation. */
3756   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
3757   if (caps == NULL || gst_caps_is_empty (caps))
3758     goto no_caps_possible;
3759
3760   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
3761
3762   caps = gst_caps_make_writable (caps);
3763   /* get the first (prefered) format */
3764   gst_caps_truncate (caps);
3765   /* try to fixate */
3766   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
3767
3768   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
3769
3770   if (gst_caps_is_any (caps)) {
3771     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
3772         "allowing pull()");
3773     /* neither side has template caps in this case, so they are prepared for
3774        pull() without setcaps() */
3775     result = TRUE;
3776   } else if (gst_caps_is_fixed (caps)) {
3777     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
3778       goto could_not_set_caps;
3779
3780     GST_OBJECT_LOCK (basesink);
3781     gst_caps_replace (&basesink->priv->pull_caps, caps);
3782     GST_OBJECT_UNLOCK (basesink);
3783
3784     result = TRUE;
3785   }
3786
3787   gst_caps_unref (caps);
3788
3789   return result;
3790
3791 no_caps_possible:
3792   {
3793     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
3794     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
3795     if (caps)
3796       gst_caps_unref (caps);
3797     return FALSE;
3798   }
3799 could_not_set_caps:
3800   {
3801     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
3802     gst_caps_unref (caps);
3803     return FALSE;
3804   }
3805 }
3806
3807 /* this won't get called until we implement an activate function */
3808 static gboolean
3809 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
3810 {
3811   gboolean result = FALSE;
3812   GstBaseSink *basesink;
3813   GstBaseSinkClass *bclass;
3814
3815   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3816   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3817
3818   if (active) {
3819     GstFormat format;
3820     gint64 duration;
3821
3822     /* we mark we have a newsegment here because pull based
3823      * mode works just fine without having a newsegment before the
3824      * first buffer */
3825     format = GST_FORMAT_BYTES;
3826
3827     gst_segment_init (&basesink->segment, format);
3828     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
3829     GST_OBJECT_LOCK (basesink);
3830     basesink->have_newsegment = TRUE;
3831     GST_OBJECT_UNLOCK (basesink);
3832
3833     /* get the peer duration in bytes */
3834     result = gst_pad_query_peer_duration (pad, &format, &duration);
3835     if (result) {
3836       GST_DEBUG_OBJECT (basesink,
3837           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
3838       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
3839           duration);
3840       gst_segment_set_duration (&basesink->segment, format, duration);
3841     } else {
3842       GST_DEBUG_OBJECT (basesink, "unknown duration");
3843     }
3844
3845     if (bclass->activate_pull)
3846       result = bclass->activate_pull (basesink, TRUE);
3847     else
3848       result = FALSE;
3849
3850     if (!result)
3851       goto activate_failed;
3852
3853   } else {
3854     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
3855       g_warning ("Internal GStreamer activation error!!!");
3856       result = FALSE;
3857     } else {
3858       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
3859       if (bclass->activate_pull)
3860         result &= bclass->activate_pull (basesink, FALSE);
3861       basesink->pad_mode = GST_ACTIVATE_NONE;
3862       /* clear any pending caps */
3863       GST_OBJECT_LOCK (basesink);
3864       gst_caps_replace (&basesink->priv->pull_caps, NULL);
3865       GST_OBJECT_UNLOCK (basesink);
3866     }
3867   }
3868   gst_object_unref (basesink);
3869
3870   return result;
3871
3872   /* ERRORS */
3873 activate_failed:
3874   {
3875     /* reset, as starting the thread failed */
3876     basesink->pad_mode = GST_ACTIVATE_NONE;
3877
3878     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
3879     return FALSE;
3880   }
3881 }
3882
3883 /* send an event to our sinkpad peer. */
3884 static gboolean
3885 gst_base_sink_send_event (GstElement * element, GstEvent * event)
3886 {
3887   GstPad *pad;
3888   GstBaseSink *basesink = GST_BASE_SINK (element);
3889   gboolean forward, result = TRUE;
3890   GstActivateMode mode;
3891
3892   GST_OBJECT_LOCK (element);
3893   /* get the pad and the scheduling mode */
3894   pad = gst_object_ref (basesink->sinkpad);
3895   mode = basesink->pad_mode;
3896   GST_OBJECT_UNLOCK (element);
3897
3898   /* only push UPSTREAM events upstream */
3899   forward = GST_EVENT_IS_UPSTREAM (event);
3900
3901   switch (GST_EVENT_TYPE (event)) {
3902     case GST_EVENT_LATENCY:
3903     {
3904       GstClockTime latency;
3905
3906       gst_event_parse_latency (event, &latency);
3907
3908       /* store the latency. We use this to adjust the running_time before syncing
3909        * it to the clock. */
3910       GST_OBJECT_LOCK (element);
3911       basesink->priv->latency = latency;
3912       if (!basesink->priv->have_latency)
3913         forward = FALSE;
3914       GST_OBJECT_UNLOCK (element);
3915       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
3916           GST_TIME_ARGS (latency));
3917
3918       /* We forward this event so that all elements know about the global pipeline
3919        * latency. This is interesting for an element when it wants to figure out
3920        * when a particular piece of data will be rendered. */
3921       break;
3922     }
3923     case GST_EVENT_SEEK:
3924       /* in pull mode we will execute the seek */
3925       if (mode == GST_ACTIVATE_PULL)
3926         result = gst_base_sink_perform_seek (basesink, pad, event);
3927       break;
3928     case GST_EVENT_STEP:
3929       result = gst_base_sink_perform_step (basesink, pad, event);
3930       forward = FALSE;
3931       break;
3932     default:
3933       break;
3934   }
3935
3936   if (forward) {
3937     result = gst_pad_push_event (pad, event);
3938   } else {
3939     /* not forwarded, unref the event */
3940     gst_event_unref (event);
3941   }
3942
3943   gst_object_unref (pad);
3944   return result;
3945 }
3946
3947 static gboolean
3948 gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query)
3949 {
3950   GstPad *peer;
3951   gboolean res = FALSE;
3952
3953   if ((peer = gst_pad_get_peer (sink->sinkpad))) {
3954     res = gst_pad_query (peer, query);
3955     gst_object_unref (peer);
3956   }
3957   return res;
3958 }
3959
3960 /* get the end position of the last seen object, this is used
3961  * for EOS and for making sure that we don't report a position we
3962  * have not reached yet. With LOCK. */
3963 static gboolean
3964 gst_base_sink_get_position_last (GstBaseSink * basesink, GstFormat format,
3965     gint64 * cur)
3966 {
3967   GstFormat oformat;
3968   GstSegment *segment;
3969   gboolean ret = TRUE;
3970
3971   segment = &basesink->segment;
3972   oformat = segment->format;
3973
3974   if (oformat == GST_FORMAT_TIME) {
3975     /* return last observed stream time, we keep the stream time around in the
3976      * time format. */
3977     *cur = basesink->priv->current_sstop;
3978   } else {
3979     /* convert last stop to stream time */
3980     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
3981   }
3982
3983   if (*cur != -1 && oformat != format) {
3984     GST_OBJECT_UNLOCK (basesink);
3985     /* convert to the target format if we need to, release lock first */
3986     ret =
3987         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
3988     if (!ret)
3989       *cur = -1;
3990     GST_OBJECT_LOCK (basesink);
3991   }
3992
3993   GST_DEBUG_OBJECT (basesink, "POSITION: %" GST_TIME_FORMAT,
3994       GST_TIME_ARGS (*cur));
3995
3996   return ret;
3997 }
3998
3999 /* get the position when we are PAUSED, this is the stream time of the buffer
4000  * that prerolled. If no buffer is prerolled (we are still flushing), this
4001  * value will be -1. With LOCK. */
4002 static gboolean
4003 gst_base_sink_get_position_paused (GstBaseSink * basesink, GstFormat format,
4004     gint64 * cur)
4005 {
4006   gboolean res;
4007   gint64 time;
4008   GstSegment *segment;
4009   GstFormat oformat;
4010
4011   /* we don't use the clip segment in pull mode, when seeking we update the
4012    * main segment directly with the new segment values without it having to be
4013    * activated by the rendering after preroll */
4014   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4015     segment = basesink->abidata.ABI.clip_segment;
4016   else
4017     segment = &basesink->segment;
4018   oformat = segment->format;
4019
4020   if (oformat == GST_FORMAT_TIME) {
4021     *cur = basesink->priv->current_sstart;
4022     if (segment->rate < 0.0 && basesink->priv->current_sstop != -1) {
4023       /* for reverse playback we prefer the stream time stop position if we have
4024        * one */
4025       *cur = basesink->priv->current_sstop;
4026     }
4027   } else {
4028     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4029   }
4030
4031   time = segment->time;
4032
4033   if (*cur != -1) {
4034     *cur = MAX (*cur, time);
4035     GST_DEBUG_OBJECT (basesink, "POSITION as max: %" GST_TIME_FORMAT
4036         ", time %" GST_TIME_FORMAT, GST_TIME_ARGS (*cur), GST_TIME_ARGS (time));
4037   } else {
4038     /* we have no buffer, use the segment times. */
4039     if (segment->rate >= 0.0) {
4040       /* forward, next position is always the time of the segment */
4041       *cur = time;
4042       GST_DEBUG_OBJECT (basesink, "POSITION as time: %" GST_TIME_FORMAT,
4043           GST_TIME_ARGS (*cur));
4044     } else {
4045       /* reverse, next expected timestamp is segment->stop. We use the function
4046        * to get things right for negative applied_rates. */
4047       *cur = gst_segment_to_stream_time (segment, oformat, segment->stop);
4048       GST_DEBUG_OBJECT (basesink, "reverse POSITION: %" GST_TIME_FORMAT,
4049           GST_TIME_ARGS (*cur));
4050     }
4051   }
4052
4053   res = (*cur != -1);
4054   if (res && oformat != format) {
4055     GST_OBJECT_UNLOCK (basesink);
4056     res =
4057         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
4058     if (!res)
4059       *cur = -1;
4060     GST_OBJECT_LOCK (basesink);
4061   }
4062
4063   return res;
4064 }
4065
4066 static gboolean
4067 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4068     gint64 * cur, gboolean * upstream)
4069 {
4070   GstClock *clock;
4071   gboolean res = FALSE;
4072   GstFormat oformat, tformat;
4073   GstClockTime now, base, latency;
4074   gint64 time, accum, duration;
4075   gdouble rate;
4076   gint64 last;
4077
4078   GST_OBJECT_LOCK (basesink);
4079   /* our intermediate time format */
4080   tformat = GST_FORMAT_TIME;
4081   /* get the format in the segment */
4082   oformat = basesink->segment.format;
4083
4084   /* can only give answer based on the clock if not EOS */
4085   if (G_UNLIKELY (basesink->eos))
4086     goto in_eos;
4087
4088   /* we can only get the segment when we are not NULL or READY */
4089   if (!basesink->have_newsegment)
4090     goto wrong_state;
4091
4092   /* when not in PLAYING or when we're busy with a state change, we
4093    * cannot read from the clock so we report time based on the
4094    * last seen timestamp. */
4095   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4096       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING)
4097     goto in_pause;
4098
4099   /* we need to sync on the clock. */
4100   if (basesink->sync == FALSE)
4101     goto no_sync;
4102
4103   /* and we need a clock */
4104   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4105     goto no_sync;
4106
4107   /* collect all data we need holding the lock */
4108   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time))
4109     time = basesink->segment.time;
4110   else
4111     time = 0;
4112
4113   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.stop))
4114     duration = basesink->segment.stop - basesink->segment.start;
4115   else
4116     duration = 0;
4117
4118   base = GST_ELEMENT_CAST (basesink)->base_time;
4119   accum = basesink->segment.accum;
4120   rate = basesink->segment.rate * basesink->segment.applied_rate;
4121   latency = basesink->priv->latency;
4122
4123   gst_object_ref (clock);
4124
4125   /* this function might release the LOCK */
4126   gst_base_sink_get_position_last (basesink, format, &last);
4127
4128   /* need to release the object lock before we can get the time, 
4129    * a clock might take the LOCK of the provider, which could be
4130    * a basesink subclass. */
4131   GST_OBJECT_UNLOCK (basesink);
4132
4133   now = gst_clock_get_time (clock);
4134
4135   if (oformat != tformat) {
4136     /* convert accum, time and duration to time */
4137     if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4138             &accum))
4139       goto convert_failed;
4140     if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration, &tformat,
4141             &duration))
4142       goto convert_failed;
4143     if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4144             &time))
4145       goto convert_failed;
4146   }
4147
4148   /* subtract base time and accumulated time from the clock time. 
4149    * Make sure we don't go negative. This is the current time in
4150    * the segment which we need to scale with the combined 
4151    * rate and applied rate. */
4152   base += accum;
4153   base += latency;
4154   base = MIN (now, base);
4155
4156   /* for negative rates we need to count back from from the segment
4157    * duration. */
4158   if (rate < 0.0)
4159     time += duration;
4160
4161   *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4162
4163   /* never report more than last seen position */
4164   if (last != -1)
4165     *cur = MIN (last, *cur);
4166
4167   gst_object_unref (clock);
4168
4169   GST_DEBUG_OBJECT (basesink,
4170       "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4171       GST_TIME_FORMAT " + time %" GST_TIME_FORMAT,
4172       GST_TIME_ARGS (now), GST_TIME_ARGS (base),
4173       GST_TIME_ARGS (accum), GST_TIME_ARGS (time));
4174
4175   if (oformat != format) {
4176     /* convert time to final format */
4177     if (!gst_pad_query_convert (basesink->sinkpad, tformat, *cur, &format, cur))
4178       goto convert_failed;
4179   }
4180
4181   res = TRUE;
4182
4183 done:
4184   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4185       res, GST_TIME_ARGS (*cur));
4186   return res;
4187
4188   /* special cases */
4189 in_eos:
4190   {
4191     GST_DEBUG_OBJECT (basesink, "position in EOS");
4192     res = gst_base_sink_get_position_last (basesink, format, cur);
4193     GST_OBJECT_UNLOCK (basesink);
4194     goto done;
4195   }
4196 in_pause:
4197   {
4198     GST_DEBUG_OBJECT (basesink, "position in PAUSED");
4199     res = gst_base_sink_get_position_paused (basesink, format, cur);
4200     GST_OBJECT_UNLOCK (basesink);
4201     goto done;
4202   }
4203 wrong_state:
4204   {
4205     /* in NULL or READY we always return FALSE and -1 */
4206     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4207     res = FALSE;
4208     *cur = -1;
4209     GST_OBJECT_UNLOCK (basesink);
4210     goto done;
4211   }
4212 no_sync:
4213   {
4214     /* report last seen timestamp if any, else ask upstream to answer */
4215     if ((*cur = basesink->priv->current_sstart) != -1)
4216       res = TRUE;
4217     else
4218       *upstream = TRUE;
4219
4220     GST_DEBUG_OBJECT (basesink, "no sync, res %d, POSITION %" GST_TIME_FORMAT,
4221         res, GST_TIME_ARGS (*cur));
4222     GST_OBJECT_UNLOCK (basesink);
4223     return res;
4224   }
4225 convert_failed:
4226   {
4227     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4228     *upstream = TRUE;
4229     return FALSE;
4230   }
4231 }
4232
4233 static gboolean
4234 gst_base_sink_query (GstElement * element, GstQuery * query)
4235 {
4236   gboolean res = FALSE;
4237
4238   GstBaseSink *basesink = GST_BASE_SINK (element);
4239
4240   switch (GST_QUERY_TYPE (query)) {
4241     case GST_QUERY_POSITION:
4242     {
4243       gint64 cur = 0;
4244       GstFormat format;
4245       gboolean upstream = FALSE;
4246
4247       gst_query_parse_position (query, &format, NULL);
4248
4249       GST_DEBUG_OBJECT (basesink, "position format %d", format);
4250
4251       /* first try to get the position based on the clock */
4252       if ((res =
4253               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4254         gst_query_set_position (query, format, cur);
4255       } else if (upstream) {
4256         /* fallback to peer query */
4257         res = gst_base_sink_peer_query (basesink, query);
4258       }
4259       break;
4260     }
4261     case GST_QUERY_DURATION:
4262     {
4263       GstFormat format, uformat;
4264       gint64 duration, uduration;
4265
4266       gst_query_parse_duration (query, &format, NULL);
4267
4268       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4269           gst_format_get_name (format));
4270
4271       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4272         uformat = GST_FORMAT_BYTES;
4273
4274         /* get the duration in bytes, in pull mode that's all we are sure to
4275          * know. We have to explicitly get this value from upstream instead of
4276          * using our cached value because it might change. Duration caching
4277          * should be done at a higher level. */
4278         res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4279             &uduration);
4280         if (res) {
4281           gst_segment_set_duration (&basesink->segment, uformat, uduration);
4282           if (format != uformat) {
4283             /* convert to the requested format */
4284             res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4285                 &format, &duration);
4286           } else {
4287             duration = uduration;
4288           }
4289           if (res) {
4290             /* set the result */
4291             gst_query_set_duration (query, format, duration);
4292           }
4293         }
4294       } else {
4295         /* in push mode we simply forward upstream */
4296         res = gst_base_sink_peer_query (basesink, query);
4297       }
4298       break;
4299     }
4300     case GST_QUERY_LATENCY:
4301     {
4302       gboolean live, us_live;
4303       GstClockTime min, max;
4304
4305       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4306                   &max))) {
4307         gst_query_set_latency (query, live, min, max);
4308       }
4309       break;
4310     }
4311     case GST_QUERY_JITTER:
4312       break;
4313     case GST_QUERY_RATE:
4314       /* gst_query_set_rate (query, basesink->segment_rate); */
4315       res = TRUE;
4316       break;
4317     case GST_QUERY_SEGMENT:
4318     {
4319       /* FIXME, bring start/stop to stream time */
4320       gst_query_set_segment (query, basesink->segment.rate,
4321           GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4322       break;
4323     }
4324     case GST_QUERY_SEEKING:
4325     case GST_QUERY_CONVERT:
4326     case GST_QUERY_FORMATS:
4327     default:
4328       res = gst_base_sink_peer_query (basesink, query);
4329       break;
4330   }
4331   return res;
4332 }
4333
4334 static GstStateChangeReturn
4335 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4336 {
4337   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4338   GstBaseSink *basesink = GST_BASE_SINK (element);
4339   GstBaseSinkClass *bclass;
4340   GstBaseSinkPrivate *priv;
4341
4342   priv = basesink->priv;
4343
4344   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4345
4346   switch (transition) {
4347     case GST_STATE_CHANGE_NULL_TO_READY:
4348       if (bclass->start)
4349         if (!bclass->start (basesink))
4350           goto start_failed;
4351       break;
4352     case GST_STATE_CHANGE_READY_TO_PAUSED:
4353       /* need to complete preroll before this state change completes, there
4354        * is no data flow in READY so we can safely assume we need to preroll. */
4355       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4356       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4357       basesink->have_newsegment = FALSE;
4358       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4359       gst_segment_init (basesink->abidata.ABI.clip_segment,
4360           GST_FORMAT_UNDEFINED);
4361       basesink->offset = 0;
4362       basesink->have_preroll = FALSE;
4363       priv->step_unlock = FALSE;
4364       basesink->need_preroll = TRUE;
4365       basesink->playing_async = TRUE;
4366       priv->current_sstart = -1;
4367       priv->current_sstop = -1;
4368       priv->eos_rtime = -1;
4369       priv->latency = 0;
4370       basesink->eos = FALSE;
4371       priv->received_eos = FALSE;
4372       gst_base_sink_reset_qos (basesink);
4373       priv->commited = FALSE;
4374       priv->call_preroll = TRUE;
4375       priv->current_step.valid = FALSE;
4376       priv->pending_step.valid = FALSE;
4377       if (priv->async_enabled) {
4378         GST_DEBUG_OBJECT (basesink, "doing async state change");
4379         /* when async enabled, post async-start message and return ASYNC from
4380          * the state change function */
4381         ret = GST_STATE_CHANGE_ASYNC;
4382         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4383             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4384       } else {
4385         priv->have_latency = TRUE;
4386       }
4387       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4388       break;
4389     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4390       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4391       if (!gst_base_sink_needs_preroll (basesink)) {
4392         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4393         /* no preroll needed anymore now. */
4394         basesink->playing_async = FALSE;
4395         basesink->need_preroll = FALSE;
4396         if (basesink->eos) {
4397           GstMessage *message;
4398
4399           /* need to post EOS message here */
4400           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4401           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
4402           gst_message_set_seqnum (message, basesink->priv->seqnum);
4403           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
4404         } else {
4405           GST_DEBUG_OBJECT (basesink, "signal preroll");
4406           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
4407         }
4408       } else {
4409         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
4410         basesink->need_preroll = TRUE;
4411         basesink->playing_async = TRUE;
4412         priv->call_preroll = TRUE;
4413         priv->commited = FALSE;
4414         if (priv->async_enabled) {
4415           GST_DEBUG_OBJECT (basesink, "doing async state change");
4416           ret = GST_STATE_CHANGE_ASYNC;
4417           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4418               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4419         }
4420       }
4421       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4422       break;
4423     default:
4424       break;
4425   }
4426
4427   {
4428     GstStateChangeReturn bret;
4429
4430     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4431     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
4432       goto activate_failed;
4433   }
4434
4435   switch (transition) {
4436     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4437       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
4438       /* FIXME, make sure we cannot enter _render first */
4439
4440       /* we need to call ::unlock before locking PREROLL_LOCK
4441        * since we lock it before going into ::render */
4442       if (bclass->unlock)
4443         bclass->unlock (basesink);
4444
4445       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4446       /* now that we have the PREROLL lock, clear our unlock request */
4447       if (bclass->unlock_stop)
4448         bclass->unlock_stop (basesink);
4449
4450       /* we need preroll again and we set the flag before unlocking the clockid
4451        * because if the clockid is unlocked before a current buffer expired, we
4452        * can use that buffer to preroll with */
4453       basesink->need_preroll = TRUE;
4454
4455       if (basesink->clock_id) {
4456         gst_clock_id_unschedule (basesink->clock_id);
4457       }
4458
4459       /* if we don't have a preroll buffer we need to wait for a preroll and
4460        * return ASYNC. */
4461       if (!gst_base_sink_needs_preroll (basesink)) {
4462         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
4463         basesink->playing_async = FALSE;
4464       } else {
4465         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
4466           ret = GST_STATE_CHANGE_SUCCESS;
4467         } else {
4468           GST_DEBUG_OBJECT (basesink,
4469               "PLAYING to PAUSED, we are not prerolled");
4470           basesink->playing_async = TRUE;
4471           priv->commited = FALSE;
4472           priv->call_preroll = TRUE;
4473           if (priv->async_enabled) {
4474             GST_DEBUG_OBJECT (basesink, "doing async state change");
4475             ret = GST_STATE_CHANGE_ASYNC;
4476             gst_element_post_message (GST_ELEMENT_CAST (basesink),
4477                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
4478                     FALSE));
4479           }
4480         }
4481       }
4482       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
4483           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
4484
4485       gst_base_sink_reset_qos (basesink);
4486       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4487       break;
4488     case GST_STATE_CHANGE_PAUSED_TO_READY:
4489       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4490       /* start by reseting our position state with the object lock so that the
4491        * position query gets the right idea. We do this before we post the
4492        * messages so that the message handlers pick this up. */
4493       GST_OBJECT_LOCK (basesink);
4494       basesink->have_newsegment = FALSE;
4495       priv->current_sstart = -1;
4496       priv->current_sstop = -1;
4497       priv->have_latency = FALSE;
4498       GST_OBJECT_UNLOCK (basesink);
4499
4500       gst_base_sink_set_last_buffer (basesink, NULL);
4501       priv->call_preroll = FALSE;
4502
4503       if (!priv->commited) {
4504         if (priv->async_enabled) {
4505           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
4506
4507           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4508               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
4509                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
4510
4511           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4512               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
4513         }
4514         priv->commited = TRUE;
4515       } else {
4516         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
4517       }
4518       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4519       break;
4520     case GST_STATE_CHANGE_READY_TO_NULL:
4521       if (bclass->stop) {
4522         if (!bclass->stop (basesink)) {
4523           GST_WARNING_OBJECT (basesink, "failed to stop");
4524         }
4525       }
4526       gst_base_sink_set_last_buffer (basesink, NULL);
4527       priv->call_preroll = FALSE;
4528       break;
4529     default:
4530       break;
4531   }
4532
4533   return ret;
4534
4535   /* ERRORS */
4536 start_failed:
4537   {
4538     GST_DEBUG_OBJECT (basesink, "failed to start");
4539     return GST_STATE_CHANGE_FAILURE;
4540   }
4541 activate_failed:
4542   {
4543     GST_DEBUG_OBJECT (basesink,
4544         "element failed to change states -- activation problem?");
4545     return GST_STATE_CHANGE_FAILURE;
4546   }
4547 }