docs/design/part-negotiation.txt: Small doc update.
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSource
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * <programlisting>
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *   
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * </programlisting>
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSink::preroll vmethod with this preroll buffer and will then commit
59  * the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from ::get_times. If this function returns
63  * #GST_CLOCK_TIME_NONE for the start time, no synchronisation will be done.
64  * Synchronisation can be disabled entirely by setting the object "sync"
65  * property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSink::render will be called.
68  * Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the ::render method
71  * are supported as well. These classes typically receive a buffer in the render
72  * method and can then potentially block on the clock while rendering. A typical
73  * example is an audiosink. Since 0.10.11 these subclasses can use
74  * gst_base_sink_wait_preroll() to perform the blocking wait.
75  *
76  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
77  * for the clock to reach the time indicated by the stop time of the last
78  * ::get_times call before posting an EOS message. When the element receives
79  * EOS in PAUSED, preroll completes, the event is queued and an EOS message is
80  * posted when going to PLAYING.
81  *
82  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
83  * synchronisation and clipping of buffers. Buffers that fall completely outside
84  * of the current segment are dropped. Buffers that fall partially in the
85  * segment are rendered (and prerolled). Subclasses should do any subbuffer
86  * clipping themselves when needed.
87  *
88  * #GstBaseSink will by default report the current playback position in
89  * #GST_FORMAT_TIME based on the current clock time and segment information.
90  * If no clock has been set on the element, the query will be forwarded
91  * upstream.
92  *
93  * The ::set_caps function will be called when the subclass should configure
94  * itself to process a specific media type.
95  *
96  * The ::start and ::stop virtual methods will be called when resources should
97  * be allocated. Any ::preroll, ::render  and ::set_caps function will be
98  * called between the ::start and ::stop calls.
99  *
100  * The ::event virtual method will be called when an event is received by
101  * #GstBaseSink. Normally this method should only be overriden by very specific
102  * elements (such as file sinks) which need to handle the newsegment event
103  * specially.
104  *
105  * #GstBaseSink provides an overridable ::buffer_alloc function that can be
106  * used by sinks that want to do reverse negotiation or to provide
107  * custom buffers (hardware buffers for example) to upstream elements.
108  *
109  * The ::unlock method is called when the elements should unblock any blocking
110  * operations they perform in the ::render method. This is mostly useful when
111  * the ::render method performs a blocking write on a file descriptor, for
112  * example.
113  *
114  * The max-lateness property affects how the sink deals with buffers that
115  * arrive too late in the sink. A buffer arrives too late in the sink when
116  * the presentation time (as a combination of the last segment, buffer
117  * timestamp and element base_time) plus the duration is before the current
118  * time of the clock.
119  * If the frame is later than max-lateness, the sink will drop the buffer
120  * without calling the render method.
121  * This feature is disabled if sync is disabled, the ::get-times method does
122  * not return a valid start time or max-lateness is set to -1 (the default).
123  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
124  * max-lateness value.
125  *
126  * The qos property will enable the quality-of-service features of the basesink
127  * which gather statistics about the real-time performance of the clock
128  * synchronisation. For each buffer received in the sink, statistics are
129  * gathered and a QOS event is sent upstream with these numbers. This
130  * information can then be used by upstream elements to reduce their processing
131  * rate, for example.
132  *
133  * Since 0.10.15 the async property can be used to instruct the sink to never
134  * perform an ASYNC state change. This feature is mostly usable when dealing
135  * with non-synchronized streams or sparse streams.
136  *
137  * Last reviewed on 2007-08-29 (0.10.15)
138  */
139
140 #ifdef HAVE_CONFIG_H
141 #  include "config.h"
142 #endif
143
144 #include "gstbasesink.h"
145 #include <gst/gstmarshal.h>
146 #include <gst/gst_private.h>
147 #include <gst/gst-i18n-lib.h>
148
149 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
150 #define GST_CAT_DEFAULT gst_base_sink_debug
151
152 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
153    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
154
155 /* FIXME, some stuff in ABI.data and other in Private...
156  * Make up your mind please.
157  */
158 struct _GstBaseSinkPrivate
159 {
160   gint qos_enabled;             /* ATOMIC */
161   gboolean async_enabled;
162   GstClockTimeDiff ts_offset;
163   GstClockTime render_delay;
164
165   /* start, stop of current buffer, stream time, used to report position */
166   GstClockTime current_sstart;
167   GstClockTime current_sstop;
168
169   /* start, stop and jitter of current buffer, running time */
170   GstClockTime current_rstart;
171   GstClockTime current_rstop;
172   GstClockTimeDiff current_jitter;
173
174   /* EOS sync time in running time */
175   GstClockTime eos_rtime;
176
177   /* last buffer that arrived in time, running time */
178   GstClockTime last_in_time;
179   /* when the last buffer left the sink, running time */
180   GstClockTime last_left;
181
182   /* running averages go here these are done on running time */
183   GstClockTime avg_pt;
184   GstClockTime avg_duration;
185   gdouble avg_rate;
186
187   /* these are done on system time. avg_jitter and avg_render are
188    * compared to eachother to see if the rendering time takes a
189    * huge amount of the processing, If so we are flooded with
190    * buffers. */
191   GstClockTime last_left_systime;
192   GstClockTime avg_jitter;
193   GstClockTime start, stop;
194   GstClockTime avg_render;
195
196   /* number of rendered and dropped frames */
197   guint64 rendered;
198   guint64 dropped;
199
200   /* latency stuff */
201   GstClockTime latency;
202
203   /* if we already commited the state */
204   gboolean commited;
205
206   /* when we received EOS */
207   gboolean received_eos;
208
209   /* when we are prerolled and able to report latency */
210   gboolean have_latency;
211
212   /* the last buffer we prerolled or rendered. Useful for making snapshots */
213   GstBuffer *last_buffer;
214
215   /* caps for pull based scheduling */
216   GstCaps *pull_caps;
217
218   guint blocksize;
219 };
220
221 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
222
223 /* generic running average, this has a neutral window size */
224 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
225
226 /* the windows for these running averages are experimentally obtained.
227  * possitive values get averaged more while negative values use a small
228  * window so we can react faster to badness. */
229 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
230 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
231
232 /* BaseSink properties */
233
234 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
235 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
236
237 #define DEFAULT_PREROLL_QUEUE_LEN       0
238 #define DEFAULT_SYNC                    TRUE
239 #define DEFAULT_MAX_LATENESS            -1
240 #define DEFAULT_QOS                     FALSE
241 #define DEFAULT_ASYNC                   TRUE
242 #define DEFAULT_TS_OFFSET               0
243 #define DEFAULT_BLOCKSIZE               4096
244
245 enum
246 {
247   PROP_0,
248   PROP_PREROLL_QUEUE_LEN,
249   PROP_SYNC,
250   PROP_MAX_LATENESS,
251   PROP_QOS,
252   PROP_ASYNC,
253   PROP_TS_OFFSET,
254   PROP_LAST_BUFFER,
255   PROP_BLOCKSIZE,
256   PROP_LAST
257 };
258
259 static GstElementClass *parent_class = NULL;
260
261 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
262 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
263 static void gst_base_sink_finalize (GObject * object);
264
265 GType
266 gst_base_sink_get_type (void)
267 {
268   static GType base_sink_type = 0;
269
270   if (G_UNLIKELY (base_sink_type == 0)) {
271     static const GTypeInfo base_sink_info = {
272       sizeof (GstBaseSinkClass),
273       NULL,
274       NULL,
275       (GClassInitFunc) gst_base_sink_class_init,
276       NULL,
277       NULL,
278       sizeof (GstBaseSink),
279       0,
280       (GInstanceInitFunc) gst_base_sink_init,
281     };
282
283     base_sink_type = g_type_register_static (GST_TYPE_ELEMENT,
284         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
285   }
286   return base_sink_type;
287 }
288
289 static void gst_base_sink_set_property (GObject * object, guint prop_id,
290     const GValue * value, GParamSpec * pspec);
291 static void gst_base_sink_get_property (GObject * object, guint prop_id,
292     GValue * value, GParamSpec * pspec);
293
294 static gboolean gst_base_sink_send_event (GstElement * element,
295     GstEvent * event);
296 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
297
298 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
299 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
300 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
301     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
302 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
303     GstClockTime * start, GstClockTime * end);
304 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
305     GstPad * pad, gboolean flushing);
306 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
307     gboolean active);
308
309 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
310     GstStateChange transition);
311
312 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
313 static void gst_base_sink_loop (GstPad * pad);
314 static gboolean gst_base_sink_pad_activate (GstPad * pad);
315 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
316 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
317 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
318 static gboolean gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query);
319
320 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
321
322 /* check if an object was too late */
323 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
324     GstMiniObject * obj, GstClockTime start, GstClockTime stop,
325     GstClockReturn status, GstClockTimeDiff jitter);
326
327 static void
328 gst_base_sink_class_init (GstBaseSinkClass * klass)
329 {
330   GObjectClass *gobject_class;
331   GstElementClass *gstelement_class;
332
333   gobject_class = G_OBJECT_CLASS (klass);
334   gstelement_class = GST_ELEMENT_CLASS (klass);
335
336   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
337       "basesink element");
338
339   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
340
341   parent_class = g_type_class_peek_parent (klass);
342
343   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_sink_finalize);
344   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_sink_set_property);
345   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_sink_get_property);
346
347   /* FIXME, this next value should be configured using an event from the
348    * upstream element, ie, the BUFFER_SIZE event. */
349   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
350       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
351           "Number of buffers to queue during preroll", 0, G_MAXUINT,
352           DEFAULT_PREROLL_QUEUE_LEN,
353           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
354
355   g_object_class_install_property (gobject_class, PROP_SYNC,
356       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
357           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
358
359   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
360       g_param_spec_int64 ("max-lateness", "Max Lateness",
361           "Maximum number of nanoseconds that a buffer can be late before it "
362           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
363           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
364
365   g_object_class_install_property (gobject_class, PROP_QOS,
366       g_param_spec_boolean ("qos", "Qos",
367           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
368           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
369   /**
370    * GstBaseSink:async
371    *
372    * If set to #TRUE, the basesink will perform asynchronous state changes.
373    * When set to #FALSE, the sink will not signal the parent when it prerolls.
374    * Use this option when dealing with sparse streams or when synchronisation is
375    * not required.
376    *
377    * Since: 0.10.15
378    */
379   g_object_class_install_property (gobject_class, PROP_ASYNC,
380       g_param_spec_boolean ("async", "Async",
381           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
382           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
383   /**
384    * GstBaseSink:ts-offset
385    *
386    * Controls the final synchronisation, a negative value will render the buffer
387    * earlier while a positive value delays playback. This property can be 
388    * used to fix synchronisation in bad files.
389    *
390    * Since: 0.10.15
391    */
392   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
393       g_param_spec_int64 ("ts-offset", "TS Offset",
394           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
395           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
396
397   /**
398    * GstBaseSink:last-buffer
399    *
400    * The last buffer that arrived in the sink and was used for preroll or for
401    * rendering. This property can be used to generate thumbnails. This property
402    * can be NULL when the sink has not yet received a bufer.
403    *
404    * Since: 0.10.15
405    */
406   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
407       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
408           "The last buffer received in the sink", GST_TYPE_BUFFER,
409           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
410
411   /**
412    * GstBaseSink:blocksize
413    *
414    * The amount of bytes to pull when operating in pull mode.
415    *
416    * Since: 0.10.22
417    */
418   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
419       g_param_spec_uint ("blocksize", "Block size",
420           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
421           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422
423   gstelement_class->change_state =
424       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
425   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
426   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
427
428   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
429   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
430   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
431   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
432   klass->activate_pull =
433       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
434 }
435
436 static GstCaps *
437 gst_base_sink_pad_getcaps (GstPad * pad)
438 {
439   GstBaseSinkClass *bclass;
440   GstBaseSink *bsink;
441   GstCaps *caps = NULL;
442
443   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
444   bclass = GST_BASE_SINK_GET_CLASS (bsink);
445
446   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
447     /* if we are operating in pull mode we only accept the negotiated caps */
448     GST_OBJECT_LOCK (pad);
449     if ((caps = GST_PAD_CAPS (pad)))
450       gst_caps_ref (caps);
451     GST_OBJECT_UNLOCK (pad);
452   }
453   if (caps == NULL) {
454     if (bclass->get_caps)
455       caps = bclass->get_caps (bsink);
456
457     if (caps == NULL) {
458       GstPadTemplate *pad_template;
459
460       pad_template =
461           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
462           "sink");
463       if (pad_template != NULL) {
464         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
465       }
466     }
467   }
468   gst_object_unref (bsink);
469
470   return caps;
471 }
472
473 static gboolean
474 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
475 {
476   GstBaseSinkClass *bclass;
477   GstBaseSink *bsink;
478   gboolean res = TRUE;
479
480   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
481   bclass = GST_BASE_SINK_GET_CLASS (bsink);
482
483   if (res && bclass->set_caps)
484     res = bclass->set_caps (bsink, caps);
485
486   gst_object_unref (bsink);
487
488   return res;
489 }
490
491 static void
492 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
493 {
494   GstBaseSinkClass *bclass;
495   GstBaseSink *bsink;
496
497   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
498   bclass = GST_BASE_SINK_GET_CLASS (bsink);
499
500   if (bclass->fixate)
501     bclass->fixate (bsink, caps);
502
503   gst_object_unref (bsink);
504 }
505
506 static GstFlowReturn
507 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
508     GstCaps * caps, GstBuffer ** buf)
509 {
510   GstBaseSinkClass *bclass;
511   GstBaseSink *bsink;
512   GstFlowReturn result = GST_FLOW_OK;
513
514   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
515   bclass = GST_BASE_SINK_GET_CLASS (bsink);
516
517   if (bclass->buffer_alloc)
518     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
519   else
520     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
521
522   gst_object_unref (bsink);
523
524   return result;
525 }
526
527 static void
528 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
529 {
530   GstPadTemplate *pad_template;
531   GstBaseSinkPrivate *priv;
532
533   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
534
535   pad_template =
536       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
537   g_return_if_fail (pad_template != NULL);
538
539   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
540
541   gst_pad_set_getcaps_function (basesink->sinkpad,
542       GST_DEBUG_FUNCPTR (gst_base_sink_pad_getcaps));
543   gst_pad_set_setcaps_function (basesink->sinkpad,
544       GST_DEBUG_FUNCPTR (gst_base_sink_pad_setcaps));
545   gst_pad_set_fixatecaps_function (basesink->sinkpad,
546       GST_DEBUG_FUNCPTR (gst_base_sink_pad_fixate));
547   gst_pad_set_bufferalloc_function (basesink->sinkpad,
548       GST_DEBUG_FUNCPTR (gst_base_sink_pad_buffer_alloc));
549   gst_pad_set_activate_function (basesink->sinkpad,
550       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate));
551   gst_pad_set_activatepush_function (basesink->sinkpad,
552       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_push));
553   gst_pad_set_activatepull_function (basesink->sinkpad,
554       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_pull));
555   gst_pad_set_event_function (basesink->sinkpad,
556       GST_DEBUG_FUNCPTR (gst_base_sink_event));
557   gst_pad_set_chain_function (basesink->sinkpad,
558       GST_DEBUG_FUNCPTR (gst_base_sink_chain));
559   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
560
561   basesink->pad_mode = GST_ACTIVATE_NONE;
562   basesink->preroll_queue = g_queue_new ();
563   basesink->abidata.ABI.clip_segment = gst_segment_new ();
564   priv->have_latency = FALSE;
565
566   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
567   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
568
569   basesink->sync = DEFAULT_SYNC;
570   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
571   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
572   priv->async_enabled = DEFAULT_ASYNC;
573   priv->ts_offset = DEFAULT_TS_OFFSET;
574   priv->render_delay = 0;
575   priv->blocksize = DEFAULT_BLOCKSIZE;
576
577   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
578 }
579
580 static void
581 gst_base_sink_finalize (GObject * object)
582 {
583   GstBaseSink *basesink;
584
585   basesink = GST_BASE_SINK (object);
586
587   g_queue_free (basesink->preroll_queue);
588   gst_segment_free (basesink->abidata.ABI.clip_segment);
589
590   G_OBJECT_CLASS (parent_class)->finalize (object);
591 }
592
593 /**
594  * gst_base_sink_set_sync:
595  * @sink: the sink
596  * @sync: the new sync value.
597  *
598  * Configures @sink to synchronize on the clock or not. When
599  * @sync is FALSE, incomming samples will be played as fast as
600  * possible. If @sync is TRUE, the timestamps of the incomming
601  * buffers will be used to schedule the exact render time of its
602  * contents.
603  *
604  * Since: 0.10.4
605  */
606 void
607 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
608 {
609   g_return_if_fail (GST_IS_BASE_SINK (sink));
610
611   GST_OBJECT_LOCK (sink);
612   sink->sync = sync;
613   GST_OBJECT_UNLOCK (sink);
614 }
615
616 /**
617  * gst_base_sink_get_sync:
618  * @sink: the sink
619  *
620  * Checks if @sink is currently configured to synchronize against the
621  * clock.
622  *
623  * Returns: TRUE if the sink is configured to synchronize against the clock.
624  *
625  * Since: 0.10.4
626  */
627 gboolean
628 gst_base_sink_get_sync (GstBaseSink * sink)
629 {
630   gboolean res;
631
632   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
633
634   GST_OBJECT_LOCK (sink);
635   res = sink->sync;
636   GST_OBJECT_UNLOCK (sink);
637
638   return res;
639 }
640
641 /**
642  * gst_base_sink_set_max_lateness:
643  * @sink: the sink
644  * @max_lateness: the new max lateness value.
645  *
646  * Sets the new max lateness value to @max_lateness. This value is
647  * used to decide if a buffer should be dropped or not based on the
648  * buffer timestamp and the current clock time. A value of -1 means
649  * an unlimited time.
650  *
651  * Since: 0.10.4
652  */
653 void
654 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
655 {
656   g_return_if_fail (GST_IS_BASE_SINK (sink));
657
658   GST_OBJECT_LOCK (sink);
659   sink->abidata.ABI.max_lateness = max_lateness;
660   GST_OBJECT_UNLOCK (sink);
661 }
662
663 /**
664  * gst_base_sink_get_max_lateness:
665  * @sink: the sink
666  *
667  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
668  * more details.
669  *
670  * Returns: The maximum time in nanoseconds that a buffer can be late
671  * before it is dropped and not rendered. A value of -1 means an
672  * unlimited time.
673  *
674  * Since: 0.10.4
675  */
676 gint64
677 gst_base_sink_get_max_lateness (GstBaseSink * sink)
678 {
679   gint64 res;
680
681   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
682
683   GST_OBJECT_LOCK (sink);
684   res = sink->abidata.ABI.max_lateness;
685   GST_OBJECT_UNLOCK (sink);
686
687   return res;
688 }
689
690 /**
691  * gst_base_sink_set_qos_enabled:
692  * @sink: the sink
693  * @enabled: the new qos value.
694  *
695  * Configures @sink to send Quality-of-Service events upstream.
696  *
697  * Since: 0.10.5
698  */
699 void
700 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
701 {
702   g_return_if_fail (GST_IS_BASE_SINK (sink));
703
704   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
705 }
706
707 /**
708  * gst_base_sink_is_qos_enabled:
709  * @sink: the sink
710  *
711  * Checks if @sink is currently configured to send Quality-of-Service events
712  * upstream.
713  *
714  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
715  *
716  * Since: 0.10.5
717  */
718 gboolean
719 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
720 {
721   gboolean res;
722
723   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
724
725   res = g_atomic_int_get (&sink->priv->qos_enabled);
726
727   return res;
728 }
729
730 /**
731  * gst_base_sink_set_async_enabled:
732  * @sink: the sink
733  * @enabled: the new async value.
734  *
735  * Configures @sink to perform all state changes asynchronusly. When async is
736  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
737  * preroll buffer. This feature is usefull if the sink does not synchronize
738  * against the clock or when it is dealing with sparse streams.
739  *
740  * Since: 0.10.15
741  */
742 void
743 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
744 {
745   g_return_if_fail (GST_IS_BASE_SINK (sink));
746
747   GST_PAD_PREROLL_LOCK (sink->sinkpad);
748   sink->priv->async_enabled = enabled;
749   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
750   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
751 }
752
753 /**
754  * gst_base_sink_is_async_enabled:
755  * @sink: the sink
756  *
757  * Checks if @sink is currently configured to perform asynchronous state
758  * changes to PAUSED.
759  *
760  * Returns: TRUE if the sink is configured to perform asynchronous state
761  * changes.
762  *
763  * Since: 0.10.15
764  */
765 gboolean
766 gst_base_sink_is_async_enabled (GstBaseSink * sink)
767 {
768   gboolean res;
769
770   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
771
772   GST_PAD_PREROLL_LOCK (sink->sinkpad);
773   res = sink->priv->async_enabled;
774   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
775
776   return res;
777 }
778
779 /**
780  * gst_base_sink_set_ts_offset:
781  * @sink: the sink
782  * @offset: the new offset
783  *
784  * Adjust the synchronisation of @sink with @offset. A negative value will
785  * render buffers earlier than their timestamp. A positive value will delay
786  * rendering. This function can be used to fix playback of badly timestamped
787  * buffers.
788  *
789  * Since: 0.10.15
790  */
791 void
792 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
793 {
794   g_return_if_fail (GST_IS_BASE_SINK (sink));
795
796   GST_OBJECT_LOCK (sink);
797   sink->priv->ts_offset = offset;
798   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
799   GST_OBJECT_UNLOCK (sink);
800 }
801
802 /**
803  * gst_base_sink_get_ts_offset:
804  * @sink: the sink
805  *
806  * Get the synchronisation offset of @sink.
807  *
808  * Returns: The synchronisation offset.
809  *
810  * Since: 0.10.15
811  */
812 GstClockTimeDiff
813 gst_base_sink_get_ts_offset (GstBaseSink * sink)
814 {
815   GstClockTimeDiff res;
816
817   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
818
819   GST_OBJECT_LOCK (sink);
820   res = sink->priv->ts_offset;
821   GST_OBJECT_UNLOCK (sink);
822
823   return res;
824 }
825
826 /**
827  * gst_base_sink_get_last_buffer:
828  * @sink: the sink
829  *
830  * Get the last buffer that arrived in the sink and was used for preroll or for
831  * rendering. This property can be used to generate thumbnails.
832  *
833  * The #GstCaps on the buffer can be used to determine the type of the buffer.
834  * 
835  * Returns: a #GstBuffer. gst_buffer_unref() after usage. This function returns
836  * NULL when no buffer has arrived in the sink yet or when the sink is not in
837  * PAUSED or PLAYING.
838  *
839  * Since: 0.10.15
840  */
841 GstBuffer *
842 gst_base_sink_get_last_buffer (GstBaseSink * sink)
843 {
844   GstBuffer *res;
845
846   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
847
848   GST_OBJECT_LOCK (sink);
849   if ((res = sink->priv->last_buffer))
850     gst_buffer_ref (res);
851   GST_OBJECT_UNLOCK (sink);
852
853   return res;
854 }
855
856 static void
857 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
858 {
859   GstBuffer *old;
860
861   if (buffer)
862     gst_buffer_ref (buffer);
863
864   GST_OBJECT_LOCK (sink);
865   old = sink->priv->last_buffer;
866   sink->priv->last_buffer = buffer;
867   GST_OBJECT_UNLOCK (sink);
868
869   if (old)
870     gst_buffer_unref (old);
871 }
872
873 /**
874  * gst_base_sink_get_latency:
875  * @sink: the sink
876  *
877  * Get the currently configured latency.
878  *
879  * Returns: The configured latency.
880  *
881  * Since: 0.10.12
882  */
883 GstClockTime
884 gst_base_sink_get_latency (GstBaseSink * sink)
885 {
886   GstClockTime res;
887
888   GST_OBJECT_LOCK (sink);
889   res = sink->priv->latency;
890   GST_OBJECT_UNLOCK (sink);
891
892   return res;
893 }
894
895 /**
896  * gst_base_sink_query_latency:
897  * @sink: the sink
898  * @live: if the sink is live
899  * @upstream_live: if an upstream element is live
900  * @min_latency: the min latency of the upstream elements
901  * @max_latency: the max latency of the upstream elements
902  *
903  * Query the sink for the latency parameters. The latency will be queried from
904  * the upstream elements. @live will be TRUE if @sink is configured to
905  * synchronize against the clock. @upstream_live will be TRUE if an upstream
906  * element is live. 
907  *
908  * If both @live and @upstream_live are TRUE, the sink will want to compensate
909  * for the latency introduced by the upstream elements by setting the
910  * @min_latency to a strictly possitive value.
911  *
912  * This function is mostly used by subclasses. 
913  *
914  * Returns: TRUE if the query succeeded.
915  *
916  * Since: 0.10.12
917  */
918 gboolean
919 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
920     gboolean * upstream_live, GstClockTime * min_latency,
921     GstClockTime * max_latency)
922 {
923   gboolean l, us_live, res, have_latency;
924   GstClockTime min, max, render_delay;
925   GstQuery *query;
926   GstClockTime us_min, us_max;
927
928   /* we are live when we sync to the clock */
929   GST_OBJECT_LOCK (sink);
930   l = sink->sync;
931   have_latency = sink->priv->have_latency;
932   render_delay = sink->priv->render_delay;
933   GST_OBJECT_UNLOCK (sink);
934
935   /* assume no latency */
936   min = 0;
937   max = -1;
938   us_live = FALSE;
939
940   if (have_latency) {
941     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
942     /* we are ready for a latency query this is when we preroll or when we are
943      * not async. */
944     query = gst_query_new_latency ();
945
946     /* ask the peer for the latency */
947     if ((res = gst_base_sink_peer_query (sink, query))) {
948       /* get upstream min and max latency */
949       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
950
951       if (us_live) {
952         /* upstream live, use its latency, subclasses should use these
953          * values to create the complete latency. */
954         min = us_min;
955         max = us_max;
956       }
957       if (l) {
958         /* we need to add the render delay if we are live */
959         if (min != -1)
960           min += render_delay;
961         if (max != -1)
962           max += render_delay;
963       }
964     }
965     gst_query_unref (query);
966   } else {
967     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
968     res = FALSE;
969   }
970
971   /* not live, we tried to do the query, if it failed we return TRUE anyway */
972   if (!res) {
973     if (!l) {
974       res = TRUE;
975       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
976     } else {
977       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
978     }
979   }
980
981   if (res) {
982     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
983         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
984         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
985
986     if (live)
987       *live = l;
988     if (upstream_live)
989       *upstream_live = us_live;
990     if (min_latency)
991       *min_latency = min;
992     if (max_latency)
993       *max_latency = max;
994   }
995   return res;
996 }
997
998 /**
999  * gst_base_sink_set_render_delay:
1000  * @sink: a #GstBaseSink
1001  * @delay: the new delay
1002  *
1003  * Set the render delay in @sink to @delay. The render delay is the time 
1004  * between actual rendering of a buffer and its synchronisation time. Some
1005  * devices might delay media rendering which can be compensated for with this
1006  * function. 
1007  *
1008  * After calling this function, this sink will report additional latency and
1009  * other sinks will adjust their latency to delay the rendering of their media.
1010  *
1011  * This function is usually called by subclasses.
1012  *
1013  * Since: 0.10.21
1014  */
1015 void
1016 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1017 {
1018   g_return_if_fail (GST_IS_BASE_SINK (sink));
1019
1020   GST_OBJECT_LOCK (sink);
1021   sink->priv->render_delay = delay;
1022   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1023       GST_TIME_ARGS (delay));
1024   GST_OBJECT_UNLOCK (sink);
1025 }
1026
1027 /**
1028  * gst_base_sink_get_render_delay:
1029  * @sink: a #GstBaseSink
1030  *
1031  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1032  * information about the render delay.
1033  *
1034  * Returns: the render delay of @sink.
1035  *
1036  * Since: 0.10.21
1037  */
1038 GstClockTime
1039 gst_base_sink_get_render_delay (GstBaseSink * sink)
1040 {
1041   GstClockTimeDiff res;
1042
1043   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1044
1045   GST_OBJECT_LOCK (sink);
1046   res = sink->priv->render_delay;
1047   GST_OBJECT_UNLOCK (sink);
1048
1049   return res;
1050 }
1051
1052 /**
1053  * gst_base_sink_set_blocksize:
1054  * @sink: a #GstBaseSink
1055  * @blocksize: the blocksize in bytes
1056  *
1057  * Set the number of bytes that the sink will pull when it is operating in pull
1058  * mode.
1059  *
1060  * Since: 0.10.22
1061  */
1062 void
1063 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1064 {
1065   g_return_if_fail (GST_IS_BASE_SINK (sink));
1066
1067   GST_OBJECT_LOCK (sink);
1068   sink->priv->blocksize = blocksize;
1069   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1070   GST_OBJECT_UNLOCK (sink);
1071 }
1072
1073 /**
1074  * gst_base_sink_get_blocksize:
1075  * @sink: a #GstBaseSink
1076  *
1077  * Get the number of bytes that the sink will pull when it is operating in pull
1078  * mode.
1079  *
1080  * Returns: the number of bytes @sink will pull in pull mode.
1081  *
1082  * Since: 0.10.22
1083  */
1084 guint
1085 gst_base_sink_get_blocksize (GstBaseSink * sink)
1086 {
1087   guint res;
1088
1089   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1090
1091   GST_OBJECT_LOCK (sink);
1092   res = sink->priv->blocksize;
1093   GST_OBJECT_UNLOCK (sink);
1094
1095   return res;
1096 }
1097
1098 static void
1099 gst_base_sink_set_property (GObject * object, guint prop_id,
1100     const GValue * value, GParamSpec * pspec)
1101 {
1102   GstBaseSink *sink = GST_BASE_SINK (object);
1103
1104   switch (prop_id) {
1105     case PROP_PREROLL_QUEUE_LEN:
1106       /* preroll lock necessary to serialize with finish_preroll */
1107       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1108       sink->preroll_queue_max_len = g_value_get_uint (value);
1109       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1110       break;
1111     case PROP_SYNC:
1112       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1113       break;
1114     case PROP_MAX_LATENESS:
1115       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1116       break;
1117     case PROP_QOS:
1118       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1119       break;
1120     case PROP_ASYNC:
1121       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1122       break;
1123     case PROP_TS_OFFSET:
1124       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1125       break;
1126     case PROP_BLOCKSIZE:
1127       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1128       break;
1129     default:
1130       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1131       break;
1132   }
1133 }
1134
1135 static void
1136 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1137     GParamSpec * pspec)
1138 {
1139   GstBaseSink *sink = GST_BASE_SINK (object);
1140
1141   switch (prop_id) {
1142     case PROP_PREROLL_QUEUE_LEN:
1143       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1144       g_value_set_uint (value, sink->preroll_queue_max_len);
1145       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1146       break;
1147     case PROP_SYNC:
1148       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1149       break;
1150     case PROP_MAX_LATENESS:
1151       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1152       break;
1153     case PROP_QOS:
1154       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1155       break;
1156     case PROP_ASYNC:
1157       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1158       break;
1159     case PROP_TS_OFFSET:
1160       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1161       break;
1162     case PROP_LAST_BUFFER:
1163       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1164       break;
1165     case PROP_BLOCKSIZE:
1166       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1167       break;
1168     default:
1169       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1170       break;
1171   }
1172 }
1173
1174
1175 static GstCaps *
1176 gst_base_sink_get_caps (GstBaseSink * sink)
1177 {
1178   return NULL;
1179 }
1180
1181 static gboolean
1182 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1183 {
1184   return TRUE;
1185 }
1186
1187 static GstFlowReturn
1188 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1189     GstCaps * caps, GstBuffer ** buf)
1190 {
1191   *buf = NULL;
1192   return GST_FLOW_OK;
1193 }
1194
1195 /* with PREROLL_LOCK, STREAM_LOCK */
1196 static void
1197 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1198 {
1199   GstMiniObject *obj;
1200
1201   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1202   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1203     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1204     gst_mini_object_unref (obj);
1205   }
1206   /* we can't have EOS anymore now */
1207   basesink->eos = FALSE;
1208   basesink->priv->received_eos = FALSE;
1209   basesink->have_preroll = FALSE;
1210   basesink->eos_queued = FALSE;
1211   basesink->preroll_queued = 0;
1212   basesink->buffers_queued = 0;
1213   basesink->events_queued = 0;
1214   /* can't report latency anymore until we preroll again */
1215   if (basesink->priv->async_enabled) {
1216     GST_OBJECT_LOCK (basesink);
1217     basesink->priv->have_latency = FALSE;
1218     GST_OBJECT_UNLOCK (basesink);
1219   }
1220   /* and signal any waiters now */
1221   GST_PAD_PREROLL_SIGNAL (pad);
1222 }
1223
1224 /* with STREAM_LOCK, configures given segment with the event information. */
1225 static void
1226 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1227     GstEvent * event, GstSegment * segment)
1228 {
1229   gboolean update;
1230   gdouble rate, arate;
1231   GstFormat format;
1232   gint64 start;
1233   gint64 stop;
1234   gint64 time;
1235
1236   /* the newsegment event is needed to bring the buffer timestamps to the
1237    * stream time and to drop samples outside of the playback segment. */
1238   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1239       &start, &stop, &time);
1240
1241   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1242    * We protect with the OBJECT_LOCK so that we can use the values to
1243    * safely answer a POSITION query. */
1244   GST_OBJECT_LOCK (basesink);
1245   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1246       stop, time);
1247
1248   if (format == GST_FORMAT_TIME) {
1249     GST_DEBUG_OBJECT (basesink,
1250         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1251         "format GST_FORMAT_TIME, "
1252         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1253         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1254         update, rate, arate, GST_TIME_ARGS (segment->start),
1255         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1256         GST_TIME_ARGS (segment->accum));
1257   } else {
1258     GST_DEBUG_OBJECT (basesink,
1259         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1260         "format %d, "
1261         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1262         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1263         segment->format, segment->start, segment->stop, segment->time,
1264         segment->accum);
1265   }
1266   GST_OBJECT_UNLOCK (basesink);
1267 }
1268
1269 /* with PREROLL_LOCK, STREAM_LOCK */
1270 static gboolean
1271 gst_base_sink_commit_state (GstBaseSink * basesink)
1272 {
1273   /* commit state and proceed to next pending state */
1274   GstState current, next, pending, post_pending;
1275   gboolean post_paused = FALSE;
1276   gboolean post_async_done = FALSE;
1277   gboolean post_playing = FALSE;
1278   gboolean sync;
1279
1280   /* we are certainly not playing async anymore now */
1281   basesink->playing_async = FALSE;
1282
1283   GST_OBJECT_LOCK (basesink);
1284   current = GST_STATE (basesink);
1285   next = GST_STATE_NEXT (basesink);
1286   pending = GST_STATE_PENDING (basesink);
1287   post_pending = pending;
1288   sync = basesink->sync;
1289
1290   switch (pending) {
1291     case GST_STATE_PLAYING:
1292     {
1293       GstBaseSinkClass *bclass;
1294       GstStateChangeReturn ret;
1295
1296       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1297
1298       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1299
1300       basesink->need_preroll = FALSE;
1301       post_async_done = TRUE;
1302       basesink->priv->commited = TRUE;
1303       post_playing = TRUE;
1304       /* post PAUSED too when we were READY */
1305       if (current == GST_STATE_READY) {
1306         post_paused = TRUE;
1307       }
1308
1309       /* make sure we notify the subclass of async playing */
1310       if (bclass->async_play) {
1311         ret = bclass->async_play (basesink);
1312         if (ret == GST_STATE_CHANGE_FAILURE)
1313           goto async_failed;
1314       }
1315       break;
1316     }
1317     case GST_STATE_PAUSED:
1318       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1319       post_paused = TRUE;
1320       post_async_done = TRUE;
1321       basesink->priv->commited = TRUE;
1322       post_pending = GST_STATE_VOID_PENDING;
1323       break;
1324     case GST_STATE_READY:
1325     case GST_STATE_NULL:
1326       goto stopping;
1327     case GST_STATE_VOID_PENDING:
1328       goto nothing_pending;
1329     default:
1330       break;
1331   }
1332
1333   /* we can report latency queries now */
1334   basesink->priv->have_latency = TRUE;
1335
1336   GST_STATE (basesink) = pending;
1337   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1338   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1339   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1340   GST_OBJECT_UNLOCK (basesink);
1341
1342   if (post_paused) {
1343     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1344     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1345         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1346             current, next, post_pending));
1347   }
1348   if (post_async_done) {
1349     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1350     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1351         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1352   }
1353   if (post_playing) {
1354     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1355     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1356         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1357             next, pending, GST_STATE_VOID_PENDING));
1358   }
1359
1360   GST_STATE_BROADCAST (basesink);
1361
1362   return TRUE;
1363
1364 nothing_pending:
1365   {
1366     /* Depending on the state, set our vars. We get in this situation when the
1367      * state change function got a change to update the state vars before the
1368      * streaming thread did. This is fine but we need to make sure that we
1369      * update the need_preroll var since it was TRUE when we got here and might
1370      * become FALSE if we got to PLAYING. */
1371     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1372         gst_element_state_get_name (current));
1373     switch (current) {
1374       case GST_STATE_PLAYING:
1375         basesink->need_preroll = FALSE;
1376         break;
1377       case GST_STATE_PAUSED:
1378         basesink->need_preroll = TRUE;
1379         break;
1380       default:
1381         basesink->need_preroll = FALSE;
1382         basesink->flushing = TRUE;
1383         break;
1384     }
1385     /* we can report latency queries now */
1386     basesink->priv->have_latency = TRUE;
1387     GST_OBJECT_UNLOCK (basesink);
1388     return TRUE;
1389   }
1390 stopping:
1391   {
1392     /* app is going to READY */
1393     GST_DEBUG_OBJECT (basesink, "stopping");
1394     basesink->need_preroll = FALSE;
1395     basesink->flushing = TRUE;
1396     GST_OBJECT_UNLOCK (basesink);
1397     return FALSE;
1398   }
1399 async_failed:
1400   {
1401     GST_DEBUG_OBJECT (basesink, "async commit failed");
1402     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1403     GST_OBJECT_UNLOCK (basesink);
1404     return FALSE;
1405   }
1406 }
1407
1408
1409 /* with STREAM_LOCK, PREROLL_LOCK
1410  *
1411  * Returns TRUE if the object needs synchronisation and takes therefore
1412  * part in prerolling.
1413  *
1414  * rsstart/rsstop contain the start/stop in stream time.
1415  * rrstart/rrstop contain the start/stop in running time.
1416  */
1417 static gboolean
1418 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1419     GstClockTime * rsstart, GstClockTime * rsstop,
1420     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1421     GstSegment * segment)
1422 {
1423   GstBaseSinkClass *bclass;
1424   GstBuffer *buffer;
1425   GstClockTime start, stop;     /* raw start/stop timestamps */
1426   gint64 cstart, cstop;         /* clipped raw timestamps */
1427   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1428   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1429   GstFormat format;
1430   GstBaseSinkPrivate *priv;
1431
1432   priv = basesink->priv;
1433
1434   /* start with nothing */
1435   start = stop = sstart = sstop = rstart = rstop = -1;
1436
1437   if (G_UNLIKELY (GST_IS_EVENT (obj))) {
1438     GstEvent *event = GST_EVENT_CAST (obj);
1439
1440     switch (GST_EVENT_TYPE (event)) {
1441         /* EOS event needs syncing */
1442       case GST_EVENT_EOS:
1443       {
1444         if (basesink->segment.rate >= 0.0) {
1445           sstart = sstop = priv->current_sstop;
1446           if (sstart == -1) {
1447             /* we have not seen a buffer yet, use the segment values */
1448             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1449                 basesink->segment.format, basesink->segment.stop);
1450           }
1451         } else {
1452           sstart = sstop = priv->current_sstart;
1453           if (sstart == -1) {
1454             /* we have not seen a buffer yet, use the segment values */
1455             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1456                 basesink->segment.format, basesink->segment.start);
1457           }
1458         }
1459
1460         rstart = rstop = priv->eos_rtime;
1461         *do_sync = rstart != -1;
1462         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1463             GST_TIME_ARGS (rstart));
1464         goto done;
1465       }
1466       default:
1467         /* other events do not need syncing */
1468         /* FIXME, maybe NEWSEGMENT might need synchronisation
1469          * since the POSITION query depends on accumulated times and
1470          * we cannot accumulate the current segment before the previous
1471          * one completed.
1472          */
1473         return FALSE;
1474     }
1475   }
1476
1477   /* else do buffer sync code */
1478   buffer = GST_BUFFER_CAST (obj);
1479
1480   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1481
1482   /* just get the times to see if we need syncing */
1483   if (bclass->get_times)
1484     bclass->get_times (basesink, buffer, &start, &stop);
1485
1486   if (start == -1) {
1487     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1488     *do_sync = FALSE;
1489   } else {
1490     *do_sync = TRUE;
1491   }
1492
1493   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1494       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1495       GST_TIME_ARGS (stop), *do_sync);
1496
1497   /* collect segment and format for code clarity */
1498   format = segment->format;
1499
1500   /* no timestamp clipping if we did not * get a TIME segment format */
1501   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
1502     cstart = start;
1503     cstop = stop;
1504     /* do running and stream time in TIME format */
1505     format = GST_FORMAT_TIME;
1506     goto do_times;
1507   }
1508
1509   /* clip */
1510   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
1511               (gint64) start, (gint64) stop, &cstart, &cstop)))
1512     goto out_of_segment;
1513
1514   if (G_UNLIKELY (start != cstart || stop != cstop)) {
1515     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
1516         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
1517         GST_TIME_ARGS (cstop));
1518   }
1519
1520   /* set last stop position */
1521   if (G_LIKELY (cstop != GST_CLOCK_TIME_NONE))
1522     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
1523   else
1524     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
1525
1526 do_times:
1527   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
1528    * upstream is behaving very badly */
1529   sstart = gst_segment_to_stream_time (segment, format, cstart);
1530   sstop = gst_segment_to_stream_time (segment, format, cstop);
1531   rstart = gst_segment_to_running_time (segment, format, cstart);
1532   rstop = gst_segment_to_running_time (segment, format, cstop);
1533
1534 done:
1535   /* save times */
1536   *rsstart = sstart;
1537   *rsstop = sstop;
1538   *rrstart = rstart;
1539   *rrstop = rstop;
1540
1541   /* buffers and EOS always need syncing and preroll */
1542   return TRUE;
1543
1544   /* special cases */
1545 out_of_segment:
1546   {
1547     /* should not happen since we clip them in the chain function already, 
1548      * we return FALSE so that we don't try to sync on it. */
1549     GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
1550         (NULL), ("unexpected buffer out of segment found."));
1551     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
1552     return FALSE;
1553   }
1554 }
1555
1556 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
1557  * adjust a timestamp with the latency and timestamp offset */
1558 static GstClockTime
1559 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
1560 {
1561   GstClockTimeDiff ts_offset;
1562
1563   /* don't do anything funny with invalid timestamps */
1564   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1565     return time;
1566
1567   time += basesink->priv->latency;
1568
1569   /* apply offset, be carefull for underflows */
1570   ts_offset = basesink->priv->ts_offset;
1571   if (ts_offset < 0) {
1572     ts_offset = -ts_offset;
1573     if (ts_offset < time)
1574       time -= ts_offset;
1575     else
1576       time = 0;
1577   } else
1578     time += ts_offset;
1579
1580   return time;
1581 }
1582
1583 /* gst_base_sink_wait_clock:
1584  * @sink: the sink
1585  * @time: the running_time to be reached
1586  * @jitter: the jitter to be filled with time diff (can be NULL)
1587  *
1588  * This function will block until @time is reached. It is usually called by
1589  * subclasses that use their own internal synchronisation.
1590  *
1591  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
1592  * returned. Likewise, if synchronisation is disabled in the element or there
1593  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
1594  *
1595  * This function should only be called with the PREROLL_LOCK held, like when
1596  * receiving an EOS event in the ::event vmethod or when receiving a buffer in
1597  * the ::render vmethod.
1598  *
1599  * The @time argument should be the running_time of when this method should
1600  * return and is not adjusted with any latency or offset configured in the
1601  * sink.
1602  *
1603  * Since 0.10.20
1604  *
1605  * Returns: #GstClockReturn
1606  */
1607 GstClockReturn
1608 gst_base_sink_wait_clock (GstBaseSink * basesink, GstClockTime time,
1609     GstClockTimeDiff * jitter)
1610 {
1611   GstClockID id;
1612   GstClockReturn ret;
1613   GstClock *clock;
1614
1615   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1616     goto invalid_time;
1617
1618   GST_OBJECT_LOCK (basesink);
1619   if (G_UNLIKELY (!basesink->sync))
1620     goto no_sync;
1621
1622   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
1623     goto no_clock;
1624
1625   /* add base_time to running_time to get the time against the clock */
1626   time += GST_ELEMENT_CAST (basesink)->base_time;
1627
1628   id = gst_clock_new_single_shot_id (clock, time);
1629   GST_OBJECT_UNLOCK (basesink);
1630
1631   /* A blocking wait is performed on the clock. We save the ClockID
1632    * so we can unlock the entry at any time. While we are blocking, we 
1633    * release the PREROLL_LOCK so that other threads can interrupt the
1634    * entry. */
1635   basesink->clock_id = id;
1636   /* release the preroll lock while waiting */
1637   GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
1638
1639   ret = gst_clock_id_wait (id, jitter);
1640
1641   GST_PAD_PREROLL_LOCK (basesink->sinkpad);
1642   gst_clock_id_unref (id);
1643   basesink->clock_id = NULL;
1644
1645   return ret;
1646
1647   /* no syncing needed */
1648 invalid_time:
1649   {
1650     GST_DEBUG_OBJECT (basesink, "time not valid, no sync needed");
1651     return GST_CLOCK_BADTIME;
1652   }
1653 no_sync:
1654   {
1655     GST_DEBUG_OBJECT (basesink, "sync disabled");
1656     GST_OBJECT_UNLOCK (basesink);
1657     return GST_CLOCK_BADTIME;
1658   }
1659 no_clock:
1660   {
1661     GST_DEBUG_OBJECT (basesink, "no clock, can't sync");
1662     GST_OBJECT_UNLOCK (basesink);
1663     return GST_CLOCK_BADTIME;
1664   }
1665 }
1666
1667 /**
1668  * gst_base_sink_wait_preroll:
1669  * @sink: the sink
1670  *
1671  * If the #GstBaseSinkClass::render method performs its own synchronisation against
1672  * the clock it must unblock when going from PLAYING to the PAUSED state and call
1673  * this method before continuing to render the remaining data.
1674  *
1675  * This function will block until a state change to PLAYING happens (in which
1676  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
1677  * to a state change to READY or a FLUSH event (in which case this function
1678  * returns #GST_FLOW_WRONG_STATE).
1679  *
1680  * This function should only be called with the PREROLL_LOCK held, like in the
1681  * render function.
1682  *
1683  * Since: 0.10.11
1684  *
1685  * Returns: #GST_FLOW_OK if the preroll completed and processing can
1686  * continue. Any other return value should be returned from the render vmethod.
1687  */
1688 GstFlowReturn
1689 gst_base_sink_wait_preroll (GstBaseSink * sink)
1690 {
1691   sink->have_preroll = TRUE;
1692   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
1693   /* block until the state changes, or we get a flush, or something */
1694   GST_PAD_PREROLL_WAIT (sink->sinkpad);
1695   sink->have_preroll = FALSE;
1696   if (G_UNLIKELY (sink->flushing))
1697     goto stopping;
1698   GST_DEBUG_OBJECT (sink, "continue after preroll");
1699
1700   return GST_FLOW_OK;
1701
1702   /* ERRORS */
1703 stopping:
1704   {
1705     GST_DEBUG_OBJECT (sink, "preroll interrupted");
1706     return GST_FLOW_WRONG_STATE;
1707   }
1708 }
1709
1710 /**
1711  * gst_base_sink_wait_eos:
1712  * @sink: the sink
1713  * @time: the running_time to be reached
1714  * @jitter: the jitter to be filled with time diff (can be NULL)
1715  *
1716  * This function will block until @time is reached. It is usually called by
1717  * subclasses that use their own internal synchronisation but want to let the
1718  * EOS be handled by the base class.
1719  *
1720  * This function should only be called with the PREROLL_LOCK held, like when
1721  * receiving an EOS event in the ::event vmethod.
1722  *
1723  * The @time argument should be the running_time of when the EOS should happen
1724  * and will be adjusted with any latency and offset configured in the sink.
1725  *
1726  * Since 0.10.15
1727  *
1728  * Returns: #GstFlowReturn
1729  */
1730 GstFlowReturn
1731 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
1732     GstClockTimeDiff * jitter)
1733 {
1734   GstClockReturn status;
1735   GstFlowReturn ret;
1736
1737   do {
1738     GstClockTime stime;
1739
1740     GST_DEBUG_OBJECT (sink, "checking preroll");
1741
1742     /* first wait for the playing state before we can continue */
1743     if (G_UNLIKELY (sink->need_preroll)) {
1744       ret = gst_base_sink_wait_preroll (sink);
1745       if (ret != GST_FLOW_OK)
1746         goto flushing;
1747     }
1748
1749     /* preroll done, we can sync since we are in PLAYING now. */
1750     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
1751         GST_TIME_FORMAT, GST_TIME_ARGS (time));
1752
1753     /* compensate for latency and ts_offset. We don't adjust for render delay
1754      * because we don't interact with the device on EOS normally. */
1755     stime = gst_base_sink_adjust_time (sink, time);
1756
1757     /* wait for the clock, this can be interrupted because we got shut down or 
1758      * we PAUSED. */
1759     status = gst_base_sink_wait_clock (sink, stime, jitter);
1760
1761     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
1762
1763     /* invalid time, no clock or sync disabled, just continue then */
1764     if (status == GST_CLOCK_BADTIME)
1765       break;
1766
1767     /* waiting could have been interrupted and we can be flushing now */
1768     if (G_UNLIKELY (sink->flushing))
1769       goto flushing;
1770
1771     /* retry if we got unscheduled, which means we did not reach the timeout
1772      * yet. if some other error occures, we continue. */
1773   } while (status == GST_CLOCK_UNSCHEDULED);
1774
1775   GST_DEBUG_OBJECT (sink, "end of stream");
1776
1777   return GST_FLOW_OK;
1778
1779   /* ERRORS */
1780 flushing:
1781   {
1782     GST_DEBUG_OBJECT (sink, "we are flushing");
1783     return GST_FLOW_WRONG_STATE;
1784   }
1785 }
1786
1787 /* with STREAM_LOCK, PREROLL_LOCK
1788  *
1789  * Make sure we are in PLAYING and synchronize an object to the clock.
1790  *
1791  * If we need preroll, we are not in PLAYING. We try to commit the state
1792  * if needed and then block if we still are not PLAYING.
1793  *
1794  * We start waiting on the clock in PLAYING. If we got interrupted, we
1795  * immediatly try to re-preroll.
1796  *
1797  * Some objects do not need synchronisation (most events) and so this function
1798  * immediatly returns GST_FLOW_OK.
1799  *
1800  * for objects that arrive later than max-lateness to be synchronized to the 
1801  * clock have the @late boolean set to TRUE.
1802  *
1803  * This function keeps a running average of the jitter (the diff between the
1804  * clock time and the requested sync time). The jitter is negative for
1805  * objects that arrive in time and positive for late buffers.
1806  *
1807  * does not take ownership of obj.
1808  */
1809 static GstFlowReturn
1810 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
1811     GstMiniObject * obj, gboolean * late)
1812 {
1813   GstClockTimeDiff jitter;
1814   gboolean syncable;
1815   GstClockReturn status = GST_CLOCK_OK;
1816   GstClockTime rstart, rstop, sstart, sstop, stime;
1817   gboolean do_sync;
1818   GstBaseSinkPrivate *priv;
1819
1820   priv = basesink->priv;
1821
1822   sstart = sstop = rstart = rstop = -1;
1823   do_sync = TRUE;
1824
1825   priv->current_rstart = -1;
1826
1827   /* get timing information for this object against the render segment */
1828   syncable = gst_base_sink_get_sync_times (basesink, obj,
1829       &sstart, &sstop, &rstart, &rstop, &do_sync, &basesink->segment);
1830
1831   /* a syncable object needs to participate in preroll and
1832    * clocking. All buffers and EOS are syncable. */
1833   if (G_UNLIKELY (!syncable))
1834     goto not_syncable;
1835
1836   /* store timing info for current object */
1837   priv->current_rstart = rstart;
1838   priv->current_rstop = (rstop != -1 ? rstop : rstart);
1839   /* save sync time for eos when the previous object needed sync */
1840   priv->eos_rtime = (do_sync ? priv->current_rstop : -1);
1841
1842 again:
1843   /* first do preroll, this makes sure we commit our state
1844    * to PAUSED and can continue to PLAYING. We cannot perform
1845    * any clock sync in PAUSED because there is no clock. 
1846    */
1847   while (G_UNLIKELY (basesink->need_preroll)) {
1848     GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
1849
1850     if (G_LIKELY (basesink->playing_async)) {
1851       /* commit state */
1852       if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
1853         goto stopping;
1854     }
1855
1856     /* need to recheck here because the commit state could have
1857      * made us not need the preroll anymore */
1858     if (G_LIKELY (basesink->need_preroll)) {
1859       /* block until the state changes, or we get a flush, or something */
1860       if (gst_base_sink_wait_preroll (basesink) != GST_FLOW_OK)
1861         goto flushing;
1862     }
1863   }
1864
1865   /* After rendering we store the position of the last buffer so that we can use
1866    * it to report the position. We need to take the lock here. */
1867   GST_OBJECT_LOCK (basesink);
1868   priv->current_sstart = sstart;
1869   priv->current_sstop = (sstop != -1 ? sstop : sstart);
1870   GST_OBJECT_UNLOCK (basesink);
1871
1872   if (!do_sync)
1873     goto done;
1874
1875   /* adjust for latency */
1876   stime = gst_base_sink_adjust_time (basesink, rstart);
1877
1878   /* adjust for render-delay, avoid underflows */
1879   if (stime != -1) {
1880     if (stime > priv->render_delay)
1881       stime -= priv->render_delay;
1882     else
1883       stime = 0;
1884   }
1885
1886   /* preroll done, we can sync since we are in PLAYING now. */
1887   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
1888       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
1889       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
1890
1891   /* This function will return immediatly if start == -1, no clock
1892    * or sync is disabled with GST_CLOCK_BADTIME. */
1893   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
1894
1895   GST_DEBUG_OBJECT (basesink, "clock returned %d", status);
1896
1897   /* invalid time, no clock or sync disabled, just render */
1898   if (status == GST_CLOCK_BADTIME)
1899     goto done;
1900
1901   /* waiting could have been interrupted and we can be flushing now */
1902   if (G_UNLIKELY (basesink->flushing))
1903     goto flushing;
1904
1905   /* check for unlocked by a state change, we are not flushing so
1906    * we can try to preroll on the current buffer. */
1907   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
1908     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
1909     goto again;
1910   }
1911
1912   /* successful syncing done, record observation */
1913   priv->current_jitter = jitter;
1914
1915   /* check if the object should be dropped */
1916   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
1917       status, jitter);
1918
1919 done:
1920   return GST_FLOW_OK;
1921
1922   /* ERRORS */
1923 not_syncable:
1924   {
1925     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
1926     return GST_FLOW_OK;
1927   }
1928 flushing:
1929   {
1930     GST_DEBUG_OBJECT (basesink, "we are flushing");
1931     return GST_FLOW_WRONG_STATE;
1932   }
1933 stopping:
1934   {
1935     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
1936     return GST_FLOW_WRONG_STATE;
1937   }
1938 }
1939
1940 static gboolean
1941 gst_base_sink_send_qos (GstBaseSink * basesink,
1942     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
1943 {
1944   GstEvent *event;
1945   gboolean res;
1946
1947   /* generate Quality-of-Service event */
1948   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
1949       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1950       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (time));
1951
1952   event = gst_event_new_qos (proportion, diff, time);
1953
1954   /* send upstream */
1955   res = gst_pad_push_event (basesink->sinkpad, event);
1956
1957   return res;
1958 }
1959
1960 static void
1961 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
1962 {
1963   GstBaseSinkPrivate *priv;
1964   GstClockTime start, stop;
1965   GstClockTimeDiff jitter;
1966   GstClockTime pt, entered, left;
1967   GstClockTime duration;
1968   gdouble rate;
1969
1970   priv = sink->priv;
1971
1972   start = priv->current_rstart;
1973
1974   /* if Quality-of-Service disabled, do nothing */
1975   if (!g_atomic_int_get (&priv->qos_enabled) || start == -1)
1976     return;
1977
1978   stop = priv->current_rstop;
1979   jitter = priv->current_jitter;
1980
1981   if (jitter < 0) {
1982     /* this is the time the buffer entered the sink */
1983     if (start < -jitter)
1984       entered = 0;
1985     else
1986       entered = start + jitter;
1987     left = start;
1988   } else {
1989     /* this is the time the buffer entered the sink */
1990     entered = start + jitter;
1991     /* this is the time the buffer left the sink */
1992     left = start + jitter;
1993   }
1994
1995   /* calculate duration of the buffer */
1996   if (stop != -1)
1997     duration = stop - start;
1998   else
1999     duration = -1;
2000
2001   /* if we have the time when the last buffer left us, calculate
2002    * processing time */
2003   if (priv->last_left != -1) {
2004     if (entered > priv->last_left) {
2005       pt = entered - priv->last_left;
2006     } else {
2007       pt = 0;
2008     }
2009   } else {
2010     pt = priv->avg_pt;
2011   }
2012
2013   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2014       ", entered %" GST_TIME_FORMAT ", left %" GST_TIME_FORMAT ", pt: %"
2015       GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT ",jitter %"
2016       G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (entered),
2017       GST_TIME_ARGS (left), GST_TIME_ARGS (pt), GST_TIME_ARGS (duration),
2018       jitter);
2019
2020   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2021       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2022       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2023       priv->avg_rate);
2024
2025   /* collect running averages. for first observations, we copy the
2026    * values */
2027   if (priv->avg_duration == -1)
2028     priv->avg_duration = duration;
2029   else
2030     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2031
2032   if (priv->avg_pt == -1)
2033     priv->avg_pt = pt;
2034   else
2035     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2036
2037   if (priv->avg_duration != 0)
2038     rate =
2039         gst_guint64_to_gdouble (priv->avg_pt) /
2040         gst_guint64_to_gdouble (priv->avg_duration);
2041   else
2042     rate = 0.0;
2043
2044   if (priv->last_left != -1) {
2045     if (dropped || priv->avg_rate < 0.0) {
2046       priv->avg_rate = rate;
2047     } else {
2048       if (rate > 1.0)
2049         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2050       else
2051         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2052     }
2053   }
2054
2055   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2056       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2057       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2058       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2059
2060
2061   if (priv->avg_rate >= 0.0) {
2062     /* if we have a valid rate, start sending QoS messages */
2063     if (priv->current_jitter < 0) {
2064       /* make sure we never go below 0 when adding the jitter to the
2065        * timestamp. */
2066       if (priv->current_rstart < -priv->current_jitter)
2067         priv->current_jitter = -priv->current_rstart;
2068     }
2069     gst_base_sink_send_qos (sink, priv->avg_rate, priv->current_rstart,
2070         priv->current_jitter);
2071   }
2072
2073   /* record when this buffer will leave us */
2074   priv->last_left = left;
2075 }
2076
2077 /* reset all qos measuring */
2078 static void
2079 gst_base_sink_reset_qos (GstBaseSink * sink)
2080 {
2081   GstBaseSinkPrivate *priv;
2082
2083   priv = sink->priv;
2084
2085   priv->last_in_time = -1;
2086   priv->last_left = -1;
2087   priv->avg_duration = -1;
2088   priv->avg_pt = -1;
2089   priv->avg_rate = -1.0;
2090   priv->avg_render = -1;
2091   priv->rendered = 0;
2092   priv->dropped = 0;
2093
2094 }
2095
2096 /* Checks if the object was scheduled too late.
2097  *
2098  * start/stop contain the raw timestamp start and stop values
2099  * of the object.
2100  *
2101  * status and jitter contain the return values from the clock wait.
2102  *
2103  * returns TRUE if the buffer was too late.
2104  */
2105 static gboolean
2106 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2107     GstClockTime start, GstClockTime stop,
2108     GstClockReturn status, GstClockTimeDiff jitter)
2109 {
2110   gboolean late;
2111   gint64 max_lateness;
2112   GstBaseSinkPrivate *priv;
2113
2114   priv = basesink->priv;
2115
2116   late = FALSE;
2117
2118   /* only for objects that were too late */
2119   if (G_LIKELY (status != GST_CLOCK_EARLY))
2120     goto in_time;
2121
2122   max_lateness = basesink->abidata.ABI.max_lateness;
2123
2124   /* check if frame dropping is enabled */
2125   if (max_lateness == -1)
2126     goto no_drop;
2127
2128   /* only check for buffers */
2129   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2130     goto not_buffer;
2131
2132   /* can't do check if we don't have a timestamp */
2133   if (G_UNLIKELY (start == -1))
2134     goto no_timestamp;
2135
2136   /* we can add a valid stop time */
2137   if (stop != -1)
2138     max_lateness += stop;
2139   else
2140     max_lateness += start;
2141
2142   /* if the jitter bigger than duration and lateness we are too late */
2143   if ((late = start + jitter > max_lateness)) {
2144     GST_DEBUG_OBJECT (basesink, "buffer is too late %" GST_TIME_FORMAT
2145         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (start + jitter),
2146         GST_TIME_ARGS (max_lateness));
2147     /* !!emergency!!, if we did not receive anything valid for more than a 
2148      * second, render it anyway so the user sees something */
2149     if (priv->last_in_time && start - priv->last_in_time > GST_SECOND) {
2150       late = FALSE;
2151       GST_DEBUG_OBJECT (basesink,
2152           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2153           GST_TIME_ARGS (priv->last_in_time));
2154     }
2155   }
2156
2157 done:
2158   if (!late) {
2159     priv->last_in_time = start;
2160   }
2161   return late;
2162
2163   /* all is fine */
2164 in_time:
2165   {
2166     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2167     goto done;
2168   }
2169 no_drop:
2170   {
2171     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2172     goto done;
2173   }
2174 not_buffer:
2175   {
2176     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2177     return FALSE;
2178   }
2179 no_timestamp:
2180   {
2181     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2182     return FALSE;
2183   }
2184 }
2185
2186 /* called before and after calling the render vmethod. It keeps track of how
2187  * much time was spent in the render method and is used to check if we are
2188  * flooded */
2189 static void
2190 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2191 {
2192   GstBaseSinkPrivate *priv;
2193
2194   priv = basesink->priv;
2195
2196   if (start) {
2197     priv->start = gst_util_get_timestamp ();
2198   } else {
2199     GstClockTime elapsed;
2200
2201     priv->stop = gst_util_get_timestamp ();
2202
2203     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2204
2205     if (priv->avg_render == -1)
2206       priv->avg_render = elapsed;
2207     else
2208       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2209
2210     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2211         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2212   }
2213 }
2214
2215 /* with STREAM_LOCK, PREROLL_LOCK,
2216  *
2217  * Synchronize the object on the clock and then render it.
2218  *
2219  * takes ownership of obj.
2220  */
2221 static GstFlowReturn
2222 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2223     GstMiniObject * obj)
2224 {
2225   GstFlowReturn ret = GST_FLOW_OK;
2226   GstBaseSinkClass *bclass;
2227   gboolean late = FALSE;
2228   GstBaseSinkPrivate *priv;
2229
2230   priv = basesink->priv;
2231
2232   /* synchronize this object, non syncable objects return OK
2233    * immediatly. */
2234   ret = gst_base_sink_do_sync (basesink, pad, obj, &late);
2235   if (G_UNLIKELY (ret != GST_FLOW_OK))
2236     goto sync_failed;
2237
2238   /* and now render, event or buffer. */
2239   if (G_LIKELY (GST_IS_BUFFER (obj))) {
2240     GstBuffer *buf;
2241
2242     /* drop late buffers unconditionally, let's hope it's unlikely */
2243     if (G_UNLIKELY (late))
2244       goto dropped;
2245
2246     buf = GST_BUFFER_CAST (obj);
2247
2248     gst_base_sink_set_last_buffer (basesink, buf);
2249
2250     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2251
2252     if (G_LIKELY (bclass->render)) {
2253       gint do_qos;
2254
2255       /* read once, to get same value before and after */
2256       do_qos = g_atomic_int_get (&priv->qos_enabled);
2257
2258       GST_DEBUG_OBJECT (basesink, "rendering buffer %p", obj);
2259
2260       /* record rendering time for QoS and stats */
2261       if (do_qos)
2262         gst_base_sink_do_render_stats (basesink, TRUE);
2263
2264       ret = bclass->render (basesink, buf);
2265
2266       priv->rendered++;
2267
2268       if (do_qos)
2269         gst_base_sink_do_render_stats (basesink, FALSE);
2270     }
2271   } else {
2272     GstEvent *event = GST_EVENT_CAST (obj);
2273     gboolean event_res = TRUE;
2274     GstEventType type;
2275
2276     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2277
2278     type = GST_EVENT_TYPE (event);
2279
2280     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
2281         gst_event_type_get_name (type));
2282
2283     if (bclass->event)
2284       event_res = bclass->event (basesink, event);
2285
2286     /* when we get here we could be flushing again when the event handler calls
2287      * _wait_eos(). We have to ignore this object in that case. */
2288     if (G_UNLIKELY (basesink->flushing))
2289       goto flushing;
2290
2291     if (G_LIKELY (event_res)) {
2292       switch (type) {
2293         case GST_EVENT_EOS:
2294           /* the EOS event is completely handled so we mark
2295            * ourselves as being in the EOS state. eos is also 
2296            * protected by the object lock so we can read it when 
2297            * answering the POSITION query. */
2298           GST_OBJECT_LOCK (basesink);
2299           basesink->eos = TRUE;
2300           GST_OBJECT_UNLOCK (basesink);
2301           /* ok, now we can post the message */
2302           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
2303           gst_element_post_message (GST_ELEMENT_CAST (basesink),
2304               gst_message_new_eos (GST_OBJECT_CAST (basesink)));
2305           break;
2306         case GST_EVENT_NEWSEGMENT:
2307           /* configure the segment */
2308           gst_base_sink_configure_segment (basesink, pad, event,
2309               &basesink->segment);
2310           break;
2311         default:
2312           break;
2313       }
2314     }
2315   }
2316
2317 done:
2318   gst_base_sink_perform_qos (basesink, late);
2319
2320   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
2321   gst_mini_object_unref (obj);
2322
2323   return ret;
2324
2325   /* ERRORS */
2326 sync_failed:
2327   {
2328     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
2329     goto done;
2330   }
2331 dropped:
2332   {
2333     priv->dropped++;
2334     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
2335     goto done;
2336   }
2337 flushing:
2338   {
2339     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
2340     gst_mini_object_unref (obj);
2341     return GST_FLOW_WRONG_STATE;
2342   }
2343 }
2344
2345 /* with STREAM_LOCK, PREROLL_LOCK
2346  *
2347  * Perform preroll on the given object. For buffers this means 
2348  * calling the preroll subclass method. 
2349  * If that succeeds, the state will be commited.
2350  *
2351  * function does not take ownership of obj.
2352  */
2353 static GstFlowReturn
2354 gst_base_sink_preroll_object (GstBaseSink * basesink, GstPad * pad,
2355     GstMiniObject * obj)
2356 {
2357   GstFlowReturn ret;
2358
2359   GST_DEBUG_OBJECT (basesink, "do preroll %p", obj);
2360
2361   /* if it's a buffer, we need to call the preroll method */
2362   if (G_LIKELY (GST_IS_BUFFER (obj))) {
2363     GstBaseSinkClass *bclass;
2364     GstBuffer *buf;
2365     GstClockTime timestamp;
2366
2367     buf = GST_BUFFER_CAST (obj);
2368     timestamp = GST_BUFFER_TIMESTAMP (buf);
2369
2370     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
2371         GST_TIME_ARGS (timestamp));
2372
2373     gst_base_sink_set_last_buffer (basesink, buf);
2374
2375     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2376     if (bclass->preroll)
2377       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
2378         goto preroll_failed;
2379   }
2380
2381   /* commit state */
2382   if (G_LIKELY (basesink->playing_async)) {
2383     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
2384       goto stopping;
2385   }
2386
2387   return GST_FLOW_OK;
2388
2389   /* ERRORS */
2390 preroll_failed:
2391   {
2392     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
2393     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
2394     return ret;
2395   }
2396 stopping:
2397   {
2398     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
2399     return GST_FLOW_WRONG_STATE;
2400   }
2401 }
2402
2403 /* with STREAM_LOCK, PREROLL_LOCK 
2404  *
2405  * Queue an object for rendering.
2406  * The first prerollable object queued will complete the preroll. If the
2407  * preroll queue if filled, we render all the objects in the queue.
2408  *
2409  * This function takes ownership of the object.
2410  */
2411 static GstFlowReturn
2412 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
2413     GstMiniObject * obj, gboolean prerollable)
2414 {
2415   GstFlowReturn ret = GST_FLOW_OK;
2416   gint length;
2417   GQueue *q;
2418
2419   if (G_UNLIKELY (basesink->need_preroll)) {
2420     if (G_LIKELY (prerollable))
2421       basesink->preroll_queued++;
2422
2423     length = basesink->preroll_queued;
2424
2425     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
2426
2427     /* first prerollable item needs to finish the preroll */
2428     if (length == 1) {
2429       ret = gst_base_sink_preroll_object (basesink, pad, obj);
2430       if (G_UNLIKELY (ret != GST_FLOW_OK))
2431         goto preroll_failed;
2432     }
2433     /* need to recheck if we need preroll, commmit state during preroll 
2434      * could have made us not need more preroll. */
2435     if (G_UNLIKELY (basesink->need_preroll)) {
2436       /* see if we can render now, if we can't add the object to the preroll
2437        * queue. */
2438       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
2439         goto more_preroll;
2440     }
2441   }
2442
2443   /* we can start rendering (or blocking) the queued object
2444    * if any. */
2445   q = basesink->preroll_queue;
2446   while (G_UNLIKELY (!g_queue_is_empty (q))) {
2447     GstMiniObject *o;
2448
2449     o = g_queue_pop_head (q);
2450     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
2451
2452     /* do something with the return value */
2453     ret = gst_base_sink_render_object (basesink, pad, o);
2454     if (ret != GST_FLOW_OK)
2455       goto dequeue_failed;
2456   }
2457
2458   /* now render the object */
2459   ret = gst_base_sink_render_object (basesink, pad, obj);
2460   basesink->preroll_queued = 0;
2461
2462   return ret;
2463
2464   /* special cases */
2465 preroll_failed:
2466   {
2467     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
2468         gst_flow_get_name (ret));
2469     gst_mini_object_unref (obj);
2470     return ret;
2471   }
2472 more_preroll:
2473   {
2474     /* add object to the queue and return */
2475     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
2476         length, basesink->preroll_queue_max_len);
2477     g_queue_push_tail (basesink->preroll_queue, obj);
2478     return GST_FLOW_OK;
2479   }
2480 dequeue_failed:
2481   {
2482     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
2483         gst_flow_get_name (ret));
2484     gst_mini_object_unref (obj);
2485     return ret;
2486   }
2487 }
2488
2489 /* with STREAM_LOCK
2490  *
2491  * This function grabs the PREROLL_LOCK and adds the object to
2492  * the queue.
2493  *
2494  * This function takes ownership of obj.
2495  */
2496 static GstFlowReturn
2497 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
2498     GstMiniObject * obj, gboolean prerollable)
2499 {
2500   GstFlowReturn ret;
2501
2502   GST_PAD_PREROLL_LOCK (pad);
2503   if (G_UNLIKELY (basesink->flushing))
2504     goto flushing;
2505
2506   if (G_UNLIKELY (basesink->priv->received_eos))
2507     goto was_eos;
2508
2509   ret = gst_base_sink_queue_object_unlocked (basesink, pad, obj, prerollable);
2510   GST_PAD_PREROLL_UNLOCK (pad);
2511
2512   return ret;
2513
2514   /* ERRORS */
2515 flushing:
2516   {
2517     GST_DEBUG_OBJECT (basesink, "sink is flushing");
2518     GST_PAD_PREROLL_UNLOCK (pad);
2519     gst_mini_object_unref (obj);
2520     return GST_FLOW_WRONG_STATE;
2521   }
2522 was_eos:
2523   {
2524     GST_DEBUG_OBJECT (basesink,
2525         "we are EOS, dropping object, return UNEXPECTED");
2526     GST_PAD_PREROLL_UNLOCK (pad);
2527     gst_mini_object_unref (obj);
2528     return GST_FLOW_UNEXPECTED;
2529   }
2530 }
2531
2532 static gboolean
2533 gst_base_sink_event (GstPad * pad, GstEvent * event)
2534 {
2535   GstBaseSink *basesink;
2536   gboolean result = TRUE;
2537   GstBaseSinkClass *bclass;
2538
2539   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2540
2541   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2542
2543   GST_DEBUG_OBJECT (basesink, "event %p (%s)", event,
2544       GST_EVENT_TYPE_NAME (event));
2545
2546   switch (GST_EVENT_TYPE (event)) {
2547     case GST_EVENT_EOS:
2548     {
2549       GstFlowReturn ret;
2550
2551       GST_PAD_PREROLL_LOCK (pad);
2552       if (G_UNLIKELY (basesink->flushing))
2553         goto flushing;
2554
2555       if (G_UNLIKELY (basesink->priv->received_eos)) {
2556         /* we can't accept anything when we are EOS */
2557         result = FALSE;
2558         gst_event_unref (event);
2559       } else {
2560         /* we set the received EOS flag here so that we can use it when testing if
2561          * we are prerolled and to refure more buffers. */
2562         basesink->priv->received_eos = TRUE;
2563
2564         /* EOS is a prerollable object, we call the unlocked version because it
2565          * does not check the received_eos flag. */
2566         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
2567             GST_MINI_OBJECT_CAST (event), TRUE);
2568         if (G_UNLIKELY (ret != GST_FLOW_OK))
2569           result = FALSE;
2570       }
2571       GST_PAD_PREROLL_UNLOCK (pad);
2572       break;
2573     }
2574     case GST_EVENT_NEWSEGMENT:
2575     {
2576       GstFlowReturn ret;
2577
2578       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
2579
2580       GST_PAD_PREROLL_LOCK (pad);
2581       if (G_UNLIKELY (basesink->flushing))
2582         goto flushing;
2583
2584       if (G_UNLIKELY (basesink->priv->received_eos)) {
2585         /* we can't accept anything when we are EOS */
2586         result = FALSE;
2587         gst_event_unref (event);
2588       } else {
2589         /* the new segment is a non prerollable item and does not block anything,
2590          * we need to configure the current clipping segment and insert the event 
2591          * in the queue to serialize it with the buffers for rendering. */
2592         gst_base_sink_configure_segment (basesink, pad, event,
2593             basesink->abidata.ABI.clip_segment);
2594
2595         ret =
2596             gst_base_sink_queue_object_unlocked (basesink, pad,
2597             GST_MINI_OBJECT_CAST (event), FALSE);
2598         if (G_UNLIKELY (ret != GST_FLOW_OK))
2599           result = FALSE;
2600         else {
2601           GST_OBJECT_LOCK (basesink);
2602           basesink->have_newsegment = TRUE;
2603           GST_OBJECT_UNLOCK (basesink);
2604         }
2605       }
2606       GST_PAD_PREROLL_UNLOCK (pad);
2607       break;
2608     }
2609     case GST_EVENT_FLUSH_START:
2610       if (bclass->event)
2611         bclass->event (basesink, event);
2612
2613       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
2614
2615       /* make sure we are not blocked on the clock also clear any pending
2616        * eos state. */
2617       gst_base_sink_set_flushing (basesink, pad, TRUE);
2618
2619       /* we grab the stream lock but that is not needed since setting the
2620        * sink to flushing would make sure no state commit is being done
2621        * anymore */
2622       GST_PAD_STREAM_LOCK (pad);
2623       gst_base_sink_reset_qos (basesink);
2624       if (basesink->priv->async_enabled) {
2625         /* and we need to commit our state again on the next
2626          * prerolled buffer */
2627         basesink->playing_async = TRUE;
2628         gst_element_lost_state (GST_ELEMENT_CAST (basesink));
2629       } else {
2630         basesink->priv->have_latency = TRUE;
2631         basesink->need_preroll = FALSE;
2632       }
2633       gst_base_sink_set_last_buffer (basesink, NULL);
2634       GST_PAD_STREAM_UNLOCK (pad);
2635
2636       gst_event_unref (event);
2637       break;
2638     case GST_EVENT_FLUSH_STOP:
2639       if (bclass->event)
2640         bclass->event (basesink, event);
2641
2642       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
2643
2644       /* unset flushing so we can accept new data, this also flushes out any EOS
2645        * event. */
2646       gst_base_sink_set_flushing (basesink, pad, FALSE);
2647
2648       /* for position reporting */
2649       GST_OBJECT_LOCK (basesink);
2650       basesink->priv->current_sstart = -1;
2651       basesink->priv->current_sstop = -1;
2652       basesink->priv->eos_rtime = -1;
2653       basesink->have_newsegment = FALSE;
2654       GST_OBJECT_UNLOCK (basesink);
2655
2656       /* we need new segment info after the flush. */
2657       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
2658       gst_segment_init (basesink->abidata.ABI.clip_segment,
2659           GST_FORMAT_UNDEFINED);
2660
2661       gst_event_unref (event);
2662       break;
2663     default:
2664       /* other events are sent to queue or subclass depending on if they
2665        * are serialized. */
2666       if (GST_EVENT_IS_SERIALIZED (event)) {
2667         gst_base_sink_queue_object (basesink, pad,
2668             GST_MINI_OBJECT_CAST (event), FALSE);
2669       } else {
2670         if (bclass->event)
2671           bclass->event (basesink, event);
2672         gst_event_unref (event);
2673       }
2674       break;
2675   }
2676 done:
2677   gst_object_unref (basesink);
2678
2679   return result;
2680
2681   /* ERRORS */
2682 flushing:
2683   {
2684     GST_DEBUG_OBJECT (basesink, "we are flushing");
2685     GST_PAD_PREROLL_UNLOCK (pad);
2686     result = FALSE;
2687     gst_event_unref (event);
2688     goto done;
2689   }
2690 }
2691
2692 /* default implementation to calculate the start and end
2693  * timestamps on a buffer, subclasses can override
2694  */
2695 static void
2696 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
2697     GstClockTime * start, GstClockTime * end)
2698 {
2699   GstClockTime timestamp, duration;
2700
2701   timestamp = GST_BUFFER_TIMESTAMP (buffer);
2702   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
2703
2704     /* get duration to calculate end time */
2705     duration = GST_BUFFER_DURATION (buffer);
2706     if (GST_CLOCK_TIME_IS_VALID (duration)) {
2707       *end = timestamp + duration;
2708     }
2709     *start = timestamp;
2710   }
2711 }
2712
2713 /* must be called with PREROLL_LOCK */
2714 static gboolean
2715 gst_base_sink_needs_preroll (GstBaseSink * basesink)
2716 {
2717   gboolean is_prerolled, res;
2718
2719   /* we have 2 cases where the PREROLL_LOCK is released:
2720    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
2721    *  2) we are syncing on the clock
2722    */
2723   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
2724   res = !is_prerolled;
2725
2726   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
2727       basesink->have_preroll, basesink->priv->received_eos, res);
2728
2729   return res;
2730 }
2731
2732 /* with STREAM_LOCK, PREROLL_LOCK 
2733  *
2734  * Takes a buffer and compare the timestamps with the last segment.
2735  * If the buffer falls outside of the segment boundaries, drop it.
2736  * Else queue the buffer for preroll and rendering.
2737  *
2738  * This function takes ownership of the buffer.
2739  */
2740 static GstFlowReturn
2741 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
2742     GstBuffer * buf)
2743 {
2744   GstBaseSinkClass *bclass;
2745   GstFlowReturn result;
2746   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
2747   GstSegment *clip_segment;
2748
2749   if (G_UNLIKELY (basesink->flushing))
2750     goto flushing;
2751
2752   if (G_UNLIKELY (basesink->priv->received_eos))
2753     goto was_eos;
2754
2755   /* for code clarity */
2756   clip_segment = basesink->abidata.ABI.clip_segment;
2757
2758   if (G_UNLIKELY (!basesink->have_newsegment)) {
2759     gboolean sync;
2760
2761     sync = gst_base_sink_get_sync (basesink);
2762     if (sync) {
2763       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
2764           (_("Internal data flow problem.")),
2765           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
2766     }
2767
2768     /* this means this sink will assume timestamps start from 0 */
2769     GST_OBJECT_LOCK (basesink);
2770     clip_segment->start = 0;
2771     clip_segment->stop = -1;
2772     basesink->segment.start = 0;
2773     basesink->segment.stop = -1;
2774     basesink->have_newsegment = TRUE;
2775     GST_OBJECT_UNLOCK (basesink);
2776   }
2777
2778   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2779
2780   /* check if the buffer needs to be dropped, we first ask the subclass for the
2781    * start and end */
2782   if (bclass->get_times)
2783     bclass->get_times (basesink, buf, &start, &end);
2784
2785   if (start == -1) {
2786     /* if the subclass does not want sync, we use our own values so that we at
2787      * least clip the buffer to the segment */
2788     gst_base_sink_get_times (basesink, buf, &start, &end);
2789   }
2790
2791   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
2792       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
2793
2794   /* a dropped buffer does not participate in anything */
2795   if (GST_CLOCK_TIME_IS_VALID (start) &&
2796       (clip_segment->format == GST_FORMAT_TIME)) {
2797     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
2798                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
2799       goto out_of_segment;
2800   }
2801
2802   /* now we can process the buffer in the queue, this function takes ownership
2803    * of the buffer */
2804   result = gst_base_sink_queue_object_unlocked (basesink, pad,
2805       GST_MINI_OBJECT_CAST (buf), TRUE);
2806
2807   return result;
2808
2809   /* ERRORS */
2810 flushing:
2811   {
2812     GST_DEBUG_OBJECT (basesink, "sink is flushing");
2813     gst_buffer_unref (buf);
2814     return GST_FLOW_WRONG_STATE;
2815   }
2816 was_eos:
2817   {
2818     GST_DEBUG_OBJECT (basesink,
2819         "we are EOS, dropping object, return UNEXPECTED");
2820     gst_buffer_unref (buf);
2821     return GST_FLOW_UNEXPECTED;
2822   }
2823 out_of_segment:
2824   {
2825     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
2826     gst_buffer_unref (buf);
2827     return GST_FLOW_OK;
2828   }
2829 }
2830
2831 /* with STREAM_LOCK
2832  */
2833 static GstFlowReturn
2834 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
2835 {
2836   GstBaseSink *basesink;
2837   GstFlowReturn result;
2838
2839   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
2840
2841   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
2842     goto wrong_mode;
2843
2844   GST_PAD_PREROLL_LOCK (pad);
2845   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
2846   GST_PAD_PREROLL_UNLOCK (pad);
2847
2848 done:
2849   return result;
2850
2851   /* ERRORS */
2852 wrong_mode:
2853   {
2854     GST_OBJECT_LOCK (pad);
2855     GST_WARNING_OBJECT (basesink,
2856         "Push on pad %s:%s, but it was not activated in push mode",
2857         GST_DEBUG_PAD_NAME (pad));
2858     GST_OBJECT_UNLOCK (pad);
2859     gst_buffer_unref (buf);
2860     /* we don't post an error message this will signal to the peer
2861      * pushing that EOS is reached. */
2862     result = GST_FLOW_UNEXPECTED;
2863     goto done;
2864   }
2865 }
2866
2867 /* with STREAM_LOCK
2868  */
2869 static void
2870 gst_base_sink_loop (GstPad * pad)
2871 {
2872   GstBaseSink *basesink;
2873   GstBuffer *buf = NULL;
2874   GstFlowReturn result;
2875   guint blocksize;
2876
2877   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
2878
2879   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
2880
2881   if ((blocksize = basesink->priv->blocksize) == 0)
2882     blocksize = -1;
2883
2884   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
2885       basesink->offset, blocksize);
2886
2887   result = gst_pad_pull_range (pad, basesink->offset, blocksize, &buf);
2888   if (G_UNLIKELY (result != GST_FLOW_OK))
2889     goto paused;
2890
2891   if (G_UNLIKELY (buf == NULL))
2892     goto no_buffer;
2893
2894   basesink->offset += GST_BUFFER_SIZE (buf);
2895
2896   GST_PAD_PREROLL_LOCK (pad);
2897   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
2898   GST_PAD_PREROLL_UNLOCK (pad);
2899   if (G_UNLIKELY (result != GST_FLOW_OK))
2900     goto paused;
2901
2902   return;
2903
2904   /* ERRORS */
2905 paused:
2906   {
2907     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
2908         gst_flow_get_name (result));
2909     gst_pad_pause_task (pad);
2910     /* fatal errors and NOT_LINKED cause EOS */
2911     if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) {
2912       /* FIXME, we shouldn't post EOS when we are operating in segment mode */
2913       gst_base_sink_event (pad, gst_event_new_eos ());
2914       /* EOS does not cause an ERROR message */
2915       if (result != GST_FLOW_UNEXPECTED) {
2916         GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
2917             (_("Internal data stream error.")),
2918             ("stream stopped, reason %s", gst_flow_get_name (result)));
2919       }
2920     }
2921     return;
2922   }
2923 no_buffer:
2924   {
2925     GST_LOG_OBJECT (basesink, "no buffer, pausing");
2926     result = GST_FLOW_ERROR;
2927     goto paused;
2928   }
2929 }
2930
2931 static gboolean
2932 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
2933     gboolean flushing)
2934 {
2935   GstBaseSinkClass *bclass;
2936
2937   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2938
2939   if (flushing) {
2940     /* unlock any subclasses, we need to do this before grabbing the
2941      * PREROLL_LOCK since we hold this lock before going into ::render. */
2942     if (bclass->unlock)
2943       bclass->unlock (basesink);
2944   }
2945
2946   GST_PAD_PREROLL_LOCK (pad);
2947   basesink->flushing = flushing;
2948   if (flushing) {
2949     /* step 1, now that we have the PREROLL lock, clear our unlock request */
2950     if (bclass->unlock_stop)
2951       bclass->unlock_stop (basesink);
2952
2953     /* set need_preroll before we unblock the clock. If the clock is unblocked
2954      * before timing out, we can reuse the buffer for preroll. */
2955     basesink->need_preroll = TRUE;
2956
2957     /* step 2, unblock clock sync (if any) or any other blocking thing */
2958     if (basesink->clock_id) {
2959       gst_clock_id_unschedule (basesink->clock_id);
2960     }
2961
2962     /* flush out the data thread if it's locked in finish_preroll, this will
2963      * also flush out the EOS state */
2964     GST_DEBUG_OBJECT (basesink,
2965         "flushing out data thread, need preroll to TRUE");
2966     gst_base_sink_preroll_queue_flush (basesink, pad);
2967   }
2968   GST_PAD_PREROLL_UNLOCK (pad);
2969
2970   return TRUE;
2971 }
2972
2973 static gboolean
2974 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
2975 {
2976   gboolean result;
2977
2978   if (active) {
2979     /* start task */
2980     result = gst_pad_start_task (basesink->sinkpad,
2981         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
2982   } else {
2983     /* step 2, make sure streaming finishes */
2984     result = gst_pad_stop_task (basesink->sinkpad);
2985   }
2986
2987   return result;
2988 }
2989
2990 static gboolean
2991 gst_base_sink_pad_activate (GstPad * pad)
2992 {
2993   gboolean result = FALSE;
2994   GstBaseSink *basesink;
2995
2996   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2997
2998   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
2999
3000   gst_base_sink_set_flushing (basesink, pad, FALSE);
3001
3002   /* we need to have the pull mode enabled */
3003   if (!basesink->can_activate_pull)
3004     goto fallback;
3005
3006   /* check if downstreams supports pull mode at all */
3007   if (!gst_pad_check_pull_range (pad))
3008     goto fallback;
3009
3010   /* set the pad mode before starting the task so that it's in the
3011    * correct state for the new thread. also the sink set_caps and get_caps
3012    * function checks this */
3013   basesink->pad_mode = GST_ACTIVATE_PULL;
3014
3015   /* we first try to negotiate a format so that when we try to activate
3016    * downstream, it knows about our format */
3017   if (!gst_base_sink_negotiate_pull (basesink))
3018     goto fallback;
3019
3020   /* ok activate now */
3021   if (!gst_pad_activate_pull (pad, TRUE)) {
3022     /* clear any pending caps */
3023     GST_OBJECT_LOCK (basesink);
3024     gst_caps_replace (&basesink->priv->pull_caps, NULL);
3025     GST_OBJECT_UNLOCK (basesink);
3026     goto fallback;
3027   }
3028
3029   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
3030   result = TRUE;
3031   goto done;
3032
3033   /* push mode fallback */
3034 fallback:
3035   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
3036   if ((result = gst_pad_activate_push (pad, TRUE))) {
3037     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
3038   }
3039
3040 done:
3041   if (!result) {
3042     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
3043     gst_base_sink_set_flushing (basesink, pad, TRUE);
3044   }
3045
3046   gst_object_unref (basesink);
3047
3048   return result;
3049 }
3050
3051 static gboolean
3052 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
3053 {
3054   gboolean result;
3055   GstBaseSink *basesink;
3056
3057   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3058
3059   if (active) {
3060     if (!basesink->can_activate_push) {
3061       result = FALSE;
3062       basesink->pad_mode = GST_ACTIVATE_NONE;
3063     } else {
3064       result = TRUE;
3065       basesink->pad_mode = GST_ACTIVATE_PUSH;
3066     }
3067   } else {
3068     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
3069       g_warning ("Internal GStreamer activation error!!!");
3070       result = FALSE;
3071     } else {
3072       gst_base_sink_set_flushing (basesink, pad, TRUE);
3073       result = TRUE;
3074       basesink->pad_mode = GST_ACTIVATE_NONE;
3075     }
3076   }
3077
3078   gst_object_unref (basesink);
3079
3080   return result;
3081 }
3082
3083 static gboolean
3084 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
3085 {
3086   GstCaps *caps;
3087   gboolean result;
3088
3089   result = FALSE;
3090
3091   /* this returns the intersection between our caps and the peer caps. If there
3092    * is no peer, it returns NULL and we can't operate in pull mode so we can
3093    * fail the negotiation. */
3094   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
3095   if (caps == NULL || gst_caps_is_empty (caps))
3096     goto no_caps_possible;
3097
3098   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
3099
3100   caps = gst_caps_make_writable (caps);
3101   /* get the first (prefered) format */
3102   gst_caps_truncate (caps);
3103   /* try to fixate */
3104   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
3105
3106   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
3107
3108   if (gst_caps_is_any (caps)) {
3109     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
3110         "allowing pull()");
3111     /* neither side has template caps in this case, so they are prepared for
3112        pull() without setcaps() */
3113     result = TRUE;
3114   } else if (gst_caps_is_fixed (caps)) {
3115     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
3116       goto could_not_set_caps;
3117
3118     GST_OBJECT_LOCK (basesink);
3119     gst_caps_replace (&basesink->priv->pull_caps, caps);
3120     GST_OBJECT_UNLOCK (basesink);
3121
3122     result = TRUE;
3123   }
3124
3125   gst_caps_unref (caps);
3126
3127   return result;
3128
3129 no_caps_possible:
3130   {
3131     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
3132     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
3133     if (caps)
3134       gst_caps_unref (caps);
3135     return FALSE;
3136   }
3137 could_not_set_caps:
3138   {
3139     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
3140     gst_caps_unref (caps);
3141     return FALSE;
3142   }
3143 }
3144
3145 /* this won't get called until we implement an activate function */
3146 static gboolean
3147 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
3148 {
3149   gboolean result = FALSE;
3150   GstBaseSink *basesink;
3151   GstBaseSinkClass *bclass;
3152
3153   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3154   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3155
3156   if (active) {
3157     /* we mark we have a newsegment here because pull based
3158      * mode works just fine without having a newsegment before the
3159      * first buffer */
3160     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3161     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
3162     GST_OBJECT_LOCK (basesink);
3163     basesink->have_newsegment = TRUE;
3164     GST_OBJECT_UNLOCK (basesink);
3165
3166     if (bclass->activate_pull)
3167       result = bclass->activate_pull (basesink, TRUE);
3168     else
3169       result = FALSE;
3170
3171     /* but if starting the thread fails, set it back */
3172     if (!result)
3173       basesink->pad_mode = GST_ACTIVATE_NONE;
3174   } else {
3175     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
3176       g_warning ("Internal GStreamer activation error!!!");
3177       result = FALSE;
3178     } else {
3179       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
3180       if (bclass->activate_pull)
3181         result &= bclass->activate_pull (basesink, FALSE);
3182       basesink->pad_mode = GST_ACTIVATE_NONE;
3183       /* clear any pending caps */
3184       GST_OBJECT_LOCK (basesink);
3185       gst_caps_replace (&basesink->priv->pull_caps, NULL);
3186       GST_OBJECT_UNLOCK (basesink);
3187     }
3188   }
3189   gst_object_unref (basesink);
3190
3191   return result;
3192 }
3193
3194 /* send an event to our sinkpad peer. */
3195 static gboolean
3196 gst_base_sink_send_event (GstElement * element, GstEvent * event)
3197 {
3198   GstPad *pad;
3199   GstBaseSink *basesink = GST_BASE_SINK (element);
3200   gboolean forward, result = TRUE;
3201
3202   /* only push UPSTREAM events upstream */
3203   forward = GST_EVENT_IS_UPSTREAM (event);
3204
3205   switch (GST_EVENT_TYPE (event)) {
3206     case GST_EVENT_LATENCY:
3207     {
3208       GstClockTime latency;
3209
3210       gst_event_parse_latency (event, &latency);
3211
3212       /* store the latency. We use this to adjust the running_time before syncing
3213        * it to the clock. */
3214       GST_OBJECT_LOCK (element);
3215       basesink->priv->latency = latency;
3216       GST_OBJECT_UNLOCK (element);
3217       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
3218           GST_TIME_ARGS (latency));
3219
3220       /* We forward this event so that all elements know about the global pipeline
3221        * latency. This is interesting for an element when it wants to figure out
3222        * when a particular piece of data will be rendered. */
3223       break;
3224     }
3225     default:
3226       break;
3227   }
3228
3229   if (forward) {
3230     GST_OBJECT_LOCK (element);
3231     pad = gst_object_ref (basesink->sinkpad);
3232     GST_OBJECT_UNLOCK (element);
3233
3234     result = gst_pad_push_event (pad, event);
3235
3236     gst_object_unref (pad);
3237   } else {
3238     /* not forwarded, unref the event */
3239     gst_event_unref (event);
3240   }
3241   return result;
3242 }
3243
3244 static gboolean
3245 gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query)
3246 {
3247   GstPad *peer;
3248   gboolean res = FALSE;
3249
3250   if ((peer = gst_pad_get_peer (sink->sinkpad))) {
3251     res = gst_pad_query (peer, query);
3252     gst_object_unref (peer);
3253   }
3254   return res;
3255 }
3256
3257 /* get the end position of the last seen object, this is used
3258  * for EOS and for making sure that we don't report a position we
3259  * have not reached yet. */
3260 static gboolean
3261 gst_base_sink_get_position_last (GstBaseSink * basesink, gint64 * cur)
3262 {
3263   /* return last observed stream time */
3264   *cur = basesink->priv->current_sstop;
3265
3266   GST_DEBUG_OBJECT (basesink, "POSITION: %" GST_TIME_FORMAT,
3267       GST_TIME_ARGS (*cur));
3268   return TRUE;
3269 }
3270
3271 /* get the position when we are PAUSED, this is the stream time of the buffer
3272  * that prerolled. If no buffer is prerolled (we are still flushing), this
3273  * value will be -1. */
3274 static gboolean
3275 gst_base_sink_get_position_paused (GstBaseSink * basesink, gint64 * cur)
3276 {
3277   gboolean res;
3278   gint64 time;
3279   GstSegment *segment;
3280
3281   *cur = basesink->priv->current_sstart;
3282   segment = basesink->abidata.ABI.clip_segment;
3283
3284   time = segment->time;
3285
3286   if (*cur != -1) {
3287     *cur = MAX (*cur, time);
3288     GST_DEBUG_OBJECT (basesink, "POSITION as max: %" GST_TIME_FORMAT
3289         ", time %" GST_TIME_FORMAT, GST_TIME_ARGS (*cur), GST_TIME_ARGS (time));
3290   } else {
3291     /* we have no buffer, use the segment times. */
3292     if (segment->rate >= 0.0) {
3293       /* forward, next position is always the time of the segment */
3294       *cur = time;
3295       GST_DEBUG_OBJECT (basesink, "POSITION as time: %" GST_TIME_FORMAT,
3296           GST_TIME_ARGS (*cur));
3297     } else {
3298       /* reverse, next expected timestamp is segment->stop. We use the function
3299        * to get things right for negative applied_rates. */
3300       *cur =
3301           gst_segment_to_stream_time (segment, GST_FORMAT_TIME, segment->stop);
3302       GST_DEBUG_OBJECT (basesink, "reverse POSITION: %" GST_TIME_FORMAT,
3303           GST_TIME_ARGS (*cur));
3304     }
3305   }
3306   res = (*cur != -1);
3307
3308   return res;
3309 }
3310
3311 static gboolean
3312 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
3313     gint64 * cur, gboolean * upstream)
3314 {
3315   GstClock *clock;
3316   gboolean res = FALSE;
3317
3318   switch (format) {
3319       /* we can answer time format */
3320     case GST_FORMAT_TIME:
3321     {
3322       GstClockTime now, base, latency;
3323       gint64 time, accum, duration;
3324       gdouble rate;
3325       gint64 last;
3326
3327       GST_OBJECT_LOCK (basesink);
3328
3329       /* can only give answer based on the clock if not EOS */
3330       if (G_UNLIKELY (basesink->eos))
3331         goto in_eos;
3332
3333       /* we can only get the segment when we are not NULL or READY */
3334       if (!basesink->have_newsegment)
3335         goto wrong_state;
3336
3337       /* when not in PLAYING or when we're busy with a state change, we
3338        * cannot read from the clock so we report time based on the
3339        * last seen timestamp. */
3340       if (GST_STATE (basesink) != GST_STATE_PLAYING ||
3341           GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING)
3342         goto in_pause;
3343
3344       /* we need to sync on the clock. */
3345       if (basesink->sync == FALSE)
3346         goto no_sync;
3347
3348       /* and we need a clock */
3349       if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
3350         goto no_sync;
3351
3352       /* collect all data we need holding the lock */
3353       if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time))
3354         time = basesink->segment.time;
3355       else
3356         time = 0;
3357
3358       if (GST_CLOCK_TIME_IS_VALID (basesink->segment.stop))
3359         duration = basesink->segment.stop - basesink->segment.start;
3360       else
3361         duration = 0;
3362
3363       base = GST_ELEMENT_CAST (basesink)->base_time;
3364       accum = basesink->segment.accum;
3365       rate = basesink->segment.rate * basesink->segment.applied_rate;
3366       gst_base_sink_get_position_last (basesink, &last);
3367       latency = basesink->priv->latency;
3368
3369       gst_object_ref (clock);
3370       /* need to release the object lock before we can get the time, 
3371        * a clock might take the LOCK of the provider, which could be
3372        * a basesink subclass. */
3373       GST_OBJECT_UNLOCK (basesink);
3374
3375       now = gst_clock_get_time (clock);
3376
3377       /* subtract base time and accumulated time from the clock time. 
3378        * Make sure we don't go negative. This is the current time in
3379        * the segment which we need to scale with the combined 
3380        * rate and applied rate. */
3381       base += accum;
3382       base += latency;
3383       base = MIN (now, base);
3384
3385       /* for negative rates we need to count back from from the segment
3386        * duration. */
3387       if (rate < 0.0)
3388         time += duration;
3389
3390       *cur = time + gst_guint64_to_gdouble (now - base) * rate;
3391
3392       /* never report more than last seen position */
3393       if (last != -1)
3394         *cur = MIN (last, *cur);
3395
3396       gst_object_unref (clock);
3397
3398       res = TRUE;
3399
3400       GST_DEBUG_OBJECT (basesink,
3401           "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
3402           GST_TIME_FORMAT " + time %" GST_TIME_FORMAT,
3403           GST_TIME_ARGS (now), GST_TIME_ARGS (base),
3404           GST_TIME_ARGS (accum), GST_TIME_ARGS (time));
3405       break;
3406     }
3407     default:
3408       /* cannot answer other than TIME, ask to send the query upstream. */
3409       *upstream = TRUE;
3410       break;
3411   }
3412
3413 done:
3414   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
3415       res, GST_TIME_ARGS (*cur));
3416   return res;
3417
3418   /* special cases */
3419 in_eos:
3420   {
3421     GST_DEBUG_OBJECT (basesink, "position in EOS");
3422     res = gst_base_sink_get_position_last (basesink, cur);
3423     GST_OBJECT_UNLOCK (basesink);
3424     goto done;
3425   }
3426 in_pause:
3427   {
3428     GST_DEBUG_OBJECT (basesink, "position in PAUSED");
3429     res = gst_base_sink_get_position_paused (basesink, cur);
3430     GST_OBJECT_UNLOCK (basesink);
3431     goto done;
3432   }
3433 wrong_state:
3434   {
3435     /* in NULL or READY we always return FALSE and -1 */
3436     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
3437     res = FALSE;
3438     *cur = -1;
3439     GST_OBJECT_UNLOCK (basesink);
3440     goto done;
3441   }
3442 no_sync:
3443   {
3444     /* report last seen timestamp if any, else ask upstream to answer */
3445     if ((*cur = basesink->priv->current_sstart) != -1)
3446       res = TRUE;
3447     else
3448       *upstream = TRUE;
3449
3450     GST_DEBUG_OBJECT (basesink, "no sync, res %d, POSITION %" GST_TIME_FORMAT,
3451         res, GST_TIME_ARGS (*cur));
3452     GST_OBJECT_UNLOCK (basesink);
3453     return res;
3454   }
3455 }
3456
3457 static gboolean
3458 gst_base_sink_query (GstElement * element, GstQuery * query)
3459 {
3460   gboolean res = FALSE;
3461
3462   GstBaseSink *basesink = GST_BASE_SINK (element);
3463
3464   switch (GST_QUERY_TYPE (query)) {
3465     case GST_QUERY_POSITION:
3466     {
3467       gint64 cur = 0;
3468       GstFormat format;
3469       gboolean upstream = FALSE;
3470
3471       gst_query_parse_position (query, &format, NULL);
3472
3473       GST_DEBUG_OBJECT (basesink, "position format %d", format);
3474
3475       /* first try to get the position based on the clock */
3476       if ((res =
3477               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
3478         gst_query_set_position (query, format, cur);
3479       } else if (upstream) {
3480         /* fallback to peer query */
3481         res = gst_base_sink_peer_query (basesink, query);
3482       }
3483       break;
3484     }
3485     case GST_QUERY_DURATION:
3486       GST_DEBUG_OBJECT (basesink, "duration query");
3487       res = gst_base_sink_peer_query (basesink, query);
3488       break;
3489     case GST_QUERY_LATENCY:
3490     {
3491       gboolean live, us_live;
3492       GstClockTime min, max;
3493
3494       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
3495                   &max))) {
3496         gst_query_set_latency (query, live, min, max);
3497       }
3498       break;
3499     }
3500     case GST_QUERY_JITTER:
3501       break;
3502     case GST_QUERY_RATE:
3503       /* gst_query_set_rate (query, basesink->segment_rate); */
3504       res = TRUE;
3505       break;
3506     case GST_QUERY_SEGMENT:
3507     {
3508       /* FIXME, bring start/stop to stream time */
3509       gst_query_set_segment (query, basesink->segment.rate,
3510           GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
3511       break;
3512     }
3513     case GST_QUERY_SEEKING:
3514     case GST_QUERY_CONVERT:
3515     case GST_QUERY_FORMATS:
3516     default:
3517       res = gst_base_sink_peer_query (basesink, query);
3518       break;
3519   }
3520   return res;
3521 }
3522
3523 static GstStateChangeReturn
3524 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
3525 {
3526   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3527   GstBaseSink *basesink = GST_BASE_SINK (element);
3528   GstBaseSinkClass *bclass;
3529   GstBaseSinkPrivate *priv;
3530
3531   priv = basesink->priv;
3532
3533   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3534
3535   switch (transition) {
3536     case GST_STATE_CHANGE_NULL_TO_READY:
3537       if (bclass->start)
3538         if (!bclass->start (basesink))
3539           goto start_failed;
3540       break;
3541     case GST_STATE_CHANGE_READY_TO_PAUSED:
3542       /* need to complete preroll before this state change completes, there
3543        * is no data flow in READY so we can safely assume we need to preroll. */
3544       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
3545       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
3546       basesink->have_newsegment = FALSE;
3547       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3548       gst_segment_init (basesink->abidata.ABI.clip_segment,
3549           GST_FORMAT_UNDEFINED);
3550       basesink->offset = 0;
3551       basesink->have_preroll = FALSE;
3552       basesink->need_preroll = TRUE;
3553       basesink->playing_async = TRUE;
3554       priv->current_sstart = -1;
3555       priv->current_sstop = -1;
3556       priv->eos_rtime = -1;
3557       priv->latency = 0;
3558       basesink->eos = FALSE;
3559       priv->received_eos = FALSE;
3560       gst_base_sink_reset_qos (basesink);
3561       priv->commited = FALSE;
3562       if (priv->async_enabled) {
3563         GST_DEBUG_OBJECT (basesink, "doing async state change");
3564         /* when async enabled, post async-start message and return ASYNC from
3565          * the state change function */
3566         ret = GST_STATE_CHANGE_ASYNC;
3567         gst_element_post_message (GST_ELEMENT_CAST (basesink),
3568             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
3569       } else {
3570         priv->have_latency = TRUE;
3571       }
3572       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
3573       break;
3574     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3575       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
3576       if (!gst_base_sink_needs_preroll (basesink)) {
3577         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
3578         /* no preroll needed anymore now. */
3579         basesink->playing_async = FALSE;
3580         basesink->need_preroll = FALSE;
3581         if (basesink->eos) {
3582           /* need to post EOS message here */
3583           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
3584           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3585               gst_message_new_eos (GST_OBJECT_CAST (basesink)));
3586         } else {
3587           GST_DEBUG_OBJECT (basesink, "signal preroll");
3588           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
3589         }
3590       } else {
3591         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
3592         basesink->need_preroll = TRUE;
3593         basesink->playing_async = TRUE;
3594         priv->commited = FALSE;
3595         if (priv->async_enabled) {
3596           GST_DEBUG_OBJECT (basesink, "doing async state change");
3597           ret = GST_STATE_CHANGE_ASYNC;
3598           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3599               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
3600         }
3601       }
3602       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
3603       break;
3604     default:
3605       break;
3606   }
3607
3608   {
3609     GstStateChangeReturn bret;
3610
3611     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3612     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
3613       goto activate_failed;
3614   }
3615
3616   switch (transition) {
3617     case GST_STATE_CHANGE_READY_TO_PAUSED:
3618 #if 0
3619       /* note that this is the upward case, which doesn't follow most
3620          patterns */
3621       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
3622         GST_DEBUG_OBJECT (basesink, "basesink activated in pull mode, "
3623             "returning SUCCESS directly");
3624         GST_PAD_PREROLL_LOCK (basesink->sinkpad);
3625         gst_element_post_message (GST_ELEMENT_CAST (basesink),
3626             gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
3627         GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
3628         ret = GST_STATE_CHANGE_SUCCESS;
3629       }
3630 #endif
3631       break;
3632     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3633       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
3634       /* FIXME, make sure we cannot enter _render first */
3635
3636       /* we need to call ::unlock before locking PREROLL_LOCK
3637        * since we lock it before going into ::render */
3638       if (bclass->unlock)
3639         bclass->unlock (basesink);
3640
3641       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
3642       /* now that we have the PREROLL lock, clear our unlock request */
3643       if (bclass->unlock_stop)
3644         bclass->unlock_stop (basesink);
3645
3646       /* we need preroll again and we set the flag before unlocking the clockid
3647        * because if the clockid is unlocked before a current buffer expired, we
3648        * can use that buffer to preroll with */
3649       basesink->need_preroll = TRUE;
3650
3651       if (basesink->clock_id) {
3652         gst_clock_id_unschedule (basesink->clock_id);
3653       }
3654
3655       /* if we don't have a preroll buffer we need to wait for a preroll and
3656        * return ASYNC. */
3657       if (!gst_base_sink_needs_preroll (basesink)) {
3658         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
3659         basesink->playing_async = FALSE;
3660       } else {
3661         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
3662           ret = GST_STATE_CHANGE_SUCCESS;
3663         } else {
3664           GST_DEBUG_OBJECT (basesink,
3665               "PLAYING to PAUSED, we are not prerolled");
3666           basesink->playing_async = TRUE;
3667           priv->commited = FALSE;
3668           if (priv->async_enabled) {
3669             GST_DEBUG_OBJECT (basesink, "doing async state change");
3670             ret = GST_STATE_CHANGE_ASYNC;
3671             gst_element_post_message (GST_ELEMENT_CAST (basesink),
3672                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
3673                     FALSE));
3674           }
3675         }
3676       }
3677       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
3678           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
3679
3680       gst_base_sink_reset_qos (basesink);
3681       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
3682       break;
3683     case GST_STATE_CHANGE_PAUSED_TO_READY:
3684       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
3685       /* start by reseting our position state with the object lock so that the
3686        * position query gets the right idea. We do this before we post the
3687        * messages so that the message handlers pick this up. */
3688       GST_OBJECT_LOCK (basesink);
3689       basesink->have_newsegment = FALSE;
3690       priv->current_sstart = -1;
3691       priv->current_sstop = -1;
3692       priv->have_latency = FALSE;
3693       GST_OBJECT_UNLOCK (basesink);
3694
3695       gst_base_sink_set_last_buffer (basesink, NULL);
3696
3697       if (!priv->commited) {
3698         if (priv->async_enabled) {
3699           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
3700
3701           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3702               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
3703                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
3704
3705           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3706               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
3707         }
3708         priv->commited = TRUE;
3709       } else {
3710         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
3711       }
3712       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
3713       break;
3714     case GST_STATE_CHANGE_READY_TO_NULL:
3715       if (bclass->stop) {
3716         if (!bclass->stop (basesink)) {
3717           GST_WARNING_OBJECT (basesink, "failed to stop");
3718         }
3719       }
3720       gst_base_sink_set_last_buffer (basesink, NULL);
3721       break;
3722     default:
3723       break;
3724   }
3725
3726   return ret;
3727
3728   /* ERRORS */
3729 start_failed:
3730   {
3731     GST_DEBUG_OBJECT (basesink, "failed to start");
3732     return GST_STATE_CHANGE_FAILURE;
3733   }
3734 activate_failed:
3735   {
3736     GST_DEBUG_OBJECT (basesink,
3737         "element failed to change states -- activation problem?");
3738     return GST_STATE_CHANGE_FAILURE;
3739   }
3740 }