basesink: do proper clipping in stepping
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSource
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * <programlisting>
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *   
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * </programlisting>
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSink::preroll vmethod with this preroll buffer and will then commit
59  * the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from ::get_times. If this function returns
63  * #GST_CLOCK_TIME_NONE for the start time, no synchronisation will be done.
64  * Synchronisation can be disabled entirely by setting the object "sync"
65  * property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSink::render will be called.
68  * Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the ::render method
71  * are supported as well. These classes typically receive a buffer in the render
72  * method and can then potentially block on the clock while rendering. A typical
73  * example is an audiosink. Since 0.10.11 these subclasses can use
74  * gst_base_sink_wait_preroll() to perform the blocking wait.
75  *
76  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
77  * for the clock to reach the time indicated by the stop time of the last
78  * ::get_times call before posting an EOS message. When the element receives
79  * EOS in PAUSED, preroll completes, the event is queued and an EOS message is
80  * posted when going to PLAYING.
81  *
82  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
83  * synchronisation and clipping of buffers. Buffers that fall completely outside
84  * of the current segment are dropped. Buffers that fall partially in the
85  * segment are rendered (and prerolled). Subclasses should do any subbuffer
86  * clipping themselves when needed.
87  *
88  * #GstBaseSink will by default report the current playback position in
89  * #GST_FORMAT_TIME based on the current clock time and segment information.
90  * If no clock has been set on the element, the query will be forwarded
91  * upstream.
92  *
93  * The ::set_caps function will be called when the subclass should configure
94  * itself to process a specific media type.
95  *
96  * The ::start and ::stop virtual methods will be called when resources should
97  * be allocated. Any ::preroll, ::render  and ::set_caps function will be
98  * called between the ::start and ::stop calls.
99  *
100  * The ::event virtual method will be called when an event is received by
101  * #GstBaseSink. Normally this method should only be overriden by very specific
102  * elements (such as file sinks) which need to handle the newsegment event
103  * specially.
104  *
105  * #GstBaseSink provides an overridable ::buffer_alloc function that can be
106  * used by sinks that want to do reverse negotiation or to provide
107  * custom buffers (hardware buffers for example) to upstream elements.
108  *
109  * The ::unlock method is called when the elements should unblock any blocking
110  * operations they perform in the ::render method. This is mostly useful when
111  * the ::render method performs a blocking write on a file descriptor, for
112  * example.
113  *
114  * The max-lateness property affects how the sink deals with buffers that
115  * arrive too late in the sink. A buffer arrives too late in the sink when
116  * the presentation time (as a combination of the last segment, buffer
117  * timestamp and element base_time) plus the duration is before the current
118  * time of the clock.
119  * If the frame is later than max-lateness, the sink will drop the buffer
120  * without calling the render method.
121  * This feature is disabled if sync is disabled, the ::get-times method does
122  * not return a valid start time or max-lateness is set to -1 (the default).
123  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
124  * max-lateness value.
125  *
126  * The qos property will enable the quality-of-service features of the basesink
127  * which gather statistics about the real-time performance of the clock
128  * synchronisation. For each buffer received in the sink, statistics are
129  * gathered and a QOS event is sent upstream with these numbers. This
130  * information can then be used by upstream elements to reduce their processing
131  * rate, for example.
132  *
133  * Since 0.10.15 the async property can be used to instruct the sink to never
134  * perform an ASYNC state change. This feature is mostly usable when dealing
135  * with non-synchronized streams or sparse streams.
136  *
137  * Last reviewed on 2007-08-29 (0.10.15)
138  */
139
140 #ifdef HAVE_CONFIG_H
141 #  include "config.h"
142 #endif
143
144 #include "gstbasesink.h"
145 #include <gst/gstmarshal.h>
146 #include <gst/gst_private.h>
147 #include <gst/gst-i18n-lib.h>
148
149 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
150 #define GST_CAT_DEFAULT gst_base_sink_debug
151
152 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
153    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
154
155 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
156
157 typedef struct
158 {
159   gboolean valid;               /* if this info is valid */
160   guint32 seqnum;               /* the seqnum of the STEP event */
161   GstFormat format;             /* the format of the amount */
162   guint64 amount;               /* the total amount of data to skip */
163   guint64 position;             /* the position in the stepped data */
164   guint64 duration;             /* the duration in time of the skipped data */
165   guint64 start;                /* running_time of the start */
166   gdouble rate;                 /* rate of skipping */
167   gdouble start_rate;           /* rate before skipping */
168   guint64 start_stop;           /* stop position skipping */
169   gboolean flush;               /* if this was a flushing step */
170   gboolean intermediate;        /* if this is an intermediate step */
171   gboolean need_preroll;        /* if we need preroll after this step */
172 } GstStepInfo;
173
174 /* FIXME, some stuff in ABI.data and other in Private...
175  * Make up your mind please.
176  */
177 struct _GstBaseSinkPrivate
178 {
179   gint qos_enabled;             /* ATOMIC */
180   gboolean async_enabled;
181   GstClockTimeDiff ts_offset;
182   GstClockTime render_delay;
183
184   /* start, stop of current buffer, stream time, used to report position */
185   GstClockTime current_sstart;
186   GstClockTime current_sstop;
187
188   /* start, stop and jitter of current buffer, running time */
189   GstClockTime current_rstart;
190   GstClockTime current_rstop;
191   GstClockTimeDiff current_jitter;
192
193   /* EOS sync time in running time */
194   GstClockTime eos_rtime;
195
196   /* last buffer that arrived in time, running time */
197   GstClockTime last_in_time;
198   /* when the last buffer left the sink, running time */
199   GstClockTime last_left;
200
201   /* running averages go here these are done on running time */
202   GstClockTime avg_pt;
203   GstClockTime avg_duration;
204   gdouble avg_rate;
205
206   /* these are done on system time. avg_jitter and avg_render are
207    * compared to eachother to see if the rendering time takes a
208    * huge amount of the processing, If so we are flooded with
209    * buffers. */
210   GstClockTime last_left_systime;
211   GstClockTime avg_jitter;
212   GstClockTime start, stop;
213   GstClockTime avg_render;
214
215   /* number of rendered and dropped frames */
216   guint64 rendered;
217   guint64 dropped;
218
219   /* latency stuff */
220   GstClockTime latency;
221
222   /* if we already commited the state */
223   gboolean commited;
224
225   /* when we received EOS */
226   gboolean received_eos;
227
228   /* when we are prerolled and able to report latency */
229   gboolean have_latency;
230
231   /* the last buffer we prerolled or rendered. Useful for making snapshots */
232   GstBuffer *last_buffer;
233
234   /* caps for pull based scheduling */
235   GstCaps *pull_caps;
236
237   /* blocksize for pulling */
238   guint blocksize;
239
240   gboolean discont;
241
242   /* seqnum of the stream */
243   guint32 seqnum;
244
245   gboolean call_preroll;
246   gboolean step_unlock;
247
248   /* we have a pending and a current step operation */
249   GstStepInfo current_step;
250   GstStepInfo pending_step;
251 };
252
253 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
254
255 /* generic running average, this has a neutral window size */
256 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
257
258 /* the windows for these running averages are experimentally obtained.
259  * possitive values get averaged more while negative values use a small
260  * window so we can react faster to badness. */
261 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
262 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
263
264 /* BaseSink properties */
265
266 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
267 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
268
269 #define DEFAULT_PREROLL_QUEUE_LEN       0
270 #define DEFAULT_SYNC                    TRUE
271 #define DEFAULT_MAX_LATENESS            -1
272 #define DEFAULT_QOS                     FALSE
273 #define DEFAULT_ASYNC                   TRUE
274 #define DEFAULT_TS_OFFSET               0
275 #define DEFAULT_BLOCKSIZE               4096
276 #define DEFAULT_RENDER_DELAY            0
277
278 enum
279 {
280   PROP_0,
281   PROP_PREROLL_QUEUE_LEN,
282   PROP_SYNC,
283   PROP_MAX_LATENESS,
284   PROP_QOS,
285   PROP_ASYNC,
286   PROP_TS_OFFSET,
287   PROP_LAST_BUFFER,
288   PROP_BLOCKSIZE,
289   PROP_RENDER_DELAY,
290   PROP_LAST
291 };
292
293 static GstElementClass *parent_class = NULL;
294
295 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
296 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
297 static void gst_base_sink_finalize (GObject * object);
298
299 GType
300 gst_base_sink_get_type (void)
301 {
302   static volatile gsize base_sink_type = 0;
303
304   if (g_once_init_enter (&base_sink_type)) {
305     GType _type;
306     static const GTypeInfo base_sink_info = {
307       sizeof (GstBaseSinkClass),
308       NULL,
309       NULL,
310       (GClassInitFunc) gst_base_sink_class_init,
311       NULL,
312       NULL,
313       sizeof (GstBaseSink),
314       0,
315       (GInstanceInitFunc) gst_base_sink_init,
316     };
317
318     _type = g_type_register_static (GST_TYPE_ELEMENT,
319         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
320     g_once_init_leave (&base_sink_type, _type);
321   }
322   return base_sink_type;
323 }
324
325 static void gst_base_sink_set_property (GObject * object, guint prop_id,
326     const GValue * value, GParamSpec * pspec);
327 static void gst_base_sink_get_property (GObject * object, guint prop_id,
328     GValue * value, GParamSpec * pspec);
329
330 static gboolean gst_base_sink_send_event (GstElement * element,
331     GstEvent * event);
332 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
333
334 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
335 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
336 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
337     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
338 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
339     GstClockTime * start, GstClockTime * end);
340 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
341     GstPad * pad, gboolean flushing);
342 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
343     gboolean active);
344 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
345     GstSegment * segment);
346 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
347     GstEvent * event, GstSegment * segment);
348
349 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
350     GstStateChange transition);
351
352 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
353 static void gst_base_sink_loop (GstPad * pad);
354 static gboolean gst_base_sink_pad_activate (GstPad * pad);
355 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
356 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
357 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
358 static gboolean gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query);
359
360 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
361
362 /* check if an object was too late */
363 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
364     GstMiniObject * obj, GstClockTime start, GstClockTime stop,
365     GstClockReturn status, GstClockTimeDiff jitter);
366 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
367     GstMiniObject * obj);
368
369 static void
370 gst_base_sink_class_init (GstBaseSinkClass * klass)
371 {
372   GObjectClass *gobject_class;
373   GstElementClass *gstelement_class;
374
375   gobject_class = G_OBJECT_CLASS (klass);
376   gstelement_class = GST_ELEMENT_CLASS (klass);
377
378   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
379       "basesink element");
380
381   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
382
383   parent_class = g_type_class_peek_parent (klass);
384
385   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_sink_finalize);
386   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_sink_set_property);
387   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_sink_get_property);
388
389   /* FIXME, this next value should be configured using an event from the
390    * upstream element, ie, the BUFFER_SIZE event. */
391   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
392       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
393           "Number of buffers to queue during preroll", 0, G_MAXUINT,
394           DEFAULT_PREROLL_QUEUE_LEN,
395           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
396
397   g_object_class_install_property (gobject_class, PROP_SYNC,
398       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
399           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
400
401   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
402       g_param_spec_int64 ("max-lateness", "Max Lateness",
403           "Maximum number of nanoseconds that a buffer can be late before it "
404           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
405           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
406
407   g_object_class_install_property (gobject_class, PROP_QOS,
408       g_param_spec_boolean ("qos", "Qos",
409           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
410           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
411   /**
412    * GstBaseSink:async
413    *
414    * If set to #TRUE, the basesink will perform asynchronous state changes.
415    * When set to #FALSE, the sink will not signal the parent when it prerolls.
416    * Use this option when dealing with sparse streams or when synchronisation is
417    * not required.
418    *
419    * Since: 0.10.15
420    */
421   g_object_class_install_property (gobject_class, PROP_ASYNC,
422       g_param_spec_boolean ("async", "Async",
423           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
424           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
425   /**
426    * GstBaseSink:ts-offset
427    *
428    * Controls the final synchronisation, a negative value will render the buffer
429    * earlier while a positive value delays playback. This property can be 
430    * used to fix synchronisation in bad files.
431    *
432    * Since: 0.10.15
433    */
434   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
435       g_param_spec_int64 ("ts-offset", "TS Offset",
436           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
437           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
438   /**
439    * GstBaseSink:last-buffer
440    *
441    * The last buffer that arrived in the sink and was used for preroll or for
442    * rendering. This property can be used to generate thumbnails. This property
443    * can be NULL when the sink has not yet received a bufer.
444    *
445    * Since: 0.10.15
446    */
447   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
448       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
449           "The last buffer received in the sink", GST_TYPE_BUFFER,
450           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
451   /**
452    * GstBaseSink:blocksize
453    *
454    * The amount of bytes to pull when operating in pull mode.
455    *
456    * Since: 0.10.22
457    */
458   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
459       g_param_spec_uint ("blocksize", "Block size",
460           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
461           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
462   /**
463    * GstBaseSink:render-delay
464    *
465    * The additional delay between synchronisation and actual rendering of the
466    * media. This property will add additional latency to the device in order to
467    * make other sinks compensate for the delay.
468    *
469    * Since: 0.10.22
470    */
471   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
472       g_param_spec_uint64 ("render-delay", "Render Delay",
473           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
474           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
475
476   gstelement_class->change_state =
477       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
478   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
479   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
480
481   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
482   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
483   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
484   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
485   klass->activate_pull =
486       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
487 }
488
489 static GstCaps *
490 gst_base_sink_pad_getcaps (GstPad * pad)
491 {
492   GstBaseSinkClass *bclass;
493   GstBaseSink *bsink;
494   GstCaps *caps = NULL;
495
496   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
497   bclass = GST_BASE_SINK_GET_CLASS (bsink);
498
499   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
500     /* if we are operating in pull mode we only accept the negotiated caps */
501     GST_OBJECT_LOCK (pad);
502     if ((caps = GST_PAD_CAPS (pad)))
503       gst_caps_ref (caps);
504     GST_OBJECT_UNLOCK (pad);
505   }
506   if (caps == NULL) {
507     if (bclass->get_caps)
508       caps = bclass->get_caps (bsink);
509
510     if (caps == NULL) {
511       GstPadTemplate *pad_template;
512
513       pad_template =
514           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
515           "sink");
516       if (pad_template != NULL) {
517         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
518       }
519     }
520   }
521   gst_object_unref (bsink);
522
523   return caps;
524 }
525
526 static gboolean
527 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
528 {
529   GstBaseSinkClass *bclass;
530   GstBaseSink *bsink;
531   gboolean res = TRUE;
532
533   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
534   bclass = GST_BASE_SINK_GET_CLASS (bsink);
535
536   if (res && bclass->set_caps)
537     res = bclass->set_caps (bsink, caps);
538
539   gst_object_unref (bsink);
540
541   return res;
542 }
543
544 static void
545 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
546 {
547   GstBaseSinkClass *bclass;
548   GstBaseSink *bsink;
549
550   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
551   bclass = GST_BASE_SINK_GET_CLASS (bsink);
552
553   if (bclass->fixate)
554     bclass->fixate (bsink, caps);
555
556   gst_object_unref (bsink);
557 }
558
559 static GstFlowReturn
560 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
561     GstCaps * caps, GstBuffer ** buf)
562 {
563   GstBaseSinkClass *bclass;
564   GstBaseSink *bsink;
565   GstFlowReturn result = GST_FLOW_OK;
566
567   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
568   bclass = GST_BASE_SINK_GET_CLASS (bsink);
569
570   if (bclass->buffer_alloc)
571     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
572   else
573     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
574
575   gst_object_unref (bsink);
576
577   return result;
578 }
579
580 static void
581 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
582 {
583   GstPadTemplate *pad_template;
584   GstBaseSinkPrivate *priv;
585
586   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
587
588   pad_template =
589       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
590   g_return_if_fail (pad_template != NULL);
591
592   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
593
594   gst_pad_set_getcaps_function (basesink->sinkpad,
595       GST_DEBUG_FUNCPTR (gst_base_sink_pad_getcaps));
596   gst_pad_set_setcaps_function (basesink->sinkpad,
597       GST_DEBUG_FUNCPTR (gst_base_sink_pad_setcaps));
598   gst_pad_set_fixatecaps_function (basesink->sinkpad,
599       GST_DEBUG_FUNCPTR (gst_base_sink_pad_fixate));
600   gst_pad_set_bufferalloc_function (basesink->sinkpad,
601       GST_DEBUG_FUNCPTR (gst_base_sink_pad_buffer_alloc));
602   gst_pad_set_activate_function (basesink->sinkpad,
603       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate));
604   gst_pad_set_activatepush_function (basesink->sinkpad,
605       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_push));
606   gst_pad_set_activatepull_function (basesink->sinkpad,
607       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_pull));
608   gst_pad_set_event_function (basesink->sinkpad,
609       GST_DEBUG_FUNCPTR (gst_base_sink_event));
610   gst_pad_set_chain_function (basesink->sinkpad,
611       GST_DEBUG_FUNCPTR (gst_base_sink_chain));
612   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
613
614   basesink->pad_mode = GST_ACTIVATE_NONE;
615   basesink->preroll_queue = g_queue_new ();
616   basesink->abidata.ABI.clip_segment = gst_segment_new ();
617   priv->have_latency = FALSE;
618
619   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
620   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
621
622   basesink->sync = DEFAULT_SYNC;
623   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
624   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
625   priv->async_enabled = DEFAULT_ASYNC;
626   priv->ts_offset = DEFAULT_TS_OFFSET;
627   priv->render_delay = DEFAULT_RENDER_DELAY;
628   priv->blocksize = DEFAULT_BLOCKSIZE;
629
630   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
631 }
632
633 static void
634 gst_base_sink_finalize (GObject * object)
635 {
636   GstBaseSink *basesink;
637
638   basesink = GST_BASE_SINK (object);
639
640   g_queue_free (basesink->preroll_queue);
641   gst_segment_free (basesink->abidata.ABI.clip_segment);
642
643   G_OBJECT_CLASS (parent_class)->finalize (object);
644 }
645
646 /**
647  * gst_base_sink_set_sync:
648  * @sink: the sink
649  * @sync: the new sync value.
650  *
651  * Configures @sink to synchronize on the clock or not. When
652  * @sync is FALSE, incomming samples will be played as fast as
653  * possible. If @sync is TRUE, the timestamps of the incomming
654  * buffers will be used to schedule the exact render time of its
655  * contents.
656  *
657  * Since: 0.10.4
658  */
659 void
660 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
661 {
662   g_return_if_fail (GST_IS_BASE_SINK (sink));
663
664   GST_OBJECT_LOCK (sink);
665   sink->sync = sync;
666   GST_OBJECT_UNLOCK (sink);
667 }
668
669 /**
670  * gst_base_sink_get_sync:
671  * @sink: the sink
672  *
673  * Checks if @sink is currently configured to synchronize against the
674  * clock.
675  *
676  * Returns: TRUE if the sink is configured to synchronize against the clock.
677  *
678  * Since: 0.10.4
679  */
680 gboolean
681 gst_base_sink_get_sync (GstBaseSink * sink)
682 {
683   gboolean res;
684
685   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
686
687   GST_OBJECT_LOCK (sink);
688   res = sink->sync;
689   GST_OBJECT_UNLOCK (sink);
690
691   return res;
692 }
693
694 /**
695  * gst_base_sink_set_max_lateness:
696  * @sink: the sink
697  * @max_lateness: the new max lateness value.
698  *
699  * Sets the new max lateness value to @max_lateness. This value is
700  * used to decide if a buffer should be dropped or not based on the
701  * buffer timestamp and the current clock time. A value of -1 means
702  * an unlimited time.
703  *
704  * Since: 0.10.4
705  */
706 void
707 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
708 {
709   g_return_if_fail (GST_IS_BASE_SINK (sink));
710
711   GST_OBJECT_LOCK (sink);
712   sink->abidata.ABI.max_lateness = max_lateness;
713   GST_OBJECT_UNLOCK (sink);
714 }
715
716 /**
717  * gst_base_sink_get_max_lateness:
718  * @sink: the sink
719  *
720  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
721  * more details.
722  *
723  * Returns: The maximum time in nanoseconds that a buffer can be late
724  * before it is dropped and not rendered. A value of -1 means an
725  * unlimited time.
726  *
727  * Since: 0.10.4
728  */
729 gint64
730 gst_base_sink_get_max_lateness (GstBaseSink * sink)
731 {
732   gint64 res;
733
734   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
735
736   GST_OBJECT_LOCK (sink);
737   res = sink->abidata.ABI.max_lateness;
738   GST_OBJECT_UNLOCK (sink);
739
740   return res;
741 }
742
743 /**
744  * gst_base_sink_set_qos_enabled:
745  * @sink: the sink
746  * @enabled: the new qos value.
747  *
748  * Configures @sink to send Quality-of-Service events upstream.
749  *
750  * Since: 0.10.5
751  */
752 void
753 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
754 {
755   g_return_if_fail (GST_IS_BASE_SINK (sink));
756
757   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
758 }
759
760 /**
761  * gst_base_sink_is_qos_enabled:
762  * @sink: the sink
763  *
764  * Checks if @sink is currently configured to send Quality-of-Service events
765  * upstream.
766  *
767  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
768  *
769  * Since: 0.10.5
770  */
771 gboolean
772 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
773 {
774   gboolean res;
775
776   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
777
778   res = g_atomic_int_get (&sink->priv->qos_enabled);
779
780   return res;
781 }
782
783 /**
784  * gst_base_sink_set_async_enabled:
785  * @sink: the sink
786  * @enabled: the new async value.
787  *
788  * Configures @sink to perform all state changes asynchronusly. When async is
789  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
790  * preroll buffer. This feature is usefull if the sink does not synchronize
791  * against the clock or when it is dealing with sparse streams.
792  *
793  * Since: 0.10.15
794  */
795 void
796 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
797 {
798   g_return_if_fail (GST_IS_BASE_SINK (sink));
799
800   GST_PAD_PREROLL_LOCK (sink->sinkpad);
801   sink->priv->async_enabled = enabled;
802   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
803   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
804 }
805
806 /**
807  * gst_base_sink_is_async_enabled:
808  * @sink: the sink
809  *
810  * Checks if @sink is currently configured to perform asynchronous state
811  * changes to PAUSED.
812  *
813  * Returns: TRUE if the sink is configured to perform asynchronous state
814  * changes.
815  *
816  * Since: 0.10.15
817  */
818 gboolean
819 gst_base_sink_is_async_enabled (GstBaseSink * sink)
820 {
821   gboolean res;
822
823   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
824
825   GST_PAD_PREROLL_LOCK (sink->sinkpad);
826   res = sink->priv->async_enabled;
827   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
828
829   return res;
830 }
831
832 /**
833  * gst_base_sink_set_ts_offset:
834  * @sink: the sink
835  * @offset: the new offset
836  *
837  * Adjust the synchronisation of @sink with @offset. A negative value will
838  * render buffers earlier than their timestamp. A positive value will delay
839  * rendering. This function can be used to fix playback of badly timestamped
840  * buffers.
841  *
842  * Since: 0.10.15
843  */
844 void
845 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
846 {
847   g_return_if_fail (GST_IS_BASE_SINK (sink));
848
849   GST_OBJECT_LOCK (sink);
850   sink->priv->ts_offset = offset;
851   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
852   GST_OBJECT_UNLOCK (sink);
853 }
854
855 /**
856  * gst_base_sink_get_ts_offset:
857  * @sink: the sink
858  *
859  * Get the synchronisation offset of @sink.
860  *
861  * Returns: The synchronisation offset.
862  *
863  * Since: 0.10.15
864  */
865 GstClockTimeDiff
866 gst_base_sink_get_ts_offset (GstBaseSink * sink)
867 {
868   GstClockTimeDiff res;
869
870   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
871
872   GST_OBJECT_LOCK (sink);
873   res = sink->priv->ts_offset;
874   GST_OBJECT_UNLOCK (sink);
875
876   return res;
877 }
878
879 /**
880  * gst_base_sink_get_last_buffer:
881  * @sink: the sink
882  *
883  * Get the last buffer that arrived in the sink and was used for preroll or for
884  * rendering. This property can be used to generate thumbnails.
885  *
886  * The #GstCaps on the buffer can be used to determine the type of the buffer.
887  * 
888  * Returns: a #GstBuffer. gst_buffer_unref() after usage. This function returns
889  * NULL when no buffer has arrived in the sink yet or when the sink is not in
890  * PAUSED or PLAYING.
891  *
892  * Since: 0.10.15
893  */
894 GstBuffer *
895 gst_base_sink_get_last_buffer (GstBaseSink * sink)
896 {
897   GstBuffer *res;
898
899   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
900
901   GST_OBJECT_LOCK (sink);
902   if ((res = sink->priv->last_buffer))
903     gst_buffer_ref (res);
904   GST_OBJECT_UNLOCK (sink);
905
906   return res;
907 }
908
909 static void
910 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
911 {
912   GstBuffer *old;
913
914   GST_OBJECT_LOCK (sink);
915   old = sink->priv->last_buffer;
916   if (G_LIKELY (old != buffer)) {
917     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
918     if (G_LIKELY (buffer))
919       gst_buffer_ref (buffer);
920     sink->priv->last_buffer = buffer;
921   } else {
922     old = NULL;
923   }
924   GST_OBJECT_UNLOCK (sink);
925
926   /* avoid unreffing with the lock because cleanup code might want to take the
927    * lock too */
928   if (G_LIKELY (old))
929     gst_buffer_unref (old);
930 }
931
932 /**
933  * gst_base_sink_get_latency:
934  * @sink: the sink
935  *
936  * Get the currently configured latency.
937  *
938  * Returns: The configured latency.
939  *
940  * Since: 0.10.12
941  */
942 GstClockTime
943 gst_base_sink_get_latency (GstBaseSink * sink)
944 {
945   GstClockTime res;
946
947   GST_OBJECT_LOCK (sink);
948   res = sink->priv->latency;
949   GST_OBJECT_UNLOCK (sink);
950
951   return res;
952 }
953
954 /**
955  * gst_base_sink_query_latency:
956  * @sink: the sink
957  * @live: if the sink is live
958  * @upstream_live: if an upstream element is live
959  * @min_latency: the min latency of the upstream elements
960  * @max_latency: the max latency of the upstream elements
961  *
962  * Query the sink for the latency parameters. The latency will be queried from
963  * the upstream elements. @live will be TRUE if @sink is configured to
964  * synchronize against the clock. @upstream_live will be TRUE if an upstream
965  * element is live. 
966  *
967  * If both @live and @upstream_live are TRUE, the sink will want to compensate
968  * for the latency introduced by the upstream elements by setting the
969  * @min_latency to a strictly possitive value.
970  *
971  * This function is mostly used by subclasses. 
972  *
973  * Returns: TRUE if the query succeeded.
974  *
975  * Since: 0.10.12
976  */
977 gboolean
978 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
979     gboolean * upstream_live, GstClockTime * min_latency,
980     GstClockTime * max_latency)
981 {
982   gboolean l, us_live, res, have_latency;
983   GstClockTime min, max, render_delay;
984   GstQuery *query;
985   GstClockTime us_min, us_max;
986
987   /* we are live when we sync to the clock */
988   GST_OBJECT_LOCK (sink);
989   l = sink->sync;
990   have_latency = sink->priv->have_latency;
991   render_delay = sink->priv->render_delay;
992   GST_OBJECT_UNLOCK (sink);
993
994   /* assume no latency */
995   min = 0;
996   max = -1;
997   us_live = FALSE;
998
999   if (have_latency) {
1000     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1001     /* we are ready for a latency query this is when we preroll or when we are
1002      * not async. */
1003     query = gst_query_new_latency ();
1004
1005     /* ask the peer for the latency */
1006     if ((res = gst_base_sink_peer_query (sink, query))) {
1007       /* get upstream min and max latency */
1008       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1009
1010       if (us_live) {
1011         /* upstream live, use its latency, subclasses should use these
1012          * values to create the complete latency. */
1013         min = us_min;
1014         max = us_max;
1015       }
1016       if (l) {
1017         /* we need to add the render delay if we are live */
1018         if (min != -1)
1019           min += render_delay;
1020         if (max != -1)
1021           max += render_delay;
1022       }
1023     }
1024     gst_query_unref (query);
1025   } else {
1026     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1027     res = FALSE;
1028   }
1029
1030   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1031   if (!res) {
1032     if (!l) {
1033       res = TRUE;
1034       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1035     } else {
1036       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1037     }
1038   }
1039
1040   if (res) {
1041     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1042         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1043         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1044
1045     if (live)
1046       *live = l;
1047     if (upstream_live)
1048       *upstream_live = us_live;
1049     if (min_latency)
1050       *min_latency = min;
1051     if (max_latency)
1052       *max_latency = max;
1053   }
1054   return res;
1055 }
1056
1057 /**
1058  * gst_base_sink_set_render_delay:
1059  * @sink: a #GstBaseSink
1060  * @delay: the new delay
1061  *
1062  * Set the render delay in @sink to @delay. The render delay is the time 
1063  * between actual rendering of a buffer and its synchronisation time. Some
1064  * devices might delay media rendering which can be compensated for with this
1065  * function. 
1066  *
1067  * After calling this function, this sink will report additional latency and
1068  * other sinks will adjust their latency to delay the rendering of their media.
1069  *
1070  * This function is usually called by subclasses.
1071  *
1072  * Since: 0.10.21
1073  */
1074 void
1075 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1076 {
1077   GstClockTime old_render_delay;
1078
1079   g_return_if_fail (GST_IS_BASE_SINK (sink));
1080
1081   GST_OBJECT_LOCK (sink);
1082   old_render_delay = sink->priv->render_delay;
1083   sink->priv->render_delay = delay;
1084   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1085       GST_TIME_ARGS (delay));
1086   GST_OBJECT_UNLOCK (sink);
1087
1088   if (delay != old_render_delay) {
1089     GST_DEBUG_OBJECT (sink, "posting latency changed");
1090     gst_element_post_message (GST_ELEMENT_CAST (sink),
1091         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1092   }
1093 }
1094
1095 /**
1096  * gst_base_sink_get_render_delay:
1097  * @sink: a #GstBaseSink
1098  *
1099  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1100  * information about the render delay.
1101  *
1102  * Returns: the render delay of @sink.
1103  *
1104  * Since: 0.10.21
1105  */
1106 GstClockTime
1107 gst_base_sink_get_render_delay (GstBaseSink * sink)
1108 {
1109   GstClockTimeDiff res;
1110
1111   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1112
1113   GST_OBJECT_LOCK (sink);
1114   res = sink->priv->render_delay;
1115   GST_OBJECT_UNLOCK (sink);
1116
1117   return res;
1118 }
1119
1120 /**
1121  * gst_base_sink_set_blocksize:
1122  * @sink: a #GstBaseSink
1123  * @blocksize: the blocksize in bytes
1124  *
1125  * Set the number of bytes that the sink will pull when it is operating in pull
1126  * mode.
1127  *
1128  * Since: 0.10.22
1129  */
1130 void
1131 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1132 {
1133   g_return_if_fail (GST_IS_BASE_SINK (sink));
1134
1135   GST_OBJECT_LOCK (sink);
1136   sink->priv->blocksize = blocksize;
1137   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1138   GST_OBJECT_UNLOCK (sink);
1139 }
1140
1141 /**
1142  * gst_base_sink_get_blocksize:
1143  * @sink: a #GstBaseSink
1144  *
1145  * Get the number of bytes that the sink will pull when it is operating in pull
1146  * mode.
1147  *
1148  * Returns: the number of bytes @sink will pull in pull mode.
1149  *
1150  * Since: 0.10.22
1151  */
1152 guint
1153 gst_base_sink_get_blocksize (GstBaseSink * sink)
1154 {
1155   guint res;
1156
1157   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1158
1159   GST_OBJECT_LOCK (sink);
1160   res = sink->priv->blocksize;
1161   GST_OBJECT_UNLOCK (sink);
1162
1163   return res;
1164 }
1165
1166 static void
1167 gst_base_sink_set_property (GObject * object, guint prop_id,
1168     const GValue * value, GParamSpec * pspec)
1169 {
1170   GstBaseSink *sink = GST_BASE_SINK (object);
1171
1172   switch (prop_id) {
1173     case PROP_PREROLL_QUEUE_LEN:
1174       /* preroll lock necessary to serialize with finish_preroll */
1175       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1176       sink->preroll_queue_max_len = g_value_get_uint (value);
1177       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1178       break;
1179     case PROP_SYNC:
1180       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1181       break;
1182     case PROP_MAX_LATENESS:
1183       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1184       break;
1185     case PROP_QOS:
1186       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1187       break;
1188     case PROP_ASYNC:
1189       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1190       break;
1191     case PROP_TS_OFFSET:
1192       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1193       break;
1194     case PROP_BLOCKSIZE:
1195       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1196       break;
1197     case PROP_RENDER_DELAY:
1198       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1199       break;
1200     default:
1201       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1202       break;
1203   }
1204 }
1205
1206 static void
1207 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1208     GParamSpec * pspec)
1209 {
1210   GstBaseSink *sink = GST_BASE_SINK (object);
1211
1212   switch (prop_id) {
1213     case PROP_PREROLL_QUEUE_LEN:
1214       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1215       g_value_set_uint (value, sink->preroll_queue_max_len);
1216       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1217       break;
1218     case PROP_SYNC:
1219       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1220       break;
1221     case PROP_MAX_LATENESS:
1222       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1223       break;
1224     case PROP_QOS:
1225       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1226       break;
1227     case PROP_ASYNC:
1228       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1229       break;
1230     case PROP_TS_OFFSET:
1231       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1232       break;
1233     case PROP_LAST_BUFFER:
1234       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1235       break;
1236     case PROP_BLOCKSIZE:
1237       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1238       break;
1239     case PROP_RENDER_DELAY:
1240       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1241       break;
1242     default:
1243       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1244       break;
1245   }
1246 }
1247
1248
1249 static GstCaps *
1250 gst_base_sink_get_caps (GstBaseSink * sink)
1251 {
1252   return NULL;
1253 }
1254
1255 static gboolean
1256 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1257 {
1258   return TRUE;
1259 }
1260
1261 static GstFlowReturn
1262 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1263     GstCaps * caps, GstBuffer ** buf)
1264 {
1265   *buf = NULL;
1266   return GST_FLOW_OK;
1267 }
1268
1269 /* with PREROLL_LOCK, STREAM_LOCK */
1270 static void
1271 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1272 {
1273   GstMiniObject *obj;
1274
1275   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1276   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1277     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1278     gst_mini_object_unref (obj);
1279   }
1280   /* we can't have EOS anymore now */
1281   basesink->eos = FALSE;
1282   basesink->priv->received_eos = FALSE;
1283   basesink->have_preroll = FALSE;
1284   basesink->priv->step_unlock = FALSE;
1285   basesink->eos_queued = FALSE;
1286   basesink->preroll_queued = 0;
1287   basesink->buffers_queued = 0;
1288   basesink->events_queued = 0;
1289   /* can't report latency anymore until we preroll again */
1290   if (basesink->priv->async_enabled) {
1291     GST_OBJECT_LOCK (basesink);
1292     basesink->priv->have_latency = FALSE;
1293     GST_OBJECT_UNLOCK (basesink);
1294   }
1295   /* and signal any waiters now */
1296   GST_PAD_PREROLL_SIGNAL (pad);
1297 }
1298
1299 /* with STREAM_LOCK, configures given segment with the event information. */
1300 static void
1301 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1302     GstEvent * event, GstSegment * segment)
1303 {
1304   gboolean update;
1305   gdouble rate, arate;
1306   GstFormat format;
1307   gint64 start;
1308   gint64 stop;
1309   gint64 time;
1310
1311   /* the newsegment event is needed to bring the buffer timestamps to the
1312    * stream time and to drop samples outside of the playback segment. */
1313   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1314       &start, &stop, &time);
1315
1316   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1317    * We protect with the OBJECT_LOCK so that we can use the values to
1318    * safely answer a POSITION query. */
1319   GST_OBJECT_LOCK (basesink);
1320   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1321       stop, time);
1322
1323   if (format == GST_FORMAT_TIME) {
1324     GST_DEBUG_OBJECT (basesink,
1325         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1326         "format GST_FORMAT_TIME, "
1327         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1328         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1329         update, rate, arate, GST_TIME_ARGS (segment->start),
1330         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1331         GST_TIME_ARGS (segment->accum));
1332   } else {
1333     GST_DEBUG_OBJECT (basesink,
1334         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1335         "format %d, "
1336         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1337         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1338         segment->format, segment->start, segment->stop, segment->time,
1339         segment->accum);
1340   }
1341   GST_OBJECT_UNLOCK (basesink);
1342 }
1343
1344 /* with PREROLL_LOCK, STREAM_LOCK */
1345 static gboolean
1346 gst_base_sink_commit_state (GstBaseSink * basesink)
1347 {
1348   /* commit state and proceed to next pending state */
1349   GstState current, next, pending, post_pending;
1350   gboolean post_paused = FALSE;
1351   gboolean post_async_done = FALSE;
1352   gboolean post_playing = FALSE;
1353
1354   /* we are certainly not playing async anymore now */
1355   basesink->playing_async = FALSE;
1356
1357   GST_OBJECT_LOCK (basesink);
1358   current = GST_STATE (basesink);
1359   next = GST_STATE_NEXT (basesink);
1360   pending = GST_STATE_PENDING (basesink);
1361   post_pending = pending;
1362
1363   switch (pending) {
1364     case GST_STATE_PLAYING:
1365     {
1366       GstBaseSinkClass *bclass;
1367       GstStateChangeReturn ret;
1368
1369       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1370
1371       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1372
1373       basesink->need_preroll = FALSE;
1374       post_async_done = TRUE;
1375       basesink->priv->commited = TRUE;
1376       post_playing = TRUE;
1377       /* post PAUSED too when we were READY */
1378       if (current == GST_STATE_READY) {
1379         post_paused = TRUE;
1380       }
1381
1382       /* make sure we notify the subclass of async playing */
1383       if (bclass->async_play) {
1384         GST_WARNING_OBJECT (basesink, "deprecated async_play");
1385         ret = bclass->async_play (basesink);
1386         if (ret == GST_STATE_CHANGE_FAILURE)
1387           goto async_failed;
1388       }
1389       break;
1390     }
1391     case GST_STATE_PAUSED:
1392       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1393       post_paused = TRUE;
1394       post_async_done = TRUE;
1395       basesink->priv->commited = TRUE;
1396       post_pending = GST_STATE_VOID_PENDING;
1397       break;
1398     case GST_STATE_READY:
1399     case GST_STATE_NULL:
1400       goto stopping;
1401     case GST_STATE_VOID_PENDING:
1402       goto nothing_pending;
1403     default:
1404       break;
1405   }
1406
1407   /* we can report latency queries now */
1408   basesink->priv->have_latency = TRUE;
1409
1410   GST_STATE (basesink) = pending;
1411   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1412   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1413   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1414   GST_OBJECT_UNLOCK (basesink);
1415
1416   if (post_paused) {
1417     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1418     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1419         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1420             current, next, post_pending));
1421   }
1422   if (post_async_done) {
1423     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1424     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1425         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1426   }
1427   if (post_playing) {
1428     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1429     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1430         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1431             next, pending, GST_STATE_VOID_PENDING));
1432   }
1433
1434   GST_STATE_BROADCAST (basesink);
1435
1436   return TRUE;
1437
1438 nothing_pending:
1439   {
1440     /* Depending on the state, set our vars. We get in this situation when the
1441      * state change function got a change to update the state vars before the
1442      * streaming thread did. This is fine but we need to make sure that we
1443      * update the need_preroll var since it was TRUE when we got here and might
1444      * become FALSE if we got to PLAYING. */
1445     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1446         gst_element_state_get_name (current));
1447     switch (current) {
1448       case GST_STATE_PLAYING:
1449         basesink->need_preroll = FALSE;
1450         break;
1451       case GST_STATE_PAUSED:
1452         basesink->need_preroll = TRUE;
1453         break;
1454       default:
1455         basesink->need_preroll = FALSE;
1456         basesink->flushing = TRUE;
1457         break;
1458     }
1459     /* we can report latency queries now */
1460     basesink->priv->have_latency = TRUE;
1461     GST_OBJECT_UNLOCK (basesink);
1462     return TRUE;
1463   }
1464 stopping:
1465   {
1466     /* app is going to READY */
1467     GST_DEBUG_OBJECT (basesink, "stopping");
1468     basesink->need_preroll = FALSE;
1469     basesink->flushing = TRUE;
1470     GST_OBJECT_UNLOCK (basesink);
1471     return FALSE;
1472   }
1473 async_failed:
1474   {
1475     GST_DEBUG_OBJECT (basesink, "async commit failed");
1476     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1477     GST_OBJECT_UNLOCK (basesink);
1478     return FALSE;
1479   }
1480 }
1481
1482 static void
1483 start_stepping (GstBaseSink * sink, GstSegment * segment,
1484     GstStepInfo * pending, GstStepInfo * current)
1485 {
1486   gint64 end;
1487
1488   GST_DEBUG_OBJECT (sink, "update pending step");
1489
1490   GST_OBJECT_LOCK (sink);
1491   memcpy (current, pending, sizeof (GstStepInfo));
1492   pending->valid = FALSE;
1493   GST_OBJECT_UNLOCK (sink);
1494
1495   /* get the running time of where we paused and remember it */
1496   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1497   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1498
1499   /* set the new rate for the remainder of the segment */
1500   current->start_rate = segment->rate;
1501   segment->rate *= current->rate;
1502   segment->abs_rate = ABS (segment->rate);
1503
1504   if (segment->format == GST_FORMAT_TIME) {
1505     current->start_stop = segment->stop;
1506     end = current->start + current->amount;
1507     segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1508   }
1509
1510   GST_DEBUG_OBJECT (sink, "segment now %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1511       GST_TIME_ARGS (segment->start), GST_TIME_ARGS (segment->stop));
1512
1513   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1514       GST_TIME_ARGS (current->start));
1515
1516   if (current->amount == -1) {
1517     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1518     current->valid = FALSE;
1519   } else {
1520     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1521         "rate: %f", current->amount, gst_format_get_name (current->format),
1522         current->rate);
1523   }
1524 }
1525
1526 static void
1527 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1528     GstStepInfo * current, gint64 rstart, gint64 rstop)
1529 {
1530   gint64 stop, position;
1531   GstMessage *message;
1532
1533   GST_DEBUG_OBJECT (sink, "step complete");
1534
1535   if (segment->rate > 0.0)
1536     stop = rstart;
1537   else
1538     stop = rstop;
1539
1540   GST_DEBUG_OBJECT (sink,
1541       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1542
1543   if (stop == -1)
1544     current->duration = current->position;
1545   else
1546     current->duration = stop - current->start;
1547
1548   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1549       GST_TIME_ARGS (current->duration));
1550
1551   position = current->start + current->duration;
1552   gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1553
1554   /* now move the segment to the new running time */
1555   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1556
1557   if (current->flush) {
1558     /* and remove the accumulated time we flushed */
1559     segment->accum = current->start;
1560   }
1561
1562   /* restore the previous rate */
1563   segment->rate = current->start_rate;
1564   segment->abs_rate = ABS (segment->rate);
1565   segment->stop = current->start_stop;
1566
1567   /* the clip segment is used for position report in paused... */
1568   memcpy (sink->abidata.ABI.clip_segment, segment, sizeof (GstSegment));
1569
1570   /* post the step done when we know the stepped duration in TIME */
1571   message =
1572       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1573       current->amount, current->rate, current->flush, current->intermediate,
1574       current->duration);
1575   gst_message_set_seqnum (message, current->seqnum);
1576   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1577
1578   if (!current->intermediate)
1579     sink->need_preroll = current->need_preroll;
1580
1581   /* and the current step info finished and becomes invalid */
1582   current->valid = FALSE;
1583 }
1584
1585 static gboolean
1586 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1587     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1588     gint64 * rstop)
1589 {
1590   GstBaseSinkPrivate *priv;
1591   gboolean step_end = FALSE;
1592
1593   priv = sink->priv;
1594
1595   /* see if we need to skip this buffer because of stepping */
1596   switch (current->format) {
1597     case GST_FORMAT_TIME:
1598     {
1599       guint64 end;
1600       gint64 first, last;
1601
1602       if (segment->rate > 0.0) {
1603         first = *rstart;
1604         last = *rstop;
1605       } else {
1606         first = *rstop;
1607         last = *rstart;
1608       }
1609
1610       end = current->start + current->amount;
1611       current->position = first - current->start;
1612
1613       if (G_UNLIKELY (segment->abs_rate != 1.0))
1614         current->position /= segment->abs_rate;
1615
1616       GST_DEBUG_OBJECT (sink,
1617           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1618           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1619       GST_DEBUG_OBJECT (sink,
1620           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1621           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1622           GST_TIME_ARGS (last - current->start),
1623           GST_TIME_ARGS (current->amount));
1624
1625       if ((current->flush && current->position >= current->amount)
1626           || last >= end) {
1627         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1628         step_end = TRUE;
1629         if (segment->rate > 0.0) {
1630           *rstart = end;
1631           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1632         } else {
1633           *rstop = end;
1634           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1635         }
1636       }
1637       GST_DEBUG_OBJECT (sink,
1638           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1639           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1640       GST_DEBUG_OBJECT (sink,
1641           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1642           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1643       break;
1644     }
1645     case GST_FORMAT_BUFFERS:
1646       GST_DEBUG_OBJECT (sink,
1647           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1648           current->position, current->amount);
1649
1650       if (current->position < current->amount) {
1651         current->position++;
1652       } else {
1653         step_end = TRUE;
1654       }
1655       break;
1656     case GST_FORMAT_DEFAULT:
1657     default:
1658       GST_DEBUG_OBJECT (sink,
1659           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1660           current->position, current->amount);
1661       break;
1662   }
1663   return step_end;
1664 }
1665
1666 /* with STREAM_LOCK, PREROLL_LOCK
1667  *
1668  * Returns TRUE if the object needs synchronisation and takes therefore
1669  * part in prerolling.
1670  *
1671  * rsstart/rsstop contain the start/stop in stream time.
1672  * rrstart/rrstop contain the start/stop in running time.
1673  */
1674 static gboolean
1675 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1676     GstClockTime * rsstart, GstClockTime * rsstop,
1677     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1678     gboolean * stepped, GstSegment * segment, GstStepInfo * step,
1679     gboolean * step_end)
1680 {
1681   GstBaseSinkClass *bclass;
1682   GstBuffer *buffer;
1683   GstClockTime start, stop;     /* raw start/stop timestamps */
1684   gint64 cstart, cstop;         /* clipped raw timestamps */
1685   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1686   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1687   GstFormat format;
1688   GstBaseSinkPrivate *priv;
1689
1690   priv = basesink->priv;
1691
1692   /* start with nothing */
1693   start = stop = -1;
1694
1695   if (G_UNLIKELY (GST_IS_EVENT (obj))) {
1696     GstEvent *event = GST_EVENT_CAST (obj);
1697
1698     switch (GST_EVENT_TYPE (event)) {
1699         /* EOS event needs syncing */
1700       case GST_EVENT_EOS:
1701       {
1702         if (basesink->segment.rate >= 0.0) {
1703           sstart = sstop = priv->current_sstop;
1704           if (sstart == -1) {
1705             /* we have not seen a buffer yet, use the segment values */
1706             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1707                 basesink->segment.format, basesink->segment.stop);
1708           }
1709         } else {
1710           sstart = sstop = priv->current_sstart;
1711           if (sstart == -1) {
1712             /* we have not seen a buffer yet, use the segment values */
1713             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1714                 basesink->segment.format, basesink->segment.start);
1715           }
1716         }
1717
1718         rstart = rstop = priv->eos_rtime;
1719         *do_sync = rstart != -1;
1720         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1721             GST_TIME_ARGS (rstart));
1722         /* if we are stepping, we end now */
1723         *step_end = step->valid;
1724         goto eos_done;
1725       }
1726       default:
1727         /* other events do not need syncing */
1728         /* FIXME, maybe NEWSEGMENT might need synchronisation
1729          * since the POSITION query depends on accumulated times and
1730          * we cannot accumulate the current segment before the previous
1731          * one completed.
1732          */
1733         return FALSE;
1734     }
1735   }
1736
1737   /* else do buffer sync code */
1738   buffer = GST_BUFFER_CAST (obj);
1739
1740   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1741
1742   /* just get the times to see if we need syncing, if the start returns -1 we
1743    * don't sync. */
1744   if (bclass->get_times)
1745     bclass->get_times (basesink, buffer, &start, &stop);
1746
1747   if (start == -1) {
1748     /* we don't need to sync but we still want to get the timestamps for
1749      * tracking the position */
1750     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1751     *do_sync = FALSE;
1752   } else {
1753     *do_sync = TRUE;
1754   }
1755
1756   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1757       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1758       GST_TIME_ARGS (stop), *do_sync);
1759
1760   /* collect segment and format for code clarity */
1761   format = segment->format;
1762
1763   /* no timestamp clipping if we did not get a TIME segment format */
1764   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
1765     cstart = start;
1766     cstop = stop;
1767     /* do running and stream time in TIME format */
1768     format = GST_FORMAT_TIME;
1769     GST_LOG_OBJECT (basesink, "not time format, don't clip");
1770     goto do_times;
1771   }
1772
1773   /* clip, only when we know about time */
1774   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
1775               (gint64) start, (gint64) stop, &cstart, &cstop)))
1776     goto out_of_segment;
1777
1778   if (G_UNLIKELY (start != cstart || stop != cstop)) {
1779     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
1780         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
1781         GST_TIME_ARGS (cstop));
1782   }
1783
1784   /* set last stop position */
1785   if (G_LIKELY (cstop != GST_CLOCK_TIME_NONE))
1786     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
1787   else
1788     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
1789
1790 do_times:
1791   rstart = gst_segment_to_running_time (segment, format, cstart);
1792   rstop = gst_segment_to_running_time (segment, format, cstop);
1793
1794   if (G_UNLIKELY (step->valid)) {
1795     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
1796                 &rstart, &rstop))) {
1797       /* step is still busy, we discard data when we are flushing */
1798       *stepped = step->flush;
1799     }
1800   }
1801   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
1802    * upstream is behaving very badly */
1803   sstart = gst_segment_to_stream_time (segment, format, cstart);
1804   sstop = gst_segment_to_stream_time (segment, format, cstop);
1805
1806 eos_done:
1807   /* done label only called when doing EOS, we also stop stepping then */
1808   if (*step_end && step->flush) {
1809     stop_stepping (basesink, segment, step, rstart, rstop);
1810     *step_end = FALSE;
1811   }
1812
1813   /* save times */
1814   *rsstart = sstart;
1815   *rsstop = sstop;
1816   *rrstart = rstart;
1817   *rrstop = rstop;
1818
1819   /* buffers and EOS always need syncing and preroll */
1820   return TRUE;
1821
1822   /* special cases */
1823 out_of_segment:
1824   {
1825     /* we usually clip in the chain function already but stepping could cause
1826      * the segment to be updated later. we return FALSE so that we don't try
1827      * to sync on it. */
1828     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
1829     return FALSE;
1830   }
1831 }
1832
1833 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
1834  * adjust a timestamp with the latency and timestamp offset */
1835 static GstClockTime
1836 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
1837 {
1838   GstClockTimeDiff ts_offset;
1839
1840   /* don't do anything funny with invalid timestamps */
1841   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1842     return time;
1843
1844   time += basesink->priv->latency;
1845
1846   /* apply offset, be carefull for underflows */
1847   ts_offset = basesink->priv->ts_offset;
1848   if (ts_offset < 0) {
1849     ts_offset = -ts_offset;
1850     if (ts_offset < time)
1851       time -= ts_offset;
1852     else
1853       time = 0;
1854   } else
1855     time += ts_offset;
1856
1857   return time;
1858 }
1859
1860 /**
1861  * gst_base_sink_wait_clock:
1862  * @sink: the sink
1863  * @time: the running_time to be reached
1864  * @jitter: the jitter to be filled with time diff (can be NULL)
1865  *
1866  * This function will block until @time is reached. It is usually called by
1867  * subclasses that use their own internal synchronisation.
1868  *
1869  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
1870  * returned. Likewise, if synchronisation is disabled in the element or there
1871  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
1872  *
1873  * This function should only be called with the PREROLL_LOCK held, like when
1874  * receiving an EOS event in the ::event vmethod or when receiving a buffer in
1875  * the ::render vmethod.
1876  *
1877  * The @time argument should be the running_time of when this method should
1878  * return and is not adjusted with any latency or offset configured in the
1879  * sink.
1880  *
1881  * Since 0.10.20
1882  *
1883  * Returns: #GstClockReturn
1884  */
1885 GstClockReturn
1886 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
1887     GstClockTimeDiff * jitter)
1888 {
1889   GstClockID id;
1890   GstClockReturn ret;
1891   GstClock *clock;
1892
1893   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1894     goto invalid_time;
1895
1896   GST_OBJECT_LOCK (sink);
1897   if (G_UNLIKELY (!sink->sync))
1898     goto no_sync;
1899
1900   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
1901     goto no_clock;
1902
1903   /* add base_time to running_time to get the time against the clock */
1904   time += GST_ELEMENT_CAST (sink)->base_time;
1905
1906   id = gst_clock_new_single_shot_id (clock, time);
1907   GST_OBJECT_UNLOCK (sink);
1908
1909   /* A blocking wait is performed on the clock. We save the ClockID
1910    * so we can unlock the entry at any time. While we are blocking, we 
1911    * release the PREROLL_LOCK so that other threads can interrupt the
1912    * entry. */
1913   sink->clock_id = id;
1914   /* release the preroll lock while waiting */
1915   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1916
1917   ret = gst_clock_id_wait (id, jitter);
1918
1919   GST_PAD_PREROLL_LOCK (sink->sinkpad);
1920   gst_clock_id_unref (id);
1921   sink->clock_id = NULL;
1922
1923   return ret;
1924
1925   /* no syncing needed */
1926 invalid_time:
1927   {
1928     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
1929     return GST_CLOCK_BADTIME;
1930   }
1931 no_sync:
1932   {
1933     GST_DEBUG_OBJECT (sink, "sync disabled");
1934     GST_OBJECT_UNLOCK (sink);
1935     return GST_CLOCK_BADTIME;
1936   }
1937 no_clock:
1938   {
1939     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
1940     GST_OBJECT_UNLOCK (sink);
1941     return GST_CLOCK_BADTIME;
1942   }
1943 }
1944
1945 /**
1946  * gst_base_sink_wait_preroll:
1947  * @sink: the sink
1948  *
1949  * If the #GstBaseSinkClass::render method performs its own synchronisation against
1950  * the clock it must unblock when going from PLAYING to the PAUSED state and call
1951  * this method before continuing to render the remaining data.
1952  *
1953  * This function will block until a state change to PLAYING happens (in which
1954  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
1955  * to a state change to READY or a FLUSH event (in which case this function
1956  * returns #GST_FLOW_WRONG_STATE).
1957  *
1958  * This function should only be called with the PREROLL_LOCK held, like in the
1959  * render function.
1960  *
1961  * Since: 0.10.11
1962  *
1963  * Returns: #GST_FLOW_OK if the preroll completed and processing can
1964  * continue. Any other return value should be returned from the render vmethod.
1965  */
1966 GstFlowReturn
1967 gst_base_sink_wait_preroll (GstBaseSink * sink)
1968 {
1969   sink->have_preroll = TRUE;
1970   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
1971   /* block until the state changes, or we get a flush, or something */
1972   GST_PAD_PREROLL_WAIT (sink->sinkpad);
1973   sink->have_preroll = FALSE;
1974   if (G_UNLIKELY (sink->flushing))
1975     goto stopping;
1976   if (G_UNLIKELY (sink->priv->step_unlock))
1977     goto step_unlocked;
1978   GST_DEBUG_OBJECT (sink, "continue after preroll");
1979
1980   return GST_FLOW_OK;
1981
1982   /* ERRORS */
1983 stopping:
1984   {
1985     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
1986     return GST_FLOW_WRONG_STATE;
1987   }
1988 step_unlocked:
1989   {
1990     sink->priv->step_unlock = FALSE;
1991     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
1992     return GST_FLOW_STEP;
1993   }
1994 }
1995
1996 /**
1997  * gst_base_sink_do_preroll:
1998  * @sink: the sink
1999  * @obj: the object that caused the preroll
2000  *
2001  * If the @sink spawns its own thread for pulling buffers from upstream it
2002  * should call this method after it has pulled a buffer. If the element needed
2003  * to preroll, this function will perform the preroll and will then block
2004  * until the element state is changed.
2005  *
2006  * This function should be called with the PREROLL_LOCK held.
2007  *
2008  * Since 0.10.22
2009  *
2010  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2011  * continue. Any other return value should be returned from the render vmethod.
2012  */
2013 GstFlowReturn
2014 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2015 {
2016   GstFlowReturn ret;
2017
2018   while (G_UNLIKELY (sink->need_preroll)) {
2019     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2020
2021     ret = gst_base_sink_preroll_object (sink, obj);
2022     if (ret != GST_FLOW_OK)
2023       goto preroll_failed;
2024
2025     /* need to recheck here because the commit state could have
2026      * made us not need the preroll anymore */
2027     if (G_LIKELY (sink->need_preroll)) {
2028       /* block until the state changes, or we get a flush, or something */
2029       ret = gst_base_sink_wait_preroll (sink);
2030       if (ret != GST_FLOW_OK) {
2031         if (ret == GST_FLOW_STEP)
2032           ret = GST_FLOW_OK;
2033         else
2034           goto preroll_failed;
2035       }
2036     }
2037   }
2038   return GST_FLOW_OK;
2039
2040   /* ERRORS */
2041 preroll_failed:
2042   {
2043     GST_DEBUG_OBJECT (sink, "preroll failed %d", ret);
2044     return ret;
2045   }
2046 }
2047
2048 /**
2049  * gst_base_sink_wait_eos:
2050  * @sink: the sink
2051  * @time: the running_time to be reached
2052  * @jitter: the jitter to be filled with time diff (can be NULL)
2053  *
2054  * This function will block until @time is reached. It is usually called by
2055  * subclasses that use their own internal synchronisation but want to let the
2056  * EOS be handled by the base class.
2057  *
2058  * This function should only be called with the PREROLL_LOCK held, like when
2059  * receiving an EOS event in the ::event vmethod.
2060  *
2061  * The @time argument should be the running_time of when the EOS should happen
2062  * and will be adjusted with any latency and offset configured in the sink.
2063  *
2064  * Since 0.10.15
2065  *
2066  * Returns: #GstFlowReturn
2067  */
2068 GstFlowReturn
2069 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2070     GstClockTimeDiff * jitter)
2071 {
2072   GstClockReturn status;
2073   GstFlowReturn ret;
2074
2075   do {
2076     GstClockTime stime;
2077
2078     GST_DEBUG_OBJECT (sink, "checking preroll");
2079
2080     /* first wait for the playing state before we can continue */
2081     if (G_UNLIKELY (sink->need_preroll)) {
2082       ret = gst_base_sink_wait_preroll (sink);
2083       if (ret != GST_FLOW_OK) {
2084         if (ret == GST_FLOW_STEP)
2085           ret = GST_FLOW_OK;
2086         else
2087           goto flushing;
2088       }
2089     }
2090
2091     /* preroll done, we can sync since we are in PLAYING now. */
2092     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2093         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2094
2095     /* compensate for latency and ts_offset. We don't adjust for render delay
2096      * because we don't interact with the device on EOS normally. */
2097     stime = gst_base_sink_adjust_time (sink, time);
2098
2099     /* wait for the clock, this can be interrupted because we got shut down or 
2100      * we PAUSED. */
2101     status = gst_base_sink_wait_clock (sink, stime, jitter);
2102
2103     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2104
2105     /* invalid time, no clock or sync disabled, just continue then */
2106     if (status == GST_CLOCK_BADTIME)
2107       break;
2108
2109     /* waiting could have been interrupted and we can be flushing now */
2110     if (G_UNLIKELY (sink->flushing))
2111       goto flushing;
2112
2113     /* retry if we got unscheduled, which means we did not reach the timeout
2114      * yet. if some other error occures, we continue. */
2115   } while (status == GST_CLOCK_UNSCHEDULED);
2116
2117   GST_DEBUG_OBJECT (sink, "end of stream");
2118
2119   return GST_FLOW_OK;
2120
2121   /* ERRORS */
2122 flushing:
2123   {
2124     GST_DEBUG_OBJECT (sink, "we are flushing");
2125     return GST_FLOW_WRONG_STATE;
2126   }
2127 }
2128
2129 /* with STREAM_LOCK, PREROLL_LOCK
2130  *
2131  * Make sure we are in PLAYING and synchronize an object to the clock.
2132  *
2133  * If we need preroll, we are not in PLAYING. We try to commit the state
2134  * if needed and then block if we still are not PLAYING.
2135  *
2136  * We start waiting on the clock in PLAYING. If we got interrupted, we
2137  * immediatly try to re-preroll.
2138  *
2139  * Some objects do not need synchronisation (most events) and so this function
2140  * immediatly returns GST_FLOW_OK.
2141  *
2142  * for objects that arrive later than max-lateness to be synchronized to the 
2143  * clock have the @late boolean set to TRUE.
2144  *
2145  * This function keeps a running average of the jitter (the diff between the
2146  * clock time and the requested sync time). The jitter is negative for
2147  * objects that arrive in time and positive for late buffers.
2148  *
2149  * does not take ownership of obj.
2150  */
2151 static GstFlowReturn
2152 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2153     GstMiniObject * obj, gboolean * late, gboolean * step_end)
2154 {
2155   GstClockTimeDiff jitter;
2156   gboolean syncable;
2157   GstClockReturn status = GST_CLOCK_OK;
2158   GstClockTime rstart, rstop, sstart, sstop, stime;
2159   gboolean do_sync;
2160   GstBaseSinkPrivate *priv;
2161   GstFlowReturn ret;
2162   GstStepInfo *current, *pending;
2163   gboolean stepped;
2164
2165   priv = basesink->priv;
2166
2167 do_step:
2168   sstart = sstop = rstart = rstop = -1;
2169   do_sync = TRUE;
2170   stepped = FALSE;
2171
2172   priv->current_rstart = -1;
2173
2174   /* get stepping info */
2175   current = &priv->current_step;
2176   pending = &priv->pending_step;
2177
2178   /* get timing information for this object against the render segment */
2179   syncable = gst_base_sink_get_sync_times (basesink, obj,
2180       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2181       current, step_end);
2182
2183   if (G_UNLIKELY (stepped))
2184     goto step_skipped;
2185
2186   /* a syncable object needs to participate in preroll and
2187    * clocking. All buffers and EOS are syncable. */
2188   if (G_UNLIKELY (!syncable))
2189     goto not_syncable;
2190
2191   /* store timing info for current object */
2192   priv->current_rstart = rstart;
2193   priv->current_rstop = (rstop != -1 ? rstop : rstart);
2194
2195   /* save sync time for eos when the previous object needed sync */
2196   priv->eos_rtime = (do_sync ? priv->current_rstop : -1);
2197
2198 again:
2199   /* first do preroll, this makes sure we commit our state
2200    * to PAUSED and can continue to PLAYING. We cannot perform
2201    * any clock sync in PAUSED because there is no clock. */
2202   ret = gst_base_sink_do_preroll (basesink, obj);
2203   if (G_UNLIKELY (ret != GST_FLOW_OK))
2204     goto preroll_failed;
2205
2206   /* After rendering we store the position of the last buffer so that we can use
2207    * it to report the position. We need to take the lock here. */
2208   GST_OBJECT_LOCK (basesink);
2209   priv->current_sstart = sstart;
2210   priv->current_sstop = (sstop != -1 ? sstop : sstart);
2211   GST_OBJECT_UNLOCK (basesink);
2212
2213   /* update the segment with a pending step if the current one is invalid and we
2214    * have a new pending one. We only accept new step updates after a preroll */
2215   if (G_UNLIKELY (pending->valid && !current->valid)) {
2216     start_stepping (basesink, &basesink->segment, pending, current);
2217     goto do_step;
2218   }
2219
2220   if (!do_sync)
2221     goto done;
2222
2223   /* adjust for latency */
2224   stime = gst_base_sink_adjust_time (basesink, rstart);
2225
2226   /* adjust for render-delay, avoid underflows */
2227   if (stime != -1) {
2228     if (stime > priv->render_delay)
2229       stime -= priv->render_delay;
2230     else
2231       stime = 0;
2232   }
2233
2234   /* preroll done, we can sync since we are in PLAYING now. */
2235   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2236       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2237       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2238
2239   /* This function will return immediatly if start == -1, no clock
2240    * or sync is disabled with GST_CLOCK_BADTIME. */
2241   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2242
2243   GST_DEBUG_OBJECT (basesink, "clock returned %d", status);
2244
2245   /* invalid time, no clock or sync disabled, just render */
2246   if (status == GST_CLOCK_BADTIME)
2247     goto done;
2248
2249   /* waiting could have been interrupted and we can be flushing now */
2250   if (G_UNLIKELY (basesink->flushing))
2251     goto flushing;
2252
2253   /* check for unlocked by a state change, we are not flushing so
2254    * we can try to preroll on the current buffer. */
2255   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2256     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2257     priv->call_preroll = TRUE;
2258     goto again;
2259   }
2260
2261   /* successful syncing done, record observation */
2262   priv->current_jitter = jitter;
2263
2264   /* check if the object should be dropped */
2265   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2266       status, jitter);
2267
2268 done:
2269   return GST_FLOW_OK;
2270
2271   /* ERRORS */
2272 step_skipped:
2273   {
2274     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2275     *late = TRUE;
2276     return GST_FLOW_OK;
2277   }
2278 not_syncable:
2279   {
2280     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2281     return GST_FLOW_OK;
2282   }
2283 flushing:
2284   {
2285     GST_DEBUG_OBJECT (basesink, "we are flushing");
2286     return GST_FLOW_WRONG_STATE;
2287   }
2288 preroll_failed:
2289   {
2290     GST_DEBUG_OBJECT (basesink, "preroll failed");
2291     return ret;
2292   }
2293 }
2294
2295 static gboolean
2296 gst_base_sink_send_qos (GstBaseSink * basesink,
2297     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2298 {
2299   GstEvent *event;
2300   gboolean res;
2301
2302   /* generate Quality-of-Service event */
2303   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2304       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2305       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (time));
2306
2307   event = gst_event_new_qos (proportion, diff, time);
2308
2309   /* send upstream */
2310   res = gst_pad_push_event (basesink->sinkpad, event);
2311
2312   return res;
2313 }
2314
2315 static void
2316 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2317 {
2318   GstBaseSinkPrivate *priv;
2319   GstClockTime start, stop;
2320   GstClockTimeDiff jitter;
2321   GstClockTime pt, entered, left;
2322   GstClockTime duration;
2323   gdouble rate;
2324
2325   priv = sink->priv;
2326
2327   start = priv->current_rstart;
2328
2329   /* if Quality-of-Service disabled, do nothing */
2330   if (!g_atomic_int_get (&priv->qos_enabled) || start == -1)
2331     return;
2332
2333   stop = priv->current_rstop;
2334   jitter = priv->current_jitter;
2335
2336   if (jitter < 0) {
2337     /* this is the time the buffer entered the sink */
2338     if (start < -jitter)
2339       entered = 0;
2340     else
2341       entered = start + jitter;
2342     left = start;
2343   } else {
2344     /* this is the time the buffer entered the sink */
2345     entered = start + jitter;
2346     /* this is the time the buffer left the sink */
2347     left = start + jitter;
2348   }
2349
2350   /* calculate duration of the buffer */
2351   if (stop != -1)
2352     duration = stop - start;
2353   else
2354     duration = -1;
2355
2356   /* if we have the time when the last buffer left us, calculate
2357    * processing time */
2358   if (priv->last_left != -1) {
2359     if (entered > priv->last_left) {
2360       pt = entered - priv->last_left;
2361     } else {
2362       pt = 0;
2363     }
2364   } else {
2365     pt = priv->avg_pt;
2366   }
2367
2368   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2369       ", entered %" GST_TIME_FORMAT ", left %" GST_TIME_FORMAT ", pt: %"
2370       GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT ",jitter %"
2371       G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (entered),
2372       GST_TIME_ARGS (left), GST_TIME_ARGS (pt), GST_TIME_ARGS (duration),
2373       jitter);
2374
2375   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2376       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2377       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2378       priv->avg_rate);
2379
2380   /* collect running averages. for first observations, we copy the
2381    * values */
2382   if (priv->avg_duration == -1)
2383     priv->avg_duration = duration;
2384   else
2385     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2386
2387   if (priv->avg_pt == -1)
2388     priv->avg_pt = pt;
2389   else
2390     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2391
2392   if (priv->avg_duration != 0)
2393     rate =
2394         gst_guint64_to_gdouble (priv->avg_pt) /
2395         gst_guint64_to_gdouble (priv->avg_duration);
2396   else
2397     rate = 0.0;
2398
2399   if (priv->last_left != -1) {
2400     if (dropped || priv->avg_rate < 0.0) {
2401       priv->avg_rate = rate;
2402     } else {
2403       if (rate > 1.0)
2404         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2405       else
2406         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2407     }
2408   }
2409
2410   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2411       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2412       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2413       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2414
2415
2416   if (priv->avg_rate >= 0.0) {
2417     /* if we have a valid rate, start sending QoS messages */
2418     if (priv->current_jitter < 0) {
2419       /* make sure we never go below 0 when adding the jitter to the
2420        * timestamp. */
2421       if (priv->current_rstart < -priv->current_jitter)
2422         priv->current_jitter = -priv->current_rstart;
2423     }
2424     gst_base_sink_send_qos (sink, priv->avg_rate, priv->current_rstart,
2425         priv->current_jitter);
2426   }
2427
2428   /* record when this buffer will leave us */
2429   priv->last_left = left;
2430 }
2431
2432 /* reset all qos measuring */
2433 static void
2434 gst_base_sink_reset_qos (GstBaseSink * sink)
2435 {
2436   GstBaseSinkPrivate *priv;
2437
2438   priv = sink->priv;
2439
2440   priv->last_in_time = -1;
2441   priv->last_left = -1;
2442   priv->avg_duration = -1;
2443   priv->avg_pt = -1;
2444   priv->avg_rate = -1.0;
2445   priv->avg_render = -1;
2446   priv->rendered = 0;
2447   priv->dropped = 0;
2448
2449 }
2450
2451 /* Checks if the object was scheduled too late.
2452  *
2453  * start/stop contain the raw timestamp start and stop values
2454  * of the object.
2455  *
2456  * status and jitter contain the return values from the clock wait.
2457  *
2458  * returns TRUE if the buffer was too late.
2459  */
2460 static gboolean
2461 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2462     GstClockTime start, GstClockTime stop,
2463     GstClockReturn status, GstClockTimeDiff jitter)
2464 {
2465   gboolean late;
2466   gint64 max_lateness;
2467   GstBaseSinkPrivate *priv;
2468
2469   priv = basesink->priv;
2470
2471   late = FALSE;
2472
2473   /* only for objects that were too late */
2474   if (G_LIKELY (status != GST_CLOCK_EARLY))
2475     goto in_time;
2476
2477   max_lateness = basesink->abidata.ABI.max_lateness;
2478
2479   /* check if frame dropping is enabled */
2480   if (max_lateness == -1)
2481     goto no_drop;
2482
2483   /* only check for buffers */
2484   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2485     goto not_buffer;
2486
2487   /* can't do check if we don't have a timestamp */
2488   if (G_UNLIKELY (start == -1))
2489     goto no_timestamp;
2490
2491   /* we can add a valid stop time */
2492   if (stop != -1)
2493     max_lateness += stop;
2494   else
2495     max_lateness += start;
2496
2497   /* if the jitter bigger than duration and lateness we are too late */
2498   if ((late = start + jitter > max_lateness)) {
2499     GST_DEBUG_OBJECT (basesink, "buffer is too late %" GST_TIME_FORMAT
2500         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (start + jitter),
2501         GST_TIME_ARGS (max_lateness));
2502     /* !!emergency!!, if we did not receive anything valid for more than a 
2503      * second, render it anyway so the user sees something */
2504     if (priv->last_in_time != -1 && start - priv->last_in_time > GST_SECOND) {
2505       late = FALSE;
2506       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2507           (_("A lot of buffers are being dropped.")),
2508           ("There may be a timestamping problem, or this computer is too slow."));
2509       GST_DEBUG_OBJECT (basesink,
2510           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2511           GST_TIME_ARGS (priv->last_in_time));
2512     }
2513   }
2514
2515 done:
2516   if (!late) {
2517     priv->last_in_time = start;
2518   }
2519   return late;
2520
2521   /* all is fine */
2522 in_time:
2523   {
2524     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2525     goto done;
2526   }
2527 no_drop:
2528   {
2529     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2530     goto done;
2531   }
2532 not_buffer:
2533   {
2534     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2535     return FALSE;
2536   }
2537 no_timestamp:
2538   {
2539     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2540     return FALSE;
2541   }
2542 }
2543
2544 /* called before and after calling the render vmethod. It keeps track of how
2545  * much time was spent in the render method and is used to check if we are
2546  * flooded */
2547 static void
2548 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2549 {
2550   GstBaseSinkPrivate *priv;
2551
2552   priv = basesink->priv;
2553
2554   if (start) {
2555     priv->start = gst_util_get_timestamp ();
2556   } else {
2557     GstClockTime elapsed;
2558
2559     priv->stop = gst_util_get_timestamp ();
2560
2561     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2562
2563     if (priv->avg_render == -1)
2564       priv->avg_render = elapsed;
2565     else
2566       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2567
2568     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2569         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2570   }
2571 }
2572
2573 /* with STREAM_LOCK, PREROLL_LOCK,
2574  *
2575  * Synchronize the object on the clock and then render it.
2576  *
2577  * takes ownership of obj.
2578  */
2579 static GstFlowReturn
2580 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2581     GstMiniObject * obj)
2582 {
2583   GstFlowReturn ret;
2584   GstBaseSinkClass *bclass;
2585   gboolean late, step_end;
2586
2587   GstBaseSinkPrivate *priv;
2588
2589   priv = basesink->priv;
2590
2591 again:
2592   late = FALSE;
2593   step_end = FALSE;
2594   ret = GST_FLOW_OK;
2595
2596   /* synchronize this object, non syncable objects return OK
2597    * immediatly. */
2598   ret = gst_base_sink_do_sync (basesink, pad, obj, &late, &step_end);
2599   if (G_UNLIKELY (ret != GST_FLOW_OK))
2600     goto sync_failed;
2601
2602   /* and now render, event or buffer. */
2603   if (G_LIKELY (GST_IS_BUFFER (obj))) {
2604     GstBuffer *buf;
2605
2606     /* drop late buffers unconditionally, let's hope it's unlikely */
2607     if (G_UNLIKELY (late && !step_end))
2608       goto dropped;
2609
2610     buf = GST_BUFFER_CAST (obj);
2611
2612     gst_base_sink_set_last_buffer (basesink, buf);
2613
2614     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2615
2616     if (G_LIKELY (bclass->render)) {
2617       gint do_qos;
2618
2619       /* read once, to get same value before and after */
2620       do_qos = g_atomic_int_get (&priv->qos_enabled);
2621
2622       GST_DEBUG_OBJECT (basesink, "rendering buffer %p", obj);
2623
2624       /* record rendering time for QoS and stats */
2625       if (do_qos)
2626         gst_base_sink_do_render_stats (basesink, TRUE);
2627
2628       ret = bclass->render (basesink, buf);
2629
2630       if (do_qos)
2631         gst_base_sink_do_render_stats (basesink, FALSE);
2632
2633       if (ret == GST_FLOW_STEP)
2634         goto again;
2635
2636       if (step_end) {
2637         stop_stepping (basesink, &basesink->segment, &priv->current_step,
2638             priv->current_rstart, priv->current_rstop);
2639         goto again;
2640       }
2641
2642       priv->rendered++;
2643     }
2644   } else {
2645     GstEvent *event = GST_EVENT_CAST (obj);
2646     gboolean event_res = TRUE;
2647     GstEventType type;
2648
2649     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2650
2651     type = GST_EVENT_TYPE (event);
2652
2653     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
2654         gst_event_type_get_name (type));
2655
2656     if (bclass->event)
2657       event_res = bclass->event (basesink, event);
2658
2659     /* when we get here we could be flushing again when the event handler calls
2660      * _wait_eos(). We have to ignore this object in that case. */
2661     if (G_UNLIKELY (basesink->flushing))
2662       goto flushing;
2663
2664     if (G_LIKELY (event_res)) {
2665       guint32 seqnum;
2666
2667       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
2668       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
2669
2670       switch (type) {
2671         case GST_EVENT_EOS:
2672         {
2673           GstMessage *message;
2674
2675           /* the EOS event is completely handled so we mark
2676            * ourselves as being in the EOS state. eos is also 
2677            * protected by the object lock so we can read it when 
2678            * answering the POSITION query. */
2679           GST_OBJECT_LOCK (basesink);
2680           basesink->eos = TRUE;
2681           GST_OBJECT_UNLOCK (basesink);
2682
2683           /* ok, now we can post the message */
2684           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
2685
2686           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
2687           gst_message_set_seqnum (message, seqnum);
2688           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
2689           break;
2690         }
2691         case GST_EVENT_NEWSEGMENT:
2692           /* configure the segment */
2693           gst_base_sink_configure_segment (basesink, pad, event,
2694               &basesink->segment);
2695           break;
2696         default:
2697           break;
2698       }
2699     }
2700   }
2701
2702 done:
2703   gst_base_sink_perform_qos (basesink, late);
2704
2705   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
2706   gst_mini_object_unref (obj);
2707
2708   return ret;
2709
2710   /* ERRORS */
2711 sync_failed:
2712   {
2713     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
2714     goto done;
2715   }
2716 dropped:
2717   {
2718     priv->dropped++;
2719     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
2720     goto done;
2721   }
2722 flushing:
2723   {
2724     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
2725     gst_mini_object_unref (obj);
2726     return GST_FLOW_WRONG_STATE;
2727   }
2728 }
2729
2730 /* with STREAM_LOCK, PREROLL_LOCK
2731  *
2732  * Perform preroll on the given object. For buffers this means 
2733  * calling the preroll subclass method. 
2734  * If that succeeds, the state will be commited.
2735  *
2736  * function does not take ownership of obj.
2737  */
2738 static GstFlowReturn
2739 gst_base_sink_preroll_object (GstBaseSink * basesink, GstMiniObject * obj)
2740 {
2741   GstFlowReturn ret;
2742
2743   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
2744
2745   /* if it's a buffer, we need to call the preroll method */
2746   if (G_LIKELY (GST_IS_BUFFER (obj)) && basesink->priv->call_preroll) {
2747     GstBaseSinkClass *bclass;
2748     GstBuffer *buf;
2749     GstClockTime timestamp;
2750
2751     buf = GST_BUFFER_CAST (obj);
2752     timestamp = GST_BUFFER_TIMESTAMP (buf);
2753
2754     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
2755         GST_TIME_ARGS (timestamp));
2756
2757     gst_base_sink_set_last_buffer (basesink, buf);
2758
2759     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2760     if (bclass->preroll)
2761       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
2762         goto preroll_failed;
2763
2764     basesink->priv->call_preroll = FALSE;
2765   }
2766
2767   /* commit state */
2768   if (G_LIKELY (basesink->playing_async)) {
2769     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
2770       goto stopping;
2771   }
2772
2773   return GST_FLOW_OK;
2774
2775   /* ERRORS */
2776 preroll_failed:
2777   {
2778     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
2779     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
2780     return ret;
2781   }
2782 stopping:
2783   {
2784     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
2785     return GST_FLOW_WRONG_STATE;
2786   }
2787 }
2788
2789 /* with STREAM_LOCK, PREROLL_LOCK 
2790  *
2791  * Queue an object for rendering.
2792  * The first prerollable object queued will complete the preroll. If the
2793  * preroll queue if filled, we render all the objects in the queue.
2794  *
2795  * This function takes ownership of the object.
2796  */
2797 static GstFlowReturn
2798 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
2799     GstMiniObject * obj, gboolean prerollable)
2800 {
2801   GstFlowReturn ret = GST_FLOW_OK;
2802   gint length;
2803   GQueue *q;
2804
2805   if (G_UNLIKELY (basesink->need_preroll)) {
2806     if (G_LIKELY (prerollable))
2807       basesink->preroll_queued++;
2808
2809     length = basesink->preroll_queued;
2810
2811     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
2812
2813     /* first prerollable item needs to finish the preroll */
2814     if (length == 1) {
2815       ret = gst_base_sink_preroll_object (basesink, obj);
2816       if (G_UNLIKELY (ret != GST_FLOW_OK))
2817         goto preroll_failed;
2818     }
2819     /* need to recheck if we need preroll, commmit state during preroll 
2820      * could have made us not need more preroll. */
2821     if (G_UNLIKELY (basesink->need_preroll)) {
2822       /* see if we can render now, if we can't add the object to the preroll
2823        * queue. */
2824       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
2825         goto more_preroll;
2826     }
2827   }
2828
2829   /* we can start rendering (or blocking) the queued object
2830    * if any. */
2831   q = basesink->preroll_queue;
2832   while (G_UNLIKELY (!g_queue_is_empty (q))) {
2833     GstMiniObject *o;
2834
2835     o = g_queue_pop_head (q);
2836     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
2837
2838     /* do something with the return value */
2839     ret = gst_base_sink_render_object (basesink, pad, o);
2840     if (ret != GST_FLOW_OK)
2841       goto dequeue_failed;
2842   }
2843
2844   /* now render the object */
2845   ret = gst_base_sink_render_object (basesink, pad, obj);
2846   basesink->preroll_queued = 0;
2847
2848   return ret;
2849
2850   /* special cases */
2851 preroll_failed:
2852   {
2853     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
2854         gst_flow_get_name (ret));
2855     gst_mini_object_unref (obj);
2856     return ret;
2857   }
2858 more_preroll:
2859   {
2860     /* add object to the queue and return */
2861     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
2862         length, basesink->preroll_queue_max_len);
2863     g_queue_push_tail (basesink->preroll_queue, obj);
2864     return GST_FLOW_OK;
2865   }
2866 dequeue_failed:
2867   {
2868     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
2869         gst_flow_get_name (ret));
2870     gst_mini_object_unref (obj);
2871     return ret;
2872   }
2873 }
2874
2875 /* with STREAM_LOCK
2876  *
2877  * This function grabs the PREROLL_LOCK and adds the object to
2878  * the queue.
2879  *
2880  * This function takes ownership of obj.
2881  */
2882 static GstFlowReturn
2883 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
2884     GstMiniObject * obj, gboolean prerollable)
2885 {
2886   GstFlowReturn ret;
2887
2888   GST_PAD_PREROLL_LOCK (pad);
2889   if (G_UNLIKELY (basesink->flushing))
2890     goto flushing;
2891
2892   if (G_UNLIKELY (basesink->priv->received_eos))
2893     goto was_eos;
2894
2895   ret = gst_base_sink_queue_object_unlocked (basesink, pad, obj, prerollable);
2896   GST_PAD_PREROLL_UNLOCK (pad);
2897
2898   return ret;
2899
2900   /* ERRORS */
2901 flushing:
2902   {
2903     GST_DEBUG_OBJECT (basesink, "sink is flushing");
2904     GST_PAD_PREROLL_UNLOCK (pad);
2905     gst_mini_object_unref (obj);
2906     return GST_FLOW_WRONG_STATE;
2907   }
2908 was_eos:
2909   {
2910     GST_DEBUG_OBJECT (basesink,
2911         "we are EOS, dropping object, return UNEXPECTED");
2912     GST_PAD_PREROLL_UNLOCK (pad);
2913     gst_mini_object_unref (obj);
2914     return GST_FLOW_UNEXPECTED;
2915   }
2916 }
2917
2918 static void
2919 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
2920 {
2921   /* make sure we are not blocked on the clock also clear any pending
2922    * eos state. */
2923   gst_base_sink_set_flushing (basesink, pad, TRUE);
2924
2925   /* we grab the stream lock but that is not needed since setting the
2926    * sink to flushing would make sure no state commit is being done
2927    * anymore */
2928   GST_PAD_STREAM_LOCK (pad);
2929   gst_base_sink_reset_qos (basesink);
2930   if (basesink->priv->async_enabled) {
2931     /* and we need to commit our state again on the next
2932      * prerolled buffer */
2933     basesink->playing_async = TRUE;
2934     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
2935   } else {
2936     basesink->priv->have_latency = TRUE;
2937     basesink->need_preroll = FALSE;
2938   }
2939   gst_base_sink_set_last_buffer (basesink, NULL);
2940   GST_PAD_STREAM_UNLOCK (pad);
2941 }
2942
2943 static void
2944 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
2945 {
2946   /* unset flushing so we can accept new data, this also flushes out any EOS
2947    * event. */
2948   gst_base_sink_set_flushing (basesink, pad, FALSE);
2949
2950   /* for position reporting */
2951   GST_OBJECT_LOCK (basesink);
2952   basesink->priv->current_sstart = -1;
2953   basesink->priv->current_sstop = -1;
2954   basesink->priv->eos_rtime = -1;
2955   basesink->priv->call_preroll = TRUE;
2956   basesink->priv->current_step.valid = FALSE;
2957   basesink->priv->pending_step.valid = FALSE;
2958   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
2959     /* we need new segment info after the flush. */
2960     basesink->have_newsegment = FALSE;
2961     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
2962     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
2963   }
2964   GST_OBJECT_UNLOCK (basesink);
2965 }
2966
2967 static gboolean
2968 gst_base_sink_event (GstPad * pad, GstEvent * event)
2969 {
2970   GstBaseSink *basesink;
2971   gboolean result = TRUE;
2972   GstBaseSinkClass *bclass;
2973
2974   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2975
2976   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2977
2978   GST_DEBUG_OBJECT (basesink, "event %p (%s)", event,
2979       GST_EVENT_TYPE_NAME (event));
2980
2981   switch (GST_EVENT_TYPE (event)) {
2982     case GST_EVENT_EOS:
2983     {
2984       GstFlowReturn ret;
2985
2986       GST_PAD_PREROLL_LOCK (pad);
2987       if (G_UNLIKELY (basesink->flushing))
2988         goto flushing;
2989
2990       if (G_UNLIKELY (basesink->priv->received_eos)) {
2991         /* we can't accept anything when we are EOS */
2992         result = FALSE;
2993         gst_event_unref (event);
2994       } else {
2995         /* we set the received EOS flag here so that we can use it when testing if
2996          * we are prerolled and to refuse more buffers. */
2997         basesink->priv->received_eos = TRUE;
2998
2999         /* EOS is a prerollable object, we call the unlocked version because it
3000          * does not check the received_eos flag. */
3001         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
3002             GST_MINI_OBJECT_CAST (event), TRUE);
3003         if (G_UNLIKELY (ret != GST_FLOW_OK))
3004           result = FALSE;
3005       }
3006       GST_PAD_PREROLL_UNLOCK (pad);
3007       break;
3008     }
3009     case GST_EVENT_NEWSEGMENT:
3010     {
3011       GstFlowReturn ret;
3012
3013       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
3014
3015       GST_PAD_PREROLL_LOCK (pad);
3016       if (G_UNLIKELY (basesink->flushing))
3017         goto flushing;
3018
3019       if (G_UNLIKELY (basesink->priv->received_eos)) {
3020         /* we can't accept anything when we are EOS */
3021         result = FALSE;
3022         gst_event_unref (event);
3023       } else {
3024         /* the new segment is a non prerollable item and does not block anything,
3025          * we need to configure the current clipping segment and insert the event 
3026          * in the queue to serialize it with the buffers for rendering. */
3027         gst_base_sink_configure_segment (basesink, pad, event,
3028             basesink->abidata.ABI.clip_segment);
3029
3030         ret =
3031             gst_base_sink_queue_object_unlocked (basesink, pad,
3032             GST_MINI_OBJECT_CAST (event), FALSE);
3033         if (G_UNLIKELY (ret != GST_FLOW_OK))
3034           result = FALSE;
3035         else {
3036           GST_OBJECT_LOCK (basesink);
3037           basesink->have_newsegment = TRUE;
3038           GST_OBJECT_UNLOCK (basesink);
3039         }
3040       }
3041       GST_PAD_PREROLL_UNLOCK (pad);
3042       break;
3043     }
3044     case GST_EVENT_FLUSH_START:
3045       if (bclass->event)
3046         bclass->event (basesink, event);
3047
3048       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3049
3050       gst_base_sink_flush_start (basesink, pad);
3051
3052       gst_event_unref (event);
3053       break;
3054     case GST_EVENT_FLUSH_STOP:
3055       if (bclass->event)
3056         bclass->event (basesink, event);
3057
3058       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3059
3060       gst_base_sink_flush_stop (basesink, pad);
3061
3062       gst_event_unref (event);
3063       break;
3064     default:
3065       /* other events are sent to queue or subclass depending on if they
3066        * are serialized. */
3067       if (GST_EVENT_IS_SERIALIZED (event)) {
3068         gst_base_sink_queue_object (basesink, pad,
3069             GST_MINI_OBJECT_CAST (event), FALSE);
3070       } else {
3071         if (bclass->event)
3072           bclass->event (basesink, event);
3073         gst_event_unref (event);
3074       }
3075       break;
3076   }
3077 done:
3078   gst_object_unref (basesink);
3079
3080   return result;
3081
3082   /* ERRORS */
3083 flushing:
3084   {
3085     GST_DEBUG_OBJECT (basesink, "we are flushing");
3086     GST_PAD_PREROLL_UNLOCK (pad);
3087     result = FALSE;
3088     gst_event_unref (event);
3089     goto done;
3090   }
3091 }
3092
3093 /* default implementation to calculate the start and end
3094  * timestamps on a buffer, subclasses can override
3095  */
3096 static void
3097 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3098     GstClockTime * start, GstClockTime * end)
3099 {
3100   GstClockTime timestamp, duration;
3101
3102   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3103   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3104
3105     /* get duration to calculate end time */
3106     duration = GST_BUFFER_DURATION (buffer);
3107     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3108       *end = timestamp + duration;
3109     }
3110     *start = timestamp;
3111   }
3112 }
3113
3114 /* must be called with PREROLL_LOCK */
3115 static gboolean
3116 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3117 {
3118   gboolean is_prerolled, res;
3119
3120   /* we have 2 cases where the PREROLL_LOCK is released:
3121    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3122    *  2) we are syncing on the clock
3123    */
3124   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3125   res = !is_prerolled;
3126
3127   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3128       basesink->have_preroll, basesink->priv->received_eos, res);
3129
3130   return res;
3131 }
3132
3133 /* with STREAM_LOCK, PREROLL_LOCK 
3134  *
3135  * Takes a buffer and compare the timestamps with the last segment.
3136  * If the buffer falls outside of the segment boundaries, drop it.
3137  * Else queue the buffer for preroll and rendering.
3138  *
3139  * This function takes ownership of the buffer.
3140  */
3141 static GstFlowReturn
3142 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3143     GstBuffer * buf)
3144 {
3145   GstBaseSinkClass *bclass;
3146   GstFlowReturn result;
3147   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3148   GstSegment *clip_segment;
3149
3150   if (G_UNLIKELY (basesink->flushing))
3151     goto flushing;
3152
3153   if (G_UNLIKELY (basesink->priv->received_eos))
3154     goto was_eos;
3155
3156   /* for code clarity */
3157   clip_segment = basesink->abidata.ABI.clip_segment;
3158
3159   if (G_UNLIKELY (!basesink->have_newsegment)) {
3160     gboolean sync;
3161
3162     sync = gst_base_sink_get_sync (basesink);
3163     if (sync) {
3164       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3165           (_("Internal data flow problem.")),
3166           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3167     }
3168
3169     /* this means this sink will assume timestamps start from 0 */
3170     GST_OBJECT_LOCK (basesink);
3171     clip_segment->start = 0;
3172     clip_segment->stop = -1;
3173     basesink->segment.start = 0;
3174     basesink->segment.stop = -1;
3175     basesink->have_newsegment = TRUE;
3176     GST_OBJECT_UNLOCK (basesink);
3177   }
3178
3179   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3180
3181   /* check if the buffer needs to be dropped, we first ask the subclass for the
3182    * start and end */
3183   if (bclass->get_times)
3184     bclass->get_times (basesink, buf, &start, &end);
3185
3186   if (start == -1) {
3187     /* if the subclass does not want sync, we use our own values so that we at
3188      * least clip the buffer to the segment */
3189     gst_base_sink_get_times (basesink, buf, &start, &end);
3190   }
3191
3192   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3193       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3194
3195   /* a dropped buffer does not participate in anything */
3196   if (GST_CLOCK_TIME_IS_VALID (start) &&
3197       (clip_segment->format == GST_FORMAT_TIME)) {
3198     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3199                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3200       goto out_of_segment;
3201   }
3202
3203   /* now we can process the buffer in the queue, this function takes ownership
3204    * of the buffer */
3205   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3206       GST_MINI_OBJECT_CAST (buf), TRUE);
3207
3208   return result;
3209
3210   /* ERRORS */
3211 flushing:
3212   {
3213     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3214     gst_buffer_unref (buf);
3215     return GST_FLOW_WRONG_STATE;
3216   }
3217 was_eos:
3218   {
3219     GST_DEBUG_OBJECT (basesink,
3220         "we are EOS, dropping object, return UNEXPECTED");
3221     gst_buffer_unref (buf);
3222     return GST_FLOW_UNEXPECTED;
3223   }
3224 out_of_segment:
3225   {
3226     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3227     gst_buffer_unref (buf);
3228     return GST_FLOW_OK;
3229   }
3230 }
3231
3232 /* with STREAM_LOCK
3233  */
3234 static GstFlowReturn
3235 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3236 {
3237   GstBaseSink *basesink;
3238   GstFlowReturn result;
3239
3240   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3241
3242   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3243     goto wrong_mode;
3244
3245   GST_PAD_PREROLL_LOCK (pad);
3246   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
3247   GST_PAD_PREROLL_UNLOCK (pad);
3248
3249 done:
3250   return result;
3251
3252   /* ERRORS */
3253 wrong_mode:
3254   {
3255     GST_OBJECT_LOCK (pad);
3256     GST_WARNING_OBJECT (basesink,
3257         "Push on pad %s:%s, but it was not activated in push mode",
3258         GST_DEBUG_PAD_NAME (pad));
3259     GST_OBJECT_UNLOCK (pad);
3260     gst_buffer_unref (buf);
3261     /* we don't post an error message this will signal to the peer
3262      * pushing that EOS is reached. */
3263     result = GST_FLOW_UNEXPECTED;
3264     goto done;
3265   }
3266 }
3267
3268 static gboolean
3269 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3270 {
3271   gboolean res = TRUE;
3272
3273   /* update our offset if the start/stop position was updated */
3274   if (segment->format == GST_FORMAT_BYTES) {
3275     segment->time = segment->start;
3276   } else if (segment->start == 0) {
3277     /* seek to start, we can implement a default for this. */
3278     segment->time = 0;
3279   } else {
3280     res = FALSE;
3281     GST_INFO_OBJECT (sink, "Can't do a default seek");
3282   }
3283
3284   return res;
3285 }
3286
3287 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3288
3289 static gboolean
3290 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3291     GstEvent * event, GstSegment * segment)
3292 {
3293   /* By default, we try one of 2 things:
3294    *   - For absolute seek positions, convert the requested position to our 
3295    *     configured processing format and place it in the output segment \
3296    *   - For relative seek positions, convert our current (input) values to the
3297    *     seek format, adjust by the relative seek offset and then convert back to
3298    *     the processing format
3299    */
3300   GstSeekType cur_type, stop_type;
3301   gint64 cur, stop;
3302   GstSeekFlags flags;
3303   GstFormat seek_format, dest_format;
3304   gdouble rate;
3305   gboolean update;
3306   gboolean res = TRUE;
3307
3308   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3309       &cur_type, &cur, &stop_type, &stop);
3310   dest_format = segment->format;
3311
3312   if (seek_format == dest_format) {
3313     gst_segment_set_seek (segment, rate, seek_format, flags,
3314         cur_type, cur, stop_type, stop, &update);
3315     return TRUE;
3316   }
3317
3318   if (cur_type != GST_SEEK_TYPE_NONE) {
3319     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3320     res =
3321         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3322         &cur);
3323     cur_type = GST_SEEK_TYPE_SET;
3324   }
3325
3326   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3327     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3328     res =
3329         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3330         &stop);
3331     stop_type = GST_SEEK_TYPE_SET;
3332   }
3333
3334   /* And finally, configure our output segment in the desired format */
3335   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3336       stop_type, stop, &update);
3337
3338   if (!res)
3339     goto no_format;
3340
3341   return res;
3342
3343 no_format:
3344   {
3345     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3346     return FALSE;
3347   }
3348 }
3349
3350 /* perform a seek, only executed in pull mode */
3351 static gboolean
3352 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3353 {
3354   gboolean flush;
3355   gdouble rate;
3356   GstFormat seek_format, dest_format;
3357   GstSeekFlags flags;
3358   GstSeekType cur_type, stop_type;
3359   gboolean seekseg_configured = FALSE;
3360   gint64 cur, stop;
3361   gboolean update, res = TRUE;
3362   GstSegment seeksegment;
3363
3364   dest_format = sink->segment.format;
3365
3366   if (event) {
3367     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3368     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3369         &cur_type, &cur, &stop_type, &stop);
3370
3371     flush = flags & GST_SEEK_FLAG_FLUSH;
3372   } else {
3373     GST_DEBUG_OBJECT (sink, "performing seek without event");
3374     flush = FALSE;
3375   }
3376
3377   if (flush) {
3378     GST_DEBUG_OBJECT (sink, "flushing upstream");
3379     gst_pad_push_event (pad, gst_event_new_flush_start ());
3380     gst_base_sink_flush_start (sink, pad);
3381   } else {
3382     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3383   }
3384
3385   GST_PAD_STREAM_LOCK (pad);
3386
3387   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3388    * copy the current segment info into the temp segment that we can actually
3389    * attempt the seek with. We only update the real segment if the seek suceeds. */
3390   if (!seekseg_configured) {
3391     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3392
3393     /* now configure the final seek segment */
3394     if (event) {
3395       if (sink->segment.format != seek_format) {
3396         /* OK, here's where we give the subclass a chance to convert the relative
3397          * seek into an absolute one in the processing format. We set up any
3398          * absolute seek above, before taking the stream lock. */
3399         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3400                 &seeksegment)) {
3401           GST_DEBUG_OBJECT (sink,
3402               "Preparing the seek failed after flushing. " "Aborting seek");
3403           res = FALSE;
3404         }
3405       } else {
3406         /* The seek format matches our processing format, no need to ask the
3407          * the subclass to configure the segment. */
3408         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3409             cur_type, cur, stop_type, stop, &update);
3410       }
3411     }
3412     /* Else, no seek event passed, so we're just (re)starting the 
3413        current segment. */
3414   }
3415
3416   if (res) {
3417     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3418         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3419         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3420
3421     /* do the seek, segment.last_stop contains the new position. */
3422     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3423   }
3424
3425
3426   if (flush) {
3427     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3428     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3429     gst_base_sink_flush_stop (sink, pad);
3430   } else if (res && sink->abidata.ABI.running) {
3431     /* we are running the current segment and doing a non-flushing seek, 
3432      * close the segment first based on the last_stop. */
3433     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3434         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3435   }
3436
3437   /* The subclass must have converted the segment to the processing format 
3438    * by now */
3439   if (res && seeksegment.format != dest_format) {
3440     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3441         "in the correct format. Aborting seek.");
3442     res = FALSE;
3443   }
3444
3445   /* if successfull seek, we update our real segment and push
3446    * out the new segment. */
3447   if (res) {
3448     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3449
3450     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3451       gst_element_post_message (GST_ELEMENT (sink),
3452           gst_message_new_segment_start (GST_OBJECT (sink),
3453               sink->segment.format, sink->segment.last_stop));
3454     }
3455   }
3456
3457   sink->priv->discont = TRUE;
3458   sink->abidata.ABI.running = TRUE;
3459
3460   GST_PAD_STREAM_UNLOCK (pad);
3461
3462   return res;
3463 }
3464
3465 static void
3466 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
3467     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
3468     gboolean flush, gboolean intermediate)
3469 {
3470   GST_OBJECT_LOCK (sink);
3471   pending->seqnum = seqnum;
3472   pending->format = format;
3473   pending->amount = amount;
3474   pending->position = 0;
3475   pending->rate = rate;
3476   pending->flush = flush;
3477   pending->intermediate = intermediate;
3478   pending->valid = TRUE;
3479   /* flush invalidates the current stepping segment */
3480   if (flush)
3481     current->valid = FALSE;
3482   GST_OBJECT_UNLOCK (sink);
3483 }
3484
3485 static gboolean
3486 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3487 {
3488   GstBaseSinkPrivate *priv;
3489   GstBaseSinkClass *bclass;
3490   gboolean flush, intermediate;
3491   gdouble rate;
3492   GstFormat format;
3493   guint64 amount;
3494   guint seqnum;
3495   GstStepInfo *pending, *current;
3496
3497   bclass = GST_BASE_SINK_GET_CLASS (sink);
3498   priv = sink->priv;
3499
3500   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
3501
3502   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
3503   seqnum = gst_event_get_seqnum (event);
3504
3505   pending = &priv->pending_step;
3506   current = &priv->current_step;
3507
3508   if (flush) {
3509     /* we need to call ::unlock before locking PREROLL_LOCK
3510      * since we lock it before going into ::render */
3511     if (bclass->unlock)
3512       bclass->unlock (sink);
3513
3514     GST_PAD_PREROLL_LOCK (sink->sinkpad);
3515     /* now that we have the PREROLL lock, clear our unlock request */
3516     if (bclass->unlock_stop)
3517       bclass->unlock_stop (sink);
3518
3519     /* update the stepinfo and make it valid */
3520     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
3521         intermediate);
3522
3523     if (sink->priv->async_enabled) {
3524       /* and we need to commit our state again on the next
3525        * prerolled buffer */
3526       sink->playing_async = TRUE;
3527       priv->pending_step.need_preroll = TRUE;
3528       sink->need_preroll = FALSE;
3529       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
3530     } else {
3531       sink->priv->have_latency = TRUE;
3532       sink->need_preroll = FALSE;
3533     }
3534     priv->current_sstart = -1;
3535     priv->current_sstop = -1;
3536     priv->eos_rtime = -1;
3537     priv->call_preroll = TRUE;
3538     gst_base_sink_set_last_buffer (sink, NULL);
3539     gst_base_sink_reset_qos (sink);
3540
3541     if (sink->clock_id) {
3542       gst_clock_id_unschedule (sink->clock_id);
3543     }
3544
3545     if (sink->have_preroll) {
3546       GST_DEBUG_OBJECT (sink, "signal waiter");
3547       priv->step_unlock = TRUE;
3548       GST_PAD_PREROLL_SIGNAL (sink->sinkpad);
3549     }
3550     GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
3551   } else {
3552     /* update the stepinfo and make it valid */
3553     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
3554         intermediate);
3555   }
3556
3557   return TRUE;
3558 }
3559
3560 /* with STREAM_LOCK
3561  */
3562 static void
3563 gst_base_sink_loop (GstPad * pad)
3564 {
3565   GstBaseSink *basesink;
3566   GstBuffer *buf = NULL;
3567   GstFlowReturn result;
3568   guint blocksize;
3569   guint64 offset;
3570
3571   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3572
3573   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
3574
3575   if ((blocksize = basesink->priv->blocksize) == 0)
3576     blocksize = -1;
3577
3578   offset = basesink->segment.last_stop;
3579
3580   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
3581       offset, blocksize);
3582
3583   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
3584   if (G_UNLIKELY (result != GST_FLOW_OK))
3585     goto paused;
3586
3587   if (G_UNLIKELY (buf == NULL))
3588     goto no_buffer;
3589
3590   offset += GST_BUFFER_SIZE (buf);
3591
3592   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
3593
3594   GST_PAD_PREROLL_LOCK (pad);
3595   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
3596   GST_PAD_PREROLL_UNLOCK (pad);
3597   if (G_UNLIKELY (result != GST_FLOW_OK))
3598     goto paused;
3599
3600   return;
3601
3602   /* ERRORS */
3603 paused:
3604   {
3605     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
3606         gst_flow_get_name (result));
3607     gst_pad_pause_task (pad);
3608     /* fatal errors and NOT_LINKED cause EOS */
3609     if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) {
3610       if (result == GST_FLOW_UNEXPECTED) {
3611         /* perform EOS logic */
3612         if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3613           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3614               gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
3615                   basesink->segment.format, basesink->segment.last_stop));
3616         } else {
3617           gst_base_sink_event (pad, gst_event_new_eos ());
3618         }
3619       } else {
3620         /* for fatal errors we post an error message, post the error
3621          * first so the app knows about the error first. */
3622         GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3623             (_("Internal data stream error.")),
3624             ("stream stopped, reason %s", gst_flow_get_name (result)));
3625         gst_base_sink_event (pad, gst_event_new_eos ());
3626       }
3627     }
3628     return;
3629   }
3630 no_buffer:
3631   {
3632     GST_LOG_OBJECT (basesink, "no buffer, pausing");
3633     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3634         (_("Internal data flow error.")), ("element returned NULL buffer"));
3635     result = GST_FLOW_ERROR;
3636     goto paused;
3637   }
3638 }
3639
3640 static gboolean
3641 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
3642     gboolean flushing)
3643 {
3644   GstBaseSinkClass *bclass;
3645
3646   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3647
3648   if (flushing) {
3649     /* unlock any subclasses, we need to do this before grabbing the
3650      * PREROLL_LOCK since we hold this lock before going into ::render. */
3651     if (bclass->unlock)
3652       bclass->unlock (basesink);
3653   }
3654
3655   GST_PAD_PREROLL_LOCK (pad);
3656   basesink->flushing = flushing;
3657   if (flushing) {
3658     /* step 1, now that we have the PREROLL lock, clear our unlock request */
3659     if (bclass->unlock_stop)
3660       bclass->unlock_stop (basesink);
3661
3662     /* set need_preroll before we unblock the clock. If the clock is unblocked
3663      * before timing out, we can reuse the buffer for preroll. */
3664     basesink->need_preroll = TRUE;
3665
3666     /* step 2, unblock clock sync (if any) or any other blocking thing */
3667     if (basesink->clock_id) {
3668       gst_clock_id_unschedule (basesink->clock_id);
3669     }
3670
3671     /* flush out the data thread if it's locked in finish_preroll, this will
3672      * also flush out the EOS state */
3673     GST_DEBUG_OBJECT (basesink,
3674         "flushing out data thread, need preroll to TRUE");
3675     gst_base_sink_preroll_queue_flush (basesink, pad);
3676   }
3677   GST_PAD_PREROLL_UNLOCK (pad);
3678
3679   return TRUE;
3680 }
3681
3682 static gboolean
3683 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
3684 {
3685   gboolean result;
3686
3687   if (active) {
3688     /* start task */
3689     result = gst_pad_start_task (basesink->sinkpad,
3690         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
3691   } else {
3692     /* step 2, make sure streaming finishes */
3693     result = gst_pad_stop_task (basesink->sinkpad);
3694   }
3695
3696   return result;
3697 }
3698
3699 static gboolean
3700 gst_base_sink_pad_activate (GstPad * pad)
3701 {
3702   gboolean result = FALSE;
3703   GstBaseSink *basesink;
3704
3705   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3706
3707   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
3708
3709   gst_base_sink_set_flushing (basesink, pad, FALSE);
3710
3711   /* we need to have the pull mode enabled */
3712   if (!basesink->can_activate_pull) {
3713     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
3714     goto fallback;
3715   }
3716
3717   /* check if downstreams supports pull mode at all */
3718   if (!gst_pad_check_pull_range (pad)) {
3719     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
3720     goto fallback;
3721   }
3722
3723   /* set the pad mode before starting the task so that it's in the
3724    * correct state for the new thread. also the sink set_caps and get_caps
3725    * function checks this */
3726   basesink->pad_mode = GST_ACTIVATE_PULL;
3727
3728   /* we first try to negotiate a format so that when we try to activate
3729    * downstream, it knows about our format */
3730   if (!gst_base_sink_negotiate_pull (basesink)) {
3731     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
3732     goto fallback;
3733   }
3734
3735   /* ok activate now */
3736   if (!gst_pad_activate_pull (pad, TRUE)) {
3737     /* clear any pending caps */
3738     GST_OBJECT_LOCK (basesink);
3739     gst_caps_replace (&basesink->priv->pull_caps, NULL);
3740     GST_OBJECT_UNLOCK (basesink);
3741     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
3742     goto fallback;
3743   }
3744
3745   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
3746   result = TRUE;
3747   goto done;
3748
3749   /* push mode fallback */
3750 fallback:
3751   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
3752   if ((result = gst_pad_activate_push (pad, TRUE))) {
3753     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
3754   }
3755
3756 done:
3757   if (!result) {
3758     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
3759     gst_base_sink_set_flushing (basesink, pad, TRUE);
3760   }
3761
3762   gst_object_unref (basesink);
3763
3764   return result;
3765 }
3766
3767 static gboolean
3768 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
3769 {
3770   gboolean result;
3771   GstBaseSink *basesink;
3772
3773   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3774
3775   if (active) {
3776     if (!basesink->can_activate_push) {
3777       result = FALSE;
3778       basesink->pad_mode = GST_ACTIVATE_NONE;
3779     } else {
3780       result = TRUE;
3781       basesink->pad_mode = GST_ACTIVATE_PUSH;
3782     }
3783   } else {
3784     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
3785       g_warning ("Internal GStreamer activation error!!!");
3786       result = FALSE;
3787     } else {
3788       gst_base_sink_set_flushing (basesink, pad, TRUE);
3789       result = TRUE;
3790       basesink->pad_mode = GST_ACTIVATE_NONE;
3791     }
3792   }
3793
3794   gst_object_unref (basesink);
3795
3796   return result;
3797 }
3798
3799 static gboolean
3800 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
3801 {
3802   GstCaps *caps;
3803   gboolean result;
3804
3805   result = FALSE;
3806
3807   /* this returns the intersection between our caps and the peer caps. If there
3808    * is no peer, it returns NULL and we can't operate in pull mode so we can
3809    * fail the negotiation. */
3810   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
3811   if (caps == NULL || gst_caps_is_empty (caps))
3812     goto no_caps_possible;
3813
3814   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
3815
3816   caps = gst_caps_make_writable (caps);
3817   /* get the first (prefered) format */
3818   gst_caps_truncate (caps);
3819   /* try to fixate */
3820   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
3821
3822   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
3823
3824   if (gst_caps_is_any (caps)) {
3825     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
3826         "allowing pull()");
3827     /* neither side has template caps in this case, so they are prepared for
3828        pull() without setcaps() */
3829     result = TRUE;
3830   } else if (gst_caps_is_fixed (caps)) {
3831     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
3832       goto could_not_set_caps;
3833
3834     GST_OBJECT_LOCK (basesink);
3835     gst_caps_replace (&basesink->priv->pull_caps, caps);
3836     GST_OBJECT_UNLOCK (basesink);
3837
3838     result = TRUE;
3839   }
3840
3841   gst_caps_unref (caps);
3842
3843   return result;
3844
3845 no_caps_possible:
3846   {
3847     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
3848     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
3849     if (caps)
3850       gst_caps_unref (caps);
3851     return FALSE;
3852   }
3853 could_not_set_caps:
3854   {
3855     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
3856     gst_caps_unref (caps);
3857     return FALSE;
3858   }
3859 }
3860
3861 /* this won't get called until we implement an activate function */
3862 static gboolean
3863 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
3864 {
3865   gboolean result = FALSE;
3866   GstBaseSink *basesink;
3867   GstBaseSinkClass *bclass;
3868
3869   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3870   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3871
3872   if (active) {
3873     GstFormat format;
3874     gint64 duration;
3875
3876     /* we mark we have a newsegment here because pull based
3877      * mode works just fine without having a newsegment before the
3878      * first buffer */
3879     format = GST_FORMAT_BYTES;
3880
3881     gst_segment_init (&basesink->segment, format);
3882     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
3883     GST_OBJECT_LOCK (basesink);
3884     basesink->have_newsegment = TRUE;
3885     GST_OBJECT_UNLOCK (basesink);
3886
3887     /* get the peer duration in bytes */
3888     result = gst_pad_query_peer_duration (pad, &format, &duration);
3889     if (result) {
3890       GST_DEBUG_OBJECT (basesink,
3891           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
3892       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
3893           duration);
3894       gst_segment_set_duration (&basesink->segment, format, duration);
3895     } else {
3896       GST_DEBUG_OBJECT (basesink, "unknown duration");
3897     }
3898
3899     if (bclass->activate_pull)
3900       result = bclass->activate_pull (basesink, TRUE);
3901     else
3902       result = FALSE;
3903
3904     if (!result)
3905       goto activate_failed;
3906
3907   } else {
3908     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
3909       g_warning ("Internal GStreamer activation error!!!");
3910       result = FALSE;
3911     } else {
3912       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
3913       if (bclass->activate_pull)
3914         result &= bclass->activate_pull (basesink, FALSE);
3915       basesink->pad_mode = GST_ACTIVATE_NONE;
3916       /* clear any pending caps */
3917       GST_OBJECT_LOCK (basesink);
3918       gst_caps_replace (&basesink->priv->pull_caps, NULL);
3919       GST_OBJECT_UNLOCK (basesink);
3920     }
3921   }
3922   gst_object_unref (basesink);
3923
3924   return result;
3925
3926   /* ERRORS */
3927 activate_failed:
3928   {
3929     /* reset, as starting the thread failed */
3930     basesink->pad_mode = GST_ACTIVATE_NONE;
3931
3932     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
3933     return FALSE;
3934   }
3935 }
3936
3937 /* send an event to our sinkpad peer. */
3938 static gboolean
3939 gst_base_sink_send_event (GstElement * element, GstEvent * event)
3940 {
3941   GstPad *pad;
3942   GstBaseSink *basesink = GST_BASE_SINK (element);
3943   gboolean forward, result = TRUE;
3944   GstActivateMode mode;
3945
3946   GST_OBJECT_LOCK (element);
3947   /* get the pad and the scheduling mode */
3948   pad = gst_object_ref (basesink->sinkpad);
3949   mode = basesink->pad_mode;
3950   GST_OBJECT_UNLOCK (element);
3951
3952   /* only push UPSTREAM events upstream */
3953   forward = GST_EVENT_IS_UPSTREAM (event);
3954
3955   switch (GST_EVENT_TYPE (event)) {
3956     case GST_EVENT_LATENCY:
3957     {
3958       GstClockTime latency;
3959
3960       gst_event_parse_latency (event, &latency);
3961
3962       /* store the latency. We use this to adjust the running_time before syncing
3963        * it to the clock. */
3964       GST_OBJECT_LOCK (element);
3965       basesink->priv->latency = latency;
3966       if (!basesink->priv->have_latency)
3967         forward = FALSE;
3968       GST_OBJECT_UNLOCK (element);
3969       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
3970           GST_TIME_ARGS (latency));
3971
3972       /* We forward this event so that all elements know about the global pipeline
3973        * latency. This is interesting for an element when it wants to figure out
3974        * when a particular piece of data will be rendered. */
3975       break;
3976     }
3977     case GST_EVENT_SEEK:
3978       /* in pull mode we will execute the seek */
3979       if (mode == GST_ACTIVATE_PULL)
3980         result = gst_base_sink_perform_seek (basesink, pad, event);
3981       break;
3982     case GST_EVENT_STEP:
3983       result = gst_base_sink_perform_step (basesink, pad, event);
3984       forward = FALSE;
3985       break;
3986     default:
3987       break;
3988   }
3989
3990   if (forward) {
3991     result = gst_pad_push_event (pad, event);
3992   } else {
3993     /* not forwarded, unref the event */
3994     gst_event_unref (event);
3995   }
3996
3997   gst_object_unref (pad);
3998   return result;
3999 }
4000
4001 static gboolean
4002 gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query)
4003 {
4004   GstPad *peer;
4005   gboolean res = FALSE;
4006
4007   if ((peer = gst_pad_get_peer (sink->sinkpad))) {
4008     res = gst_pad_query (peer, query);
4009     gst_object_unref (peer);
4010   }
4011   return res;
4012 }
4013
4014 /* get the end position of the last seen object, this is used
4015  * for EOS and for making sure that we don't report a position we
4016  * have not reached yet. With LOCK. */
4017 static gboolean
4018 gst_base_sink_get_position_last (GstBaseSink * basesink, GstFormat format,
4019     gint64 * cur)
4020 {
4021   GstFormat oformat;
4022   GstSegment *segment;
4023   gboolean ret = TRUE;
4024
4025   segment = &basesink->segment;
4026   oformat = segment->format;
4027
4028   if (oformat == GST_FORMAT_TIME) {
4029     /* return last observed stream time, we keep the stream time around in the
4030      * time format. */
4031     *cur = basesink->priv->current_sstop;
4032   } else {
4033     /* convert last stop to stream time */
4034     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4035   }
4036
4037   if (*cur != -1 && oformat != format) {
4038     GST_OBJECT_UNLOCK (basesink);
4039     /* convert to the target format if we need to, release lock first */
4040     ret =
4041         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
4042     if (!ret)
4043       *cur = -1;
4044     GST_OBJECT_LOCK (basesink);
4045   }
4046
4047   GST_DEBUG_OBJECT (basesink, "POSITION: %" GST_TIME_FORMAT,
4048       GST_TIME_ARGS (*cur));
4049
4050   return ret;
4051 }
4052
4053 /* get the position when we are PAUSED, this is the stream time of the buffer
4054  * that prerolled. If no buffer is prerolled (we are still flushing), this
4055  * value will be -1. With LOCK. */
4056 static gboolean
4057 gst_base_sink_get_position_paused (GstBaseSink * basesink, GstFormat format,
4058     gint64 * cur)
4059 {
4060   gboolean res;
4061   gint64 time;
4062   GstSegment *segment;
4063   GstFormat oformat;
4064
4065   /* we don't use the clip segment in pull mode, when seeking we update the
4066    * main segment directly with the new segment values without it having to be
4067    * activated by the rendering after preroll */
4068   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4069     segment = basesink->abidata.ABI.clip_segment;
4070   else
4071     segment = &basesink->segment;
4072   oformat = segment->format;
4073
4074   if (oformat == GST_FORMAT_TIME) {
4075     *cur = basesink->priv->current_sstart;
4076     if (segment->rate < 0.0 && basesink->priv->current_sstop != -1) {
4077       /* for reverse playback we prefer the stream time stop position if we have
4078        * one */
4079       *cur = basesink->priv->current_sstop;
4080     }
4081   } else {
4082     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4083   }
4084
4085   time = segment->time;
4086
4087   if (*cur != -1) {
4088     *cur = MAX (*cur, time);
4089     GST_DEBUG_OBJECT (basesink, "POSITION as max: %" GST_TIME_FORMAT
4090         ", time %" GST_TIME_FORMAT, GST_TIME_ARGS (*cur), GST_TIME_ARGS (time));
4091   } else {
4092     /* we have no buffer, use the segment times. */
4093     if (segment->rate >= 0.0) {
4094       /* forward, next position is always the time of the segment */
4095       *cur = time;
4096       GST_DEBUG_OBJECT (basesink, "POSITION as time: %" GST_TIME_FORMAT,
4097           GST_TIME_ARGS (*cur));
4098     } else {
4099       /* reverse, next expected timestamp is segment->stop. We use the function
4100        * to get things right for negative applied_rates. */
4101       *cur = gst_segment_to_stream_time (segment, oformat, segment->stop);
4102       GST_DEBUG_OBJECT (basesink, "reverse POSITION: %" GST_TIME_FORMAT,
4103           GST_TIME_ARGS (*cur));
4104     }
4105   }
4106
4107   res = (*cur != -1);
4108   if (res && oformat != format) {
4109     GST_OBJECT_UNLOCK (basesink);
4110     res =
4111         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
4112     if (!res)
4113       *cur = -1;
4114     GST_OBJECT_LOCK (basesink);
4115   }
4116
4117   return res;
4118 }
4119
4120 static gboolean
4121 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4122     gint64 * cur, gboolean * upstream)
4123 {
4124   GstClock *clock;
4125   gboolean res = FALSE;
4126   GstFormat oformat, tformat;
4127   GstClockTime now, base, latency;
4128   gint64 time, accum, duration;
4129   gdouble rate;
4130   gint64 last;
4131
4132   GST_OBJECT_LOCK (basesink);
4133   /* our intermediate time format */
4134   tformat = GST_FORMAT_TIME;
4135   /* get the format in the segment */
4136   oformat = basesink->segment.format;
4137
4138   /* can only give answer based on the clock if not EOS */
4139   if (G_UNLIKELY (basesink->eos))
4140     goto in_eos;
4141
4142   /* we can only get the segment when we are not NULL or READY */
4143   if (!basesink->have_newsegment)
4144     goto wrong_state;
4145
4146   /* when not in PLAYING or when we're busy with a state change, we
4147    * cannot read from the clock so we report time based on the
4148    * last seen timestamp. */
4149   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4150       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING)
4151     goto in_pause;
4152
4153   /* we need to sync on the clock. */
4154   if (basesink->sync == FALSE)
4155     goto no_sync;
4156
4157   /* and we need a clock */
4158   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4159     goto no_sync;
4160
4161   /* collect all data we need holding the lock */
4162   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time))
4163     time = basesink->segment.time;
4164   else
4165     time = 0;
4166
4167   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.stop))
4168     duration = basesink->segment.stop - basesink->segment.start;
4169   else
4170     duration = 0;
4171
4172   base = GST_ELEMENT_CAST (basesink)->base_time;
4173   accum = basesink->segment.accum;
4174   rate = basesink->segment.rate * basesink->segment.applied_rate;
4175   latency = basesink->priv->latency;
4176
4177   gst_object_ref (clock);
4178
4179   /* this function might release the LOCK */
4180   gst_base_sink_get_position_last (basesink, format, &last);
4181
4182   /* need to release the object lock before we can get the time, 
4183    * a clock might take the LOCK of the provider, which could be
4184    * a basesink subclass. */
4185   GST_OBJECT_UNLOCK (basesink);
4186
4187   now = gst_clock_get_time (clock);
4188
4189   if (oformat != tformat) {
4190     /* convert accum, time and duration to time */
4191     if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4192             &accum))
4193       goto convert_failed;
4194     if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration, &tformat,
4195             &duration))
4196       goto convert_failed;
4197     if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4198             &time))
4199       goto convert_failed;
4200   }
4201
4202   /* subtract base time and accumulated time from the clock time. 
4203    * Make sure we don't go negative. This is the current time in
4204    * the segment which we need to scale with the combined 
4205    * rate and applied rate. */
4206   base += accum;
4207   base += latency;
4208   base = MIN (now, base);
4209
4210   /* for negative rates we need to count back from from the segment
4211    * duration. */
4212   if (rate < 0.0)
4213     time += duration;
4214
4215   *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4216
4217   /* never report more than last seen position */
4218   if (last != -1)
4219     *cur = MIN (last, *cur);
4220
4221   gst_object_unref (clock);
4222
4223   GST_DEBUG_OBJECT (basesink,
4224       "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4225       GST_TIME_FORMAT " + time %" GST_TIME_FORMAT,
4226       GST_TIME_ARGS (now), GST_TIME_ARGS (base),
4227       GST_TIME_ARGS (accum), GST_TIME_ARGS (time));
4228
4229   if (oformat != format) {
4230     /* convert time to final format */
4231     if (!gst_pad_query_convert (basesink->sinkpad, tformat, *cur, &format, cur))
4232       goto convert_failed;
4233   }
4234
4235   res = TRUE;
4236
4237 done:
4238   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4239       res, GST_TIME_ARGS (*cur));
4240   return res;
4241
4242   /* special cases */
4243 in_eos:
4244   {
4245     GST_DEBUG_OBJECT (basesink, "position in EOS");
4246     res = gst_base_sink_get_position_last (basesink, format, cur);
4247     GST_OBJECT_UNLOCK (basesink);
4248     goto done;
4249   }
4250 in_pause:
4251   {
4252     GST_DEBUG_OBJECT (basesink, "position in PAUSED");
4253     res = gst_base_sink_get_position_paused (basesink, format, cur);
4254     GST_OBJECT_UNLOCK (basesink);
4255     goto done;
4256   }
4257 wrong_state:
4258   {
4259     /* in NULL or READY we always return FALSE and -1 */
4260     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4261     res = FALSE;
4262     *cur = -1;
4263     GST_OBJECT_UNLOCK (basesink);
4264     goto done;
4265   }
4266 no_sync:
4267   {
4268     /* report last seen timestamp if any, else ask upstream to answer */
4269     if ((*cur = basesink->priv->current_sstart) != -1)
4270       res = TRUE;
4271     else
4272       *upstream = TRUE;
4273
4274     GST_DEBUG_OBJECT (basesink, "no sync, res %d, POSITION %" GST_TIME_FORMAT,
4275         res, GST_TIME_ARGS (*cur));
4276     GST_OBJECT_UNLOCK (basesink);
4277     return res;
4278   }
4279 convert_failed:
4280   {
4281     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4282     *upstream = TRUE;
4283     return FALSE;
4284   }
4285 }
4286
4287 static gboolean
4288 gst_base_sink_query (GstElement * element, GstQuery * query)
4289 {
4290   gboolean res = FALSE;
4291
4292   GstBaseSink *basesink = GST_BASE_SINK (element);
4293
4294   switch (GST_QUERY_TYPE (query)) {
4295     case GST_QUERY_POSITION:
4296     {
4297       gint64 cur = 0;
4298       GstFormat format;
4299       gboolean upstream = FALSE;
4300
4301       gst_query_parse_position (query, &format, NULL);
4302
4303       GST_DEBUG_OBJECT (basesink, "position format %d", format);
4304
4305       /* first try to get the position based on the clock */
4306       if ((res =
4307               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4308         gst_query_set_position (query, format, cur);
4309       } else if (upstream) {
4310         /* fallback to peer query */
4311         res = gst_base_sink_peer_query (basesink, query);
4312       }
4313       break;
4314     }
4315     case GST_QUERY_DURATION:
4316     {
4317       GstFormat format, uformat;
4318       gint64 duration, uduration;
4319
4320       gst_query_parse_duration (query, &format, NULL);
4321
4322       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4323           gst_format_get_name (format));
4324
4325       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4326         uformat = GST_FORMAT_BYTES;
4327
4328         /* get the duration in bytes, in pull mode that's all we are sure to
4329          * know. We have to explicitly get this value from upstream instead of
4330          * using our cached value because it might change. Duration caching
4331          * should be done at a higher level. */
4332         res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4333             &uduration);
4334         if (res) {
4335           gst_segment_set_duration (&basesink->segment, uformat, uduration);
4336           if (format != uformat) {
4337             /* convert to the requested format */
4338             res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4339                 &format, &duration);
4340           } else {
4341             duration = uduration;
4342           }
4343           if (res) {
4344             /* set the result */
4345             gst_query_set_duration (query, format, duration);
4346           }
4347         }
4348       } else {
4349         /* in push mode we simply forward upstream */
4350         res = gst_base_sink_peer_query (basesink, query);
4351       }
4352       break;
4353     }
4354     case GST_QUERY_LATENCY:
4355     {
4356       gboolean live, us_live;
4357       GstClockTime min, max;
4358
4359       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4360                   &max))) {
4361         gst_query_set_latency (query, live, min, max);
4362       }
4363       break;
4364     }
4365     case GST_QUERY_JITTER:
4366       break;
4367     case GST_QUERY_RATE:
4368       /* gst_query_set_rate (query, basesink->segment_rate); */
4369       res = TRUE;
4370       break;
4371     case GST_QUERY_SEGMENT:
4372     {
4373       /* FIXME, bring start/stop to stream time */
4374       gst_query_set_segment (query, basesink->segment.rate,
4375           GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4376       break;
4377     }
4378     case GST_QUERY_SEEKING:
4379     case GST_QUERY_CONVERT:
4380     case GST_QUERY_FORMATS:
4381     default:
4382       res = gst_base_sink_peer_query (basesink, query);
4383       break;
4384   }
4385   return res;
4386 }
4387
4388 static GstStateChangeReturn
4389 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4390 {
4391   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4392   GstBaseSink *basesink = GST_BASE_SINK (element);
4393   GstBaseSinkClass *bclass;
4394   GstBaseSinkPrivate *priv;
4395
4396   priv = basesink->priv;
4397
4398   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4399
4400   switch (transition) {
4401     case GST_STATE_CHANGE_NULL_TO_READY:
4402       if (bclass->start)
4403         if (!bclass->start (basesink))
4404           goto start_failed;
4405       break;
4406     case GST_STATE_CHANGE_READY_TO_PAUSED:
4407       /* need to complete preroll before this state change completes, there
4408        * is no data flow in READY so we can safely assume we need to preroll. */
4409       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4410       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4411       basesink->have_newsegment = FALSE;
4412       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4413       gst_segment_init (basesink->abidata.ABI.clip_segment,
4414           GST_FORMAT_UNDEFINED);
4415       basesink->offset = 0;
4416       basesink->have_preroll = FALSE;
4417       priv->step_unlock = FALSE;
4418       basesink->need_preroll = TRUE;
4419       basesink->playing_async = TRUE;
4420       priv->current_sstart = -1;
4421       priv->current_sstop = -1;
4422       priv->eos_rtime = -1;
4423       priv->latency = 0;
4424       basesink->eos = FALSE;
4425       priv->received_eos = FALSE;
4426       gst_base_sink_reset_qos (basesink);
4427       priv->commited = FALSE;
4428       priv->call_preroll = TRUE;
4429       priv->current_step.valid = FALSE;
4430       priv->pending_step.valid = FALSE;
4431       if (priv->async_enabled) {
4432         GST_DEBUG_OBJECT (basesink, "doing async state change");
4433         /* when async enabled, post async-start message and return ASYNC from
4434          * the state change function */
4435         ret = GST_STATE_CHANGE_ASYNC;
4436         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4437             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4438       } else {
4439         priv->have_latency = TRUE;
4440       }
4441       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4442       break;
4443     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4444       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4445       if (!gst_base_sink_needs_preroll (basesink)) {
4446         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4447         /* no preroll needed anymore now. */
4448         basesink->playing_async = FALSE;
4449         basesink->need_preroll = FALSE;
4450         if (basesink->eos) {
4451           GstMessage *message;
4452
4453           /* need to post EOS message here */
4454           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4455           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
4456           gst_message_set_seqnum (message, basesink->priv->seqnum);
4457           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
4458         } else {
4459           GST_DEBUG_OBJECT (basesink, "signal preroll");
4460           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
4461         }
4462       } else {
4463         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
4464         basesink->need_preroll = TRUE;
4465         basesink->playing_async = TRUE;
4466         priv->call_preroll = TRUE;
4467         priv->commited = FALSE;
4468         if (priv->async_enabled) {
4469           GST_DEBUG_OBJECT (basesink, "doing async state change");
4470           ret = GST_STATE_CHANGE_ASYNC;
4471           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4472               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4473         }
4474       }
4475       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4476       break;
4477     default:
4478       break;
4479   }
4480
4481   {
4482     GstStateChangeReturn bret;
4483
4484     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4485     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
4486       goto activate_failed;
4487   }
4488
4489   switch (transition) {
4490     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4491       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
4492       /* FIXME, make sure we cannot enter _render first */
4493
4494       /* we need to call ::unlock before locking PREROLL_LOCK
4495        * since we lock it before going into ::render */
4496       if (bclass->unlock)
4497         bclass->unlock (basesink);
4498
4499       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4500       /* now that we have the PREROLL lock, clear our unlock request */
4501       if (bclass->unlock_stop)
4502         bclass->unlock_stop (basesink);
4503
4504       /* we need preroll again and we set the flag before unlocking the clockid
4505        * because if the clockid is unlocked before a current buffer expired, we
4506        * can use that buffer to preroll with */
4507       basesink->need_preroll = TRUE;
4508
4509       if (basesink->clock_id) {
4510         gst_clock_id_unschedule (basesink->clock_id);
4511       }
4512
4513       /* if we don't have a preroll buffer we need to wait for a preroll and
4514        * return ASYNC. */
4515       if (!gst_base_sink_needs_preroll (basesink)) {
4516         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
4517         basesink->playing_async = FALSE;
4518       } else {
4519         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
4520           ret = GST_STATE_CHANGE_SUCCESS;
4521         } else {
4522           GST_DEBUG_OBJECT (basesink,
4523               "PLAYING to PAUSED, we are not prerolled");
4524           basesink->playing_async = TRUE;
4525           priv->commited = FALSE;
4526           priv->call_preroll = TRUE;
4527           if (priv->async_enabled) {
4528             GST_DEBUG_OBJECT (basesink, "doing async state change");
4529             ret = GST_STATE_CHANGE_ASYNC;
4530             gst_element_post_message (GST_ELEMENT_CAST (basesink),
4531                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
4532                     FALSE));
4533           }
4534         }
4535       }
4536       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
4537           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
4538
4539       gst_base_sink_reset_qos (basesink);
4540       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4541       break;
4542     case GST_STATE_CHANGE_PAUSED_TO_READY:
4543       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4544       /* start by reseting our position state with the object lock so that the
4545        * position query gets the right idea. We do this before we post the
4546        * messages so that the message handlers pick this up. */
4547       GST_OBJECT_LOCK (basesink);
4548       basesink->have_newsegment = FALSE;
4549       priv->current_sstart = -1;
4550       priv->current_sstop = -1;
4551       priv->have_latency = FALSE;
4552       GST_OBJECT_UNLOCK (basesink);
4553
4554       gst_base_sink_set_last_buffer (basesink, NULL);
4555       priv->call_preroll = FALSE;
4556
4557       if (!priv->commited) {
4558         if (priv->async_enabled) {
4559           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
4560
4561           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4562               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
4563                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
4564
4565           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4566               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
4567         }
4568         priv->commited = TRUE;
4569       } else {
4570         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
4571       }
4572       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4573       break;
4574     case GST_STATE_CHANGE_READY_TO_NULL:
4575       if (bclass->stop) {
4576         if (!bclass->stop (basesink)) {
4577           GST_WARNING_OBJECT (basesink, "failed to stop");
4578         }
4579       }
4580       gst_base_sink_set_last_buffer (basesink, NULL);
4581       priv->call_preroll = FALSE;
4582       break;
4583     default:
4584       break;
4585   }
4586
4587   return ret;
4588
4589   /* ERRORS */
4590 start_failed:
4591   {
4592     GST_DEBUG_OBJECT (basesink, "failed to start");
4593     return GST_STATE_CHANGE_FAILURE;
4594   }
4595 activate_failed:
4596   {
4597     GST_DEBUG_OBJECT (basesink,
4598         "element failed to change states -- activation problem?");
4599     return GST_STATE_CHANGE_FAILURE;
4600   }
4601 }