basesink: Change awkward wording in a translateable message.
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSource
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * <programlisting>
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *   
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * </programlisting>
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSink::preroll vmethod with this preroll buffer and will then commit
59  * the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from ::get_times. If this function returns
63  * #GST_CLOCK_TIME_NONE for the start time, no synchronisation will be done.
64  * Synchronisation can be disabled entirely by setting the object "sync"
65  * property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSink::render will be called.
68  * Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the ::render method
71  * are supported as well. These classes typically receive a buffer in the render
72  * method and can then potentially block on the clock while rendering. A typical
73  * example is an audiosink. Since 0.10.11 these subclasses can use
74  * gst_base_sink_wait_preroll() to perform the blocking wait.
75  *
76  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
77  * for the clock to reach the time indicated by the stop time of the last
78  * ::get_times call before posting an EOS message. When the element receives
79  * EOS in PAUSED, preroll completes, the event is queued and an EOS message is
80  * posted when going to PLAYING.
81  *
82  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
83  * synchronisation and clipping of buffers. Buffers that fall completely outside
84  * of the current segment are dropped. Buffers that fall partially in the
85  * segment are rendered (and prerolled). Subclasses should do any subbuffer
86  * clipping themselves when needed.
87  *
88  * #GstBaseSink will by default report the current playback position in
89  * #GST_FORMAT_TIME based on the current clock time and segment information.
90  * If no clock has been set on the element, the query will be forwarded
91  * upstream.
92  *
93  * The ::set_caps function will be called when the subclass should configure
94  * itself to process a specific media type.
95  *
96  * The ::start and ::stop virtual methods will be called when resources should
97  * be allocated. Any ::preroll, ::render  and ::set_caps function will be
98  * called between the ::start and ::stop calls.
99  *
100  * The ::event virtual method will be called when an event is received by
101  * #GstBaseSink. Normally this method should only be overriden by very specific
102  * elements (such as file sinks) which need to handle the newsegment event
103  * specially.
104  *
105  * #GstBaseSink provides an overridable ::buffer_alloc function that can be
106  * used by sinks that want to do reverse negotiation or to provide
107  * custom buffers (hardware buffers for example) to upstream elements.
108  *
109  * The ::unlock method is called when the elements should unblock any blocking
110  * operations they perform in the ::render method. This is mostly useful when
111  * the ::render method performs a blocking write on a file descriptor, for
112  * example.
113  *
114  * The max-lateness property affects how the sink deals with buffers that
115  * arrive too late in the sink. A buffer arrives too late in the sink when
116  * the presentation time (as a combination of the last segment, buffer
117  * timestamp and element base_time) plus the duration is before the current
118  * time of the clock.
119  * If the frame is later than max-lateness, the sink will drop the buffer
120  * without calling the render method.
121  * This feature is disabled if sync is disabled, the ::get-times method does
122  * not return a valid start time or max-lateness is set to -1 (the default).
123  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
124  * max-lateness value.
125  *
126  * The qos property will enable the quality-of-service features of the basesink
127  * which gather statistics about the real-time performance of the clock
128  * synchronisation. For each buffer received in the sink, statistics are
129  * gathered and a QOS event is sent upstream with these numbers. This
130  * information can then be used by upstream elements to reduce their processing
131  * rate, for example.
132  *
133  * Since 0.10.15 the async property can be used to instruct the sink to never
134  * perform an ASYNC state change. This feature is mostly usable when dealing
135  * with non-synchronized streams or sparse streams.
136  *
137  * Last reviewed on 2007-08-29 (0.10.15)
138  */
139
140 #ifdef HAVE_CONFIG_H
141 #  include "config.h"
142 #endif
143
144 #include "gstbasesink.h"
145 #include <gst/gstmarshal.h>
146 #include <gst/gst_private.h>
147 #include <gst/gst-i18n-lib.h>
148
149 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
150 #define GST_CAT_DEFAULT gst_base_sink_debug
151
152 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
153    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
154
155 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
156
157 typedef struct
158 {
159   gboolean valid;               /* if this info is valid */
160   guint32 seqnum;               /* the seqnum of the STEP event */
161   GstFormat format;             /* the format of the amount */
162   guint64 amount;               /* the total amount of data to skip */
163   guint64 position;             /* the position in the stepped data */
164   guint64 duration;             /* the duration in time of the skipped data */
165   guint64 start;                /* running_time of the start */
166   gdouble rate;                 /* rate of skipping */
167   gdouble start_rate;           /* rate before skipping */
168   gboolean flush;               /* if this was a flushing step */
169   gboolean intermediate;        /* if this is an intermediate step */
170   gboolean need_preroll;        /* if we need preroll after this step */
171 } GstStepInfo;
172
173 /* FIXME, some stuff in ABI.data and other in Private...
174  * Make up your mind please.
175  */
176 struct _GstBaseSinkPrivate
177 {
178   gint qos_enabled;             /* ATOMIC */
179   gboolean async_enabled;
180   GstClockTimeDiff ts_offset;
181   GstClockTime render_delay;
182
183   /* start, stop of current buffer, stream time, used to report position */
184   GstClockTime current_sstart;
185   GstClockTime current_sstop;
186
187   /* start, stop and jitter of current buffer, running time */
188   GstClockTime current_rstart;
189   GstClockTime current_rstop;
190   GstClockTimeDiff current_jitter;
191
192   /* EOS sync time in running time */
193   GstClockTime eos_rtime;
194
195   /* last buffer that arrived in time, running time */
196   GstClockTime last_in_time;
197   /* when the last buffer left the sink, running time */
198   GstClockTime last_left;
199
200   /* running averages go here these are done on running time */
201   GstClockTime avg_pt;
202   GstClockTime avg_duration;
203   gdouble avg_rate;
204
205   /* these are done on system time. avg_jitter and avg_render are
206    * compared to eachother to see if the rendering time takes a
207    * huge amount of the processing, If so we are flooded with
208    * buffers. */
209   GstClockTime last_left_systime;
210   GstClockTime avg_jitter;
211   GstClockTime start, stop;
212   GstClockTime avg_render;
213
214   /* number of rendered and dropped frames */
215   guint64 rendered;
216   guint64 dropped;
217
218   /* latency stuff */
219   GstClockTime latency;
220
221   /* if we already commited the state */
222   gboolean commited;
223
224   /* when we received EOS */
225   gboolean received_eos;
226
227   /* when we are prerolled and able to report latency */
228   gboolean have_latency;
229
230   /* the last buffer we prerolled or rendered. Useful for making snapshots */
231   GstBuffer *last_buffer;
232
233   /* caps for pull based scheduling */
234   GstCaps *pull_caps;
235
236   /* blocksize for pulling */
237   guint blocksize;
238
239   gboolean discont;
240
241   /* seqnum of the stream */
242   guint32 seqnum;
243
244   gboolean call_preroll;
245   gboolean step_unlock;
246
247   /* we have a pending and a current step operation */
248   GstStepInfo current_step;
249   GstStepInfo pending_step;
250 };
251
252 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
253
254 /* generic running average, this has a neutral window size */
255 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
256
257 /* the windows for these running averages are experimentally obtained.
258  * possitive values get averaged more while negative values use a small
259  * window so we can react faster to badness. */
260 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
261 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
262
263 /* BaseSink properties */
264
265 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
266 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
267
268 #define DEFAULT_PREROLL_QUEUE_LEN       0
269 #define DEFAULT_SYNC                    TRUE
270 #define DEFAULT_MAX_LATENESS            -1
271 #define DEFAULT_QOS                     FALSE
272 #define DEFAULT_ASYNC                   TRUE
273 #define DEFAULT_TS_OFFSET               0
274 #define DEFAULT_BLOCKSIZE               4096
275 #define DEFAULT_RENDER_DELAY            0
276
277 enum
278 {
279   PROP_0,
280   PROP_PREROLL_QUEUE_LEN,
281   PROP_SYNC,
282   PROP_MAX_LATENESS,
283   PROP_QOS,
284   PROP_ASYNC,
285   PROP_TS_OFFSET,
286   PROP_LAST_BUFFER,
287   PROP_BLOCKSIZE,
288   PROP_RENDER_DELAY,
289   PROP_LAST
290 };
291
292 static GstElementClass *parent_class = NULL;
293
294 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
295 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
296 static void gst_base_sink_finalize (GObject * object);
297
298 GType
299 gst_base_sink_get_type (void)
300 {
301   static volatile gsize base_sink_type = 0;
302
303   if (g_once_init_enter (&base_sink_type)) {
304     GType _type;
305     static const GTypeInfo base_sink_info = {
306       sizeof (GstBaseSinkClass),
307       NULL,
308       NULL,
309       (GClassInitFunc) gst_base_sink_class_init,
310       NULL,
311       NULL,
312       sizeof (GstBaseSink),
313       0,
314       (GInstanceInitFunc) gst_base_sink_init,
315     };
316
317     _type = g_type_register_static (GST_TYPE_ELEMENT,
318         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
319     g_once_init_leave (&base_sink_type, _type);
320   }
321   return base_sink_type;
322 }
323
324 static void gst_base_sink_set_property (GObject * object, guint prop_id,
325     const GValue * value, GParamSpec * pspec);
326 static void gst_base_sink_get_property (GObject * object, guint prop_id,
327     GValue * value, GParamSpec * pspec);
328
329 static gboolean gst_base_sink_send_event (GstElement * element,
330     GstEvent * event);
331 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
332
333 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
334 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
335 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
336     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
337 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
338     GstClockTime * start, GstClockTime * end);
339 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
340     GstPad * pad, gboolean flushing);
341 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
342     gboolean active);
343 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
344     GstSegment * segment);
345 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
346     GstEvent * event, GstSegment * segment);
347
348 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
349     GstStateChange transition);
350
351 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
352 static void gst_base_sink_loop (GstPad * pad);
353 static gboolean gst_base_sink_pad_activate (GstPad * pad);
354 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
355 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
356 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
357 static gboolean gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query);
358
359 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
360
361 /* check if an object was too late */
362 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
363     GstMiniObject * obj, GstClockTime start, GstClockTime stop,
364     GstClockReturn status, GstClockTimeDiff jitter);
365 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
366     GstMiniObject * obj);
367
368 static void
369 gst_base_sink_class_init (GstBaseSinkClass * klass)
370 {
371   GObjectClass *gobject_class;
372   GstElementClass *gstelement_class;
373
374   gobject_class = G_OBJECT_CLASS (klass);
375   gstelement_class = GST_ELEMENT_CLASS (klass);
376
377   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
378       "basesink element");
379
380   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
381
382   parent_class = g_type_class_peek_parent (klass);
383
384   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_sink_finalize);
385   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_sink_set_property);
386   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_sink_get_property);
387
388   /* FIXME, this next value should be configured using an event from the
389    * upstream element, ie, the BUFFER_SIZE event. */
390   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
391       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
392           "Number of buffers to queue during preroll", 0, G_MAXUINT,
393           DEFAULT_PREROLL_QUEUE_LEN,
394           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
395
396   g_object_class_install_property (gobject_class, PROP_SYNC,
397       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
398           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399
400   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
401       g_param_spec_int64 ("max-lateness", "Max Lateness",
402           "Maximum number of nanoseconds that a buffer can be late before it "
403           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
404           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
405
406   g_object_class_install_property (gobject_class, PROP_QOS,
407       g_param_spec_boolean ("qos", "Qos",
408           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
409           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
410   /**
411    * GstBaseSink:async
412    *
413    * If set to #TRUE, the basesink will perform asynchronous state changes.
414    * When set to #FALSE, the sink will not signal the parent when it prerolls.
415    * Use this option when dealing with sparse streams or when synchronisation is
416    * not required.
417    *
418    * Since: 0.10.15
419    */
420   g_object_class_install_property (gobject_class, PROP_ASYNC,
421       g_param_spec_boolean ("async", "Async",
422           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
423           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
424   /**
425    * GstBaseSink:ts-offset
426    *
427    * Controls the final synchronisation, a negative value will render the buffer
428    * earlier while a positive value delays playback. This property can be 
429    * used to fix synchronisation in bad files.
430    *
431    * Since: 0.10.15
432    */
433   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
434       g_param_spec_int64 ("ts-offset", "TS Offset",
435           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
436           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
437   /**
438    * GstBaseSink:last-buffer
439    *
440    * The last buffer that arrived in the sink and was used for preroll or for
441    * rendering. This property can be used to generate thumbnails. This property
442    * can be NULL when the sink has not yet received a bufer.
443    *
444    * Since: 0.10.15
445    */
446   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
447       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
448           "The last buffer received in the sink", GST_TYPE_BUFFER,
449           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
450   /**
451    * GstBaseSink:blocksize
452    *
453    * The amount of bytes to pull when operating in pull mode.
454    *
455    * Since: 0.10.22
456    */
457   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
458       g_param_spec_uint ("blocksize", "Block size",
459           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
460           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
461   /**
462    * GstBaseSink:render-delay
463    *
464    * The additional delay between synchronisation and actual rendering of the
465    * media. This property will add additional latency to the device in order to
466    * make other sinks compensate for the delay.
467    *
468    * Since: 0.10.22
469    */
470   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
471       g_param_spec_uint64 ("render-delay", "Render Delay",
472           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
473           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
474
475   gstelement_class->change_state =
476       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
477   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
478   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
479
480   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
481   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
482   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
483   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
484   klass->activate_pull =
485       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
486 }
487
488 static GstCaps *
489 gst_base_sink_pad_getcaps (GstPad * pad)
490 {
491   GstBaseSinkClass *bclass;
492   GstBaseSink *bsink;
493   GstCaps *caps = NULL;
494
495   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
496   bclass = GST_BASE_SINK_GET_CLASS (bsink);
497
498   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
499     /* if we are operating in pull mode we only accept the negotiated caps */
500     GST_OBJECT_LOCK (pad);
501     if ((caps = GST_PAD_CAPS (pad)))
502       gst_caps_ref (caps);
503     GST_OBJECT_UNLOCK (pad);
504   }
505   if (caps == NULL) {
506     if (bclass->get_caps)
507       caps = bclass->get_caps (bsink);
508
509     if (caps == NULL) {
510       GstPadTemplate *pad_template;
511
512       pad_template =
513           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
514           "sink");
515       if (pad_template != NULL) {
516         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
517       }
518     }
519   }
520   gst_object_unref (bsink);
521
522   return caps;
523 }
524
525 static gboolean
526 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
527 {
528   GstBaseSinkClass *bclass;
529   GstBaseSink *bsink;
530   gboolean res = TRUE;
531
532   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
533   bclass = GST_BASE_SINK_GET_CLASS (bsink);
534
535   if (res && bclass->set_caps)
536     res = bclass->set_caps (bsink, caps);
537
538   gst_object_unref (bsink);
539
540   return res;
541 }
542
543 static void
544 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
545 {
546   GstBaseSinkClass *bclass;
547   GstBaseSink *bsink;
548
549   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
550   bclass = GST_BASE_SINK_GET_CLASS (bsink);
551
552   if (bclass->fixate)
553     bclass->fixate (bsink, caps);
554
555   gst_object_unref (bsink);
556 }
557
558 static GstFlowReturn
559 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
560     GstCaps * caps, GstBuffer ** buf)
561 {
562   GstBaseSinkClass *bclass;
563   GstBaseSink *bsink;
564   GstFlowReturn result = GST_FLOW_OK;
565
566   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
567   bclass = GST_BASE_SINK_GET_CLASS (bsink);
568
569   if (bclass->buffer_alloc)
570     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
571   else
572     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
573
574   gst_object_unref (bsink);
575
576   return result;
577 }
578
579 static void
580 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
581 {
582   GstPadTemplate *pad_template;
583   GstBaseSinkPrivate *priv;
584
585   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
586
587   pad_template =
588       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
589   g_return_if_fail (pad_template != NULL);
590
591   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
592
593   gst_pad_set_getcaps_function (basesink->sinkpad,
594       GST_DEBUG_FUNCPTR (gst_base_sink_pad_getcaps));
595   gst_pad_set_setcaps_function (basesink->sinkpad,
596       GST_DEBUG_FUNCPTR (gst_base_sink_pad_setcaps));
597   gst_pad_set_fixatecaps_function (basesink->sinkpad,
598       GST_DEBUG_FUNCPTR (gst_base_sink_pad_fixate));
599   gst_pad_set_bufferalloc_function (basesink->sinkpad,
600       GST_DEBUG_FUNCPTR (gst_base_sink_pad_buffer_alloc));
601   gst_pad_set_activate_function (basesink->sinkpad,
602       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate));
603   gst_pad_set_activatepush_function (basesink->sinkpad,
604       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_push));
605   gst_pad_set_activatepull_function (basesink->sinkpad,
606       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_pull));
607   gst_pad_set_event_function (basesink->sinkpad,
608       GST_DEBUG_FUNCPTR (gst_base_sink_event));
609   gst_pad_set_chain_function (basesink->sinkpad,
610       GST_DEBUG_FUNCPTR (gst_base_sink_chain));
611   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
612
613   basesink->pad_mode = GST_ACTIVATE_NONE;
614   basesink->preroll_queue = g_queue_new ();
615   basesink->abidata.ABI.clip_segment = gst_segment_new ();
616   priv->have_latency = FALSE;
617
618   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
619   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
620
621   basesink->sync = DEFAULT_SYNC;
622   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
623   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
624   priv->async_enabled = DEFAULT_ASYNC;
625   priv->ts_offset = DEFAULT_TS_OFFSET;
626   priv->render_delay = DEFAULT_RENDER_DELAY;
627   priv->blocksize = DEFAULT_BLOCKSIZE;
628
629   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
630 }
631
632 static void
633 gst_base_sink_finalize (GObject * object)
634 {
635   GstBaseSink *basesink;
636
637   basesink = GST_BASE_SINK (object);
638
639   g_queue_free (basesink->preroll_queue);
640   gst_segment_free (basesink->abidata.ABI.clip_segment);
641
642   G_OBJECT_CLASS (parent_class)->finalize (object);
643 }
644
645 /**
646  * gst_base_sink_set_sync:
647  * @sink: the sink
648  * @sync: the new sync value.
649  *
650  * Configures @sink to synchronize on the clock or not. When
651  * @sync is FALSE, incomming samples will be played as fast as
652  * possible. If @sync is TRUE, the timestamps of the incomming
653  * buffers will be used to schedule the exact render time of its
654  * contents.
655  *
656  * Since: 0.10.4
657  */
658 void
659 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
660 {
661   g_return_if_fail (GST_IS_BASE_SINK (sink));
662
663   GST_OBJECT_LOCK (sink);
664   sink->sync = sync;
665   GST_OBJECT_UNLOCK (sink);
666 }
667
668 /**
669  * gst_base_sink_get_sync:
670  * @sink: the sink
671  *
672  * Checks if @sink is currently configured to synchronize against the
673  * clock.
674  *
675  * Returns: TRUE if the sink is configured to synchronize against the clock.
676  *
677  * Since: 0.10.4
678  */
679 gboolean
680 gst_base_sink_get_sync (GstBaseSink * sink)
681 {
682   gboolean res;
683
684   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
685
686   GST_OBJECT_LOCK (sink);
687   res = sink->sync;
688   GST_OBJECT_UNLOCK (sink);
689
690   return res;
691 }
692
693 /**
694  * gst_base_sink_set_max_lateness:
695  * @sink: the sink
696  * @max_lateness: the new max lateness value.
697  *
698  * Sets the new max lateness value to @max_lateness. This value is
699  * used to decide if a buffer should be dropped or not based on the
700  * buffer timestamp and the current clock time. A value of -1 means
701  * an unlimited time.
702  *
703  * Since: 0.10.4
704  */
705 void
706 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
707 {
708   g_return_if_fail (GST_IS_BASE_SINK (sink));
709
710   GST_OBJECT_LOCK (sink);
711   sink->abidata.ABI.max_lateness = max_lateness;
712   GST_OBJECT_UNLOCK (sink);
713 }
714
715 /**
716  * gst_base_sink_get_max_lateness:
717  * @sink: the sink
718  *
719  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
720  * more details.
721  *
722  * Returns: The maximum time in nanoseconds that a buffer can be late
723  * before it is dropped and not rendered. A value of -1 means an
724  * unlimited time.
725  *
726  * Since: 0.10.4
727  */
728 gint64
729 gst_base_sink_get_max_lateness (GstBaseSink * sink)
730 {
731   gint64 res;
732
733   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
734
735   GST_OBJECT_LOCK (sink);
736   res = sink->abidata.ABI.max_lateness;
737   GST_OBJECT_UNLOCK (sink);
738
739   return res;
740 }
741
742 /**
743  * gst_base_sink_set_qos_enabled:
744  * @sink: the sink
745  * @enabled: the new qos value.
746  *
747  * Configures @sink to send Quality-of-Service events upstream.
748  *
749  * Since: 0.10.5
750  */
751 void
752 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
753 {
754   g_return_if_fail (GST_IS_BASE_SINK (sink));
755
756   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
757 }
758
759 /**
760  * gst_base_sink_is_qos_enabled:
761  * @sink: the sink
762  *
763  * Checks if @sink is currently configured to send Quality-of-Service events
764  * upstream.
765  *
766  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
767  *
768  * Since: 0.10.5
769  */
770 gboolean
771 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
772 {
773   gboolean res;
774
775   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
776
777   res = g_atomic_int_get (&sink->priv->qos_enabled);
778
779   return res;
780 }
781
782 /**
783  * gst_base_sink_set_async_enabled:
784  * @sink: the sink
785  * @enabled: the new async value.
786  *
787  * Configures @sink to perform all state changes asynchronusly. When async is
788  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
789  * preroll buffer. This feature is usefull if the sink does not synchronize
790  * against the clock or when it is dealing with sparse streams.
791  *
792  * Since: 0.10.15
793  */
794 void
795 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
796 {
797   g_return_if_fail (GST_IS_BASE_SINK (sink));
798
799   GST_PAD_PREROLL_LOCK (sink->sinkpad);
800   sink->priv->async_enabled = enabled;
801   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
802   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
803 }
804
805 /**
806  * gst_base_sink_is_async_enabled:
807  * @sink: the sink
808  *
809  * Checks if @sink is currently configured to perform asynchronous state
810  * changes to PAUSED.
811  *
812  * Returns: TRUE if the sink is configured to perform asynchronous state
813  * changes.
814  *
815  * Since: 0.10.15
816  */
817 gboolean
818 gst_base_sink_is_async_enabled (GstBaseSink * sink)
819 {
820   gboolean res;
821
822   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
823
824   GST_PAD_PREROLL_LOCK (sink->sinkpad);
825   res = sink->priv->async_enabled;
826   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
827
828   return res;
829 }
830
831 /**
832  * gst_base_sink_set_ts_offset:
833  * @sink: the sink
834  * @offset: the new offset
835  *
836  * Adjust the synchronisation of @sink with @offset. A negative value will
837  * render buffers earlier than their timestamp. A positive value will delay
838  * rendering. This function can be used to fix playback of badly timestamped
839  * buffers.
840  *
841  * Since: 0.10.15
842  */
843 void
844 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
845 {
846   g_return_if_fail (GST_IS_BASE_SINK (sink));
847
848   GST_OBJECT_LOCK (sink);
849   sink->priv->ts_offset = offset;
850   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
851   GST_OBJECT_UNLOCK (sink);
852 }
853
854 /**
855  * gst_base_sink_get_ts_offset:
856  * @sink: the sink
857  *
858  * Get the synchronisation offset of @sink.
859  *
860  * Returns: The synchronisation offset.
861  *
862  * Since: 0.10.15
863  */
864 GstClockTimeDiff
865 gst_base_sink_get_ts_offset (GstBaseSink * sink)
866 {
867   GstClockTimeDiff res;
868
869   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
870
871   GST_OBJECT_LOCK (sink);
872   res = sink->priv->ts_offset;
873   GST_OBJECT_UNLOCK (sink);
874
875   return res;
876 }
877
878 /**
879  * gst_base_sink_get_last_buffer:
880  * @sink: the sink
881  *
882  * Get the last buffer that arrived in the sink and was used for preroll or for
883  * rendering. This property can be used to generate thumbnails.
884  *
885  * The #GstCaps on the buffer can be used to determine the type of the buffer.
886  * 
887  * Returns: a #GstBuffer. gst_buffer_unref() after usage. This function returns
888  * NULL when no buffer has arrived in the sink yet or when the sink is not in
889  * PAUSED or PLAYING.
890  *
891  * Since: 0.10.15
892  */
893 GstBuffer *
894 gst_base_sink_get_last_buffer (GstBaseSink * sink)
895 {
896   GstBuffer *res;
897
898   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
899
900   GST_OBJECT_LOCK (sink);
901   if ((res = sink->priv->last_buffer))
902     gst_buffer_ref (res);
903   GST_OBJECT_UNLOCK (sink);
904
905   return res;
906 }
907
908 static void
909 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
910 {
911   GstBuffer *old;
912
913   GST_OBJECT_LOCK (sink);
914   old = sink->priv->last_buffer;
915   if (G_LIKELY (old != buffer)) {
916     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
917     if (G_LIKELY (buffer))
918       gst_buffer_ref (buffer);
919     sink->priv->last_buffer = buffer;
920   } else {
921     old = NULL;
922   }
923   GST_OBJECT_UNLOCK (sink);
924
925   /* avoid unreffing with the lock because cleanup code might want to take the
926    * lock too */
927   if (G_LIKELY (old))
928     gst_buffer_unref (old);
929 }
930
931 /**
932  * gst_base_sink_get_latency:
933  * @sink: the sink
934  *
935  * Get the currently configured latency.
936  *
937  * Returns: The configured latency.
938  *
939  * Since: 0.10.12
940  */
941 GstClockTime
942 gst_base_sink_get_latency (GstBaseSink * sink)
943 {
944   GstClockTime res;
945
946   GST_OBJECT_LOCK (sink);
947   res = sink->priv->latency;
948   GST_OBJECT_UNLOCK (sink);
949
950   return res;
951 }
952
953 /**
954  * gst_base_sink_query_latency:
955  * @sink: the sink
956  * @live: if the sink is live
957  * @upstream_live: if an upstream element is live
958  * @min_latency: the min latency of the upstream elements
959  * @max_latency: the max latency of the upstream elements
960  *
961  * Query the sink for the latency parameters. The latency will be queried from
962  * the upstream elements. @live will be TRUE if @sink is configured to
963  * synchronize against the clock. @upstream_live will be TRUE if an upstream
964  * element is live. 
965  *
966  * If both @live and @upstream_live are TRUE, the sink will want to compensate
967  * for the latency introduced by the upstream elements by setting the
968  * @min_latency to a strictly possitive value.
969  *
970  * This function is mostly used by subclasses. 
971  *
972  * Returns: TRUE if the query succeeded.
973  *
974  * Since: 0.10.12
975  */
976 gboolean
977 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
978     gboolean * upstream_live, GstClockTime * min_latency,
979     GstClockTime * max_latency)
980 {
981   gboolean l, us_live, res, have_latency;
982   GstClockTime min, max, render_delay;
983   GstQuery *query;
984   GstClockTime us_min, us_max;
985
986   /* we are live when we sync to the clock */
987   GST_OBJECT_LOCK (sink);
988   l = sink->sync;
989   have_latency = sink->priv->have_latency;
990   render_delay = sink->priv->render_delay;
991   GST_OBJECT_UNLOCK (sink);
992
993   /* assume no latency */
994   min = 0;
995   max = -1;
996   us_live = FALSE;
997
998   if (have_latency) {
999     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1000     /* we are ready for a latency query this is when we preroll or when we are
1001      * not async. */
1002     query = gst_query_new_latency ();
1003
1004     /* ask the peer for the latency */
1005     if ((res = gst_base_sink_peer_query (sink, query))) {
1006       /* get upstream min and max latency */
1007       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1008
1009       if (us_live) {
1010         /* upstream live, use its latency, subclasses should use these
1011          * values to create the complete latency. */
1012         min = us_min;
1013         max = us_max;
1014       }
1015       if (l) {
1016         /* we need to add the render delay if we are live */
1017         if (min != -1)
1018           min += render_delay;
1019         if (max != -1)
1020           max += render_delay;
1021       }
1022     }
1023     gst_query_unref (query);
1024   } else {
1025     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1026     res = FALSE;
1027   }
1028
1029   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1030   if (!res) {
1031     if (!l) {
1032       res = TRUE;
1033       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1034     } else {
1035       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1036     }
1037   }
1038
1039   if (res) {
1040     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1041         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1042         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1043
1044     if (live)
1045       *live = l;
1046     if (upstream_live)
1047       *upstream_live = us_live;
1048     if (min_latency)
1049       *min_latency = min;
1050     if (max_latency)
1051       *max_latency = max;
1052   }
1053   return res;
1054 }
1055
1056 /**
1057  * gst_base_sink_set_render_delay:
1058  * @sink: a #GstBaseSink
1059  * @delay: the new delay
1060  *
1061  * Set the render delay in @sink to @delay. The render delay is the time 
1062  * between actual rendering of a buffer and its synchronisation time. Some
1063  * devices might delay media rendering which can be compensated for with this
1064  * function. 
1065  *
1066  * After calling this function, this sink will report additional latency and
1067  * other sinks will adjust their latency to delay the rendering of their media.
1068  *
1069  * This function is usually called by subclasses.
1070  *
1071  * Since: 0.10.21
1072  */
1073 void
1074 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1075 {
1076   GstClockTime old_render_delay;
1077
1078   g_return_if_fail (GST_IS_BASE_SINK (sink));
1079
1080   GST_OBJECT_LOCK (sink);
1081   old_render_delay = sink->priv->render_delay;
1082   sink->priv->render_delay = delay;
1083   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1084       GST_TIME_ARGS (delay));
1085   GST_OBJECT_UNLOCK (sink);
1086
1087   if (delay != old_render_delay) {
1088     GST_DEBUG_OBJECT (sink, "posting latency changed");
1089     gst_element_post_message (GST_ELEMENT_CAST (sink),
1090         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1091   }
1092 }
1093
1094 /**
1095  * gst_base_sink_get_render_delay:
1096  * @sink: a #GstBaseSink
1097  *
1098  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1099  * information about the render delay.
1100  *
1101  * Returns: the render delay of @sink.
1102  *
1103  * Since: 0.10.21
1104  */
1105 GstClockTime
1106 gst_base_sink_get_render_delay (GstBaseSink * sink)
1107 {
1108   GstClockTimeDiff res;
1109
1110   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1111
1112   GST_OBJECT_LOCK (sink);
1113   res = sink->priv->render_delay;
1114   GST_OBJECT_UNLOCK (sink);
1115
1116   return res;
1117 }
1118
1119 /**
1120  * gst_base_sink_set_blocksize:
1121  * @sink: a #GstBaseSink
1122  * @blocksize: the blocksize in bytes
1123  *
1124  * Set the number of bytes that the sink will pull when it is operating in pull
1125  * mode.
1126  *
1127  * Since: 0.10.22
1128  */
1129 void
1130 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1131 {
1132   g_return_if_fail (GST_IS_BASE_SINK (sink));
1133
1134   GST_OBJECT_LOCK (sink);
1135   sink->priv->blocksize = blocksize;
1136   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1137   GST_OBJECT_UNLOCK (sink);
1138 }
1139
1140 /**
1141  * gst_base_sink_get_blocksize:
1142  * @sink: a #GstBaseSink
1143  *
1144  * Get the number of bytes that the sink will pull when it is operating in pull
1145  * mode.
1146  *
1147  * Returns: the number of bytes @sink will pull in pull mode.
1148  *
1149  * Since: 0.10.22
1150  */
1151 guint
1152 gst_base_sink_get_blocksize (GstBaseSink * sink)
1153 {
1154   guint res;
1155
1156   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1157
1158   GST_OBJECT_LOCK (sink);
1159   res = sink->priv->blocksize;
1160   GST_OBJECT_UNLOCK (sink);
1161
1162   return res;
1163 }
1164
1165 static void
1166 gst_base_sink_set_property (GObject * object, guint prop_id,
1167     const GValue * value, GParamSpec * pspec)
1168 {
1169   GstBaseSink *sink = GST_BASE_SINK (object);
1170
1171   switch (prop_id) {
1172     case PROP_PREROLL_QUEUE_LEN:
1173       /* preroll lock necessary to serialize with finish_preroll */
1174       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1175       sink->preroll_queue_max_len = g_value_get_uint (value);
1176       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1177       break;
1178     case PROP_SYNC:
1179       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1180       break;
1181     case PROP_MAX_LATENESS:
1182       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1183       break;
1184     case PROP_QOS:
1185       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1186       break;
1187     case PROP_ASYNC:
1188       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1189       break;
1190     case PROP_TS_OFFSET:
1191       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1192       break;
1193     case PROP_BLOCKSIZE:
1194       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1195       break;
1196     case PROP_RENDER_DELAY:
1197       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1198       break;
1199     default:
1200       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1201       break;
1202   }
1203 }
1204
1205 static void
1206 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1207     GParamSpec * pspec)
1208 {
1209   GstBaseSink *sink = GST_BASE_SINK (object);
1210
1211   switch (prop_id) {
1212     case PROP_PREROLL_QUEUE_LEN:
1213       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1214       g_value_set_uint (value, sink->preroll_queue_max_len);
1215       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1216       break;
1217     case PROP_SYNC:
1218       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1219       break;
1220     case PROP_MAX_LATENESS:
1221       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1222       break;
1223     case PROP_QOS:
1224       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1225       break;
1226     case PROP_ASYNC:
1227       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1228       break;
1229     case PROP_TS_OFFSET:
1230       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1231       break;
1232     case PROP_LAST_BUFFER:
1233       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1234       break;
1235     case PROP_BLOCKSIZE:
1236       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1237       break;
1238     case PROP_RENDER_DELAY:
1239       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1240       break;
1241     default:
1242       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1243       break;
1244   }
1245 }
1246
1247
1248 static GstCaps *
1249 gst_base_sink_get_caps (GstBaseSink * sink)
1250 {
1251   return NULL;
1252 }
1253
1254 static gboolean
1255 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1256 {
1257   return TRUE;
1258 }
1259
1260 static GstFlowReturn
1261 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1262     GstCaps * caps, GstBuffer ** buf)
1263 {
1264   *buf = NULL;
1265   return GST_FLOW_OK;
1266 }
1267
1268 /* with PREROLL_LOCK, STREAM_LOCK */
1269 static void
1270 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1271 {
1272   GstMiniObject *obj;
1273
1274   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1275   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1276     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1277     gst_mini_object_unref (obj);
1278   }
1279   /* we can't have EOS anymore now */
1280   basesink->eos = FALSE;
1281   basesink->priv->received_eos = FALSE;
1282   basesink->have_preroll = FALSE;
1283   basesink->priv->step_unlock = FALSE;
1284   basesink->eos_queued = FALSE;
1285   basesink->preroll_queued = 0;
1286   basesink->buffers_queued = 0;
1287   basesink->events_queued = 0;
1288   /* can't report latency anymore until we preroll again */
1289   if (basesink->priv->async_enabled) {
1290     GST_OBJECT_LOCK (basesink);
1291     basesink->priv->have_latency = FALSE;
1292     GST_OBJECT_UNLOCK (basesink);
1293   }
1294   /* and signal any waiters now */
1295   GST_PAD_PREROLL_SIGNAL (pad);
1296 }
1297
1298 /* with STREAM_LOCK, configures given segment with the event information. */
1299 static void
1300 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1301     GstEvent * event, GstSegment * segment)
1302 {
1303   gboolean update;
1304   gdouble rate, arate;
1305   GstFormat format;
1306   gint64 start;
1307   gint64 stop;
1308   gint64 time;
1309
1310   /* the newsegment event is needed to bring the buffer timestamps to the
1311    * stream time and to drop samples outside of the playback segment. */
1312   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1313       &start, &stop, &time);
1314
1315   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1316    * We protect with the OBJECT_LOCK so that we can use the values to
1317    * safely answer a POSITION query. */
1318   GST_OBJECT_LOCK (basesink);
1319   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1320       stop, time);
1321
1322   if (format == GST_FORMAT_TIME) {
1323     GST_DEBUG_OBJECT (basesink,
1324         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1325         "format GST_FORMAT_TIME, "
1326         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1327         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1328         update, rate, arate, GST_TIME_ARGS (segment->start),
1329         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1330         GST_TIME_ARGS (segment->accum));
1331   } else {
1332     GST_DEBUG_OBJECT (basesink,
1333         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1334         "format %d, "
1335         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1336         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1337         segment->format, segment->start, segment->stop, segment->time,
1338         segment->accum);
1339   }
1340   GST_OBJECT_UNLOCK (basesink);
1341 }
1342
1343 /* with PREROLL_LOCK, STREAM_LOCK */
1344 static gboolean
1345 gst_base_sink_commit_state (GstBaseSink * basesink)
1346 {
1347   /* commit state and proceed to next pending state */
1348   GstState current, next, pending, post_pending;
1349   gboolean post_paused = FALSE;
1350   gboolean post_async_done = FALSE;
1351   gboolean post_playing = FALSE;
1352
1353   /* we are certainly not playing async anymore now */
1354   basesink->playing_async = FALSE;
1355
1356   GST_OBJECT_LOCK (basesink);
1357   current = GST_STATE (basesink);
1358   next = GST_STATE_NEXT (basesink);
1359   pending = GST_STATE_PENDING (basesink);
1360   post_pending = pending;
1361
1362   switch (pending) {
1363     case GST_STATE_PLAYING:
1364     {
1365       GstBaseSinkClass *bclass;
1366       GstStateChangeReturn ret;
1367
1368       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1369
1370       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1371
1372       basesink->need_preroll = FALSE;
1373       post_async_done = TRUE;
1374       basesink->priv->commited = TRUE;
1375       post_playing = TRUE;
1376       /* post PAUSED too when we were READY */
1377       if (current == GST_STATE_READY) {
1378         post_paused = TRUE;
1379       }
1380
1381       /* make sure we notify the subclass of async playing */
1382       if (bclass->async_play) {
1383         GST_WARNING_OBJECT (basesink, "deprecated async_play");
1384         ret = bclass->async_play (basesink);
1385         if (ret == GST_STATE_CHANGE_FAILURE)
1386           goto async_failed;
1387       }
1388       break;
1389     }
1390     case GST_STATE_PAUSED:
1391       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1392       post_paused = TRUE;
1393       post_async_done = TRUE;
1394       basesink->priv->commited = TRUE;
1395       post_pending = GST_STATE_VOID_PENDING;
1396       break;
1397     case GST_STATE_READY:
1398     case GST_STATE_NULL:
1399       goto stopping;
1400     case GST_STATE_VOID_PENDING:
1401       goto nothing_pending;
1402     default:
1403       break;
1404   }
1405
1406   /* we can report latency queries now */
1407   basesink->priv->have_latency = TRUE;
1408
1409   GST_STATE (basesink) = pending;
1410   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1411   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1412   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1413   GST_OBJECT_UNLOCK (basesink);
1414
1415   if (post_paused) {
1416     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1417     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1418         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1419             current, next, post_pending));
1420   }
1421   if (post_async_done) {
1422     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1423     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1424         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1425   }
1426   if (post_playing) {
1427     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1428     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1429         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1430             next, pending, GST_STATE_VOID_PENDING));
1431   }
1432
1433   GST_STATE_BROADCAST (basesink);
1434
1435   return TRUE;
1436
1437 nothing_pending:
1438   {
1439     /* Depending on the state, set our vars. We get in this situation when the
1440      * state change function got a change to update the state vars before the
1441      * streaming thread did. This is fine but we need to make sure that we
1442      * update the need_preroll var since it was TRUE when we got here and might
1443      * become FALSE if we got to PLAYING. */
1444     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1445         gst_element_state_get_name (current));
1446     switch (current) {
1447       case GST_STATE_PLAYING:
1448         basesink->need_preroll = FALSE;
1449         break;
1450       case GST_STATE_PAUSED:
1451         basesink->need_preroll = TRUE;
1452         break;
1453       default:
1454         basesink->need_preroll = FALSE;
1455         basesink->flushing = TRUE;
1456         break;
1457     }
1458     /* we can report latency queries now */
1459     basesink->priv->have_latency = TRUE;
1460     GST_OBJECT_UNLOCK (basesink);
1461     return TRUE;
1462   }
1463 stopping:
1464   {
1465     /* app is going to READY */
1466     GST_DEBUG_OBJECT (basesink, "stopping");
1467     basesink->need_preroll = FALSE;
1468     basesink->flushing = TRUE;
1469     GST_OBJECT_UNLOCK (basesink);
1470     return FALSE;
1471   }
1472 async_failed:
1473   {
1474     GST_DEBUG_OBJECT (basesink, "async commit failed");
1475     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1476     GST_OBJECT_UNLOCK (basesink);
1477     return FALSE;
1478   }
1479 }
1480
1481 static void
1482 start_stepping (GstBaseSink * sink, GstSegment * segment,
1483     GstStepInfo * pending, GstStepInfo * current)
1484 {
1485   GST_DEBUG_OBJECT (sink, "update pending step");
1486   memcpy (current, pending, sizeof (GstStepInfo));
1487   pending->valid = FALSE;
1488
1489   /* get the running time of where we paused and remember it */
1490   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1491   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1492
1493   /* set the new rate for the remainder of the segment */
1494   current->start_rate = segment->rate;
1495   segment->rate *= current->rate;
1496   segment->abs_rate = ABS (segment->rate);
1497
1498   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1499       GST_TIME_ARGS (current->start));
1500
1501   if (current->amount == -1) {
1502     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1503     current->valid = FALSE;
1504   } else {
1505     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1506         "rate: %f", current->amount, gst_format_get_name (current->format),
1507         current->rate);
1508   }
1509 }
1510
1511 static void
1512 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1513     GstStepInfo * current, guint64 cstart, guint64 cstop, gint64 * rstart,
1514     gint64 * rstop)
1515 {
1516   gint64 stop, position;
1517   GstMessage *message;
1518
1519   GST_DEBUG_OBJECT (sink, "step complete");
1520
1521   if (segment->rate > 0.0)
1522     stop = *rstart;
1523   else
1524     stop = *rstop;
1525
1526   GST_DEBUG_OBJECT (sink,
1527       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1528
1529   if (stop == -1)
1530     current->duration = current->position;
1531   else
1532     current->duration = stop - current->start;
1533
1534   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1535       GST_TIME_ARGS (current->duration));
1536
1537   /* now move the segment to the new running time */
1538   position = current->start + current->duration;
1539   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1540   gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1541
1542   /* restore the previous rate */
1543   segment->rate = current->start_rate;
1544   segment->abs_rate = ABS (segment->rate);
1545
1546   if (current->flush) {
1547     /* and remove the accumulated time we flushed */
1548     segment->accum = current->start;
1549   }
1550
1551   /* the clip segment is used for position report in paused... */
1552   memcpy (sink->abidata.ABI.clip_segment, segment, sizeof (GstSegment));
1553
1554   /* post the step done when we know the stepped duration in TIME */
1555   message =
1556       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1557       current->amount, current->rate, current->flush, current->intermediate,
1558       current->duration);
1559   gst_message_set_seqnum (message, current->seqnum);
1560   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1561
1562   if (!current->intermediate)
1563     sink->need_preroll = current->need_preroll;
1564
1565   /* and the current step info finished and becomes invalid */
1566   current->valid = FALSE;
1567 }
1568
1569 static gboolean
1570 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1571     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1572     gint64 * rstop)
1573 {
1574   GstBaseSinkPrivate *priv;
1575   gboolean step_end = FALSE;
1576
1577   priv = sink->priv;
1578
1579   /* see if we need to skip this buffer because of stepping */
1580   switch (current->format) {
1581     case GST_FORMAT_TIME:
1582     {
1583       guint64 end;
1584       gint64 first, last;
1585
1586       if (segment->rate > 0.0) {
1587         first = *rstart;
1588         last = *rstop;
1589       } else {
1590         first = *rstop;
1591         last = *rstart;
1592       }
1593
1594       end = current->start + current->amount;
1595       current->position = first - current->start;
1596
1597       if (G_UNLIKELY (segment->abs_rate != 1.0))
1598         current->position /= segment->abs_rate;
1599
1600       GST_DEBUG_OBJECT (sink,
1601           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1602           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1603       GST_DEBUG_OBJECT (sink,
1604           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1605           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1606           GST_TIME_ARGS (last - current->start),
1607           GST_TIME_ARGS (current->amount));
1608
1609       if (current->position >= current->amount || last >= end) {
1610         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1611         step_end = TRUE;
1612         if (segment->rate > 0.0) {
1613           *rstart = end;
1614           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1615         } else {
1616           *rstop = end;
1617           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1618         }
1619       }
1620       GST_DEBUG_OBJECT (sink,
1621           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1622           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1623       GST_DEBUG_OBJECT (sink,
1624           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1625           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1626       break;
1627     }
1628     case GST_FORMAT_BUFFERS:
1629       GST_DEBUG_OBJECT (sink,
1630           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1631           current->position, current->amount);
1632
1633       if (current->position < current->amount) {
1634         current->position++;
1635       } else {
1636         step_end = TRUE;
1637       }
1638       break;
1639     case GST_FORMAT_DEFAULT:
1640     default:
1641       GST_DEBUG_OBJECT (sink,
1642           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1643           current->position, current->amount);
1644       break;
1645   }
1646   return step_end;
1647 }
1648
1649 /* with STREAM_LOCK, PREROLL_LOCK
1650  *
1651  * Returns TRUE if the object needs synchronisation and takes therefore
1652  * part in prerolling.
1653  *
1654  * rsstart/rsstop contain the start/stop in stream time.
1655  * rrstart/rrstop contain the start/stop in running time.
1656  */
1657 static gboolean
1658 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1659     GstClockTime * rsstart, GstClockTime * rsstop,
1660     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1661     gboolean * stepped, GstSegment * segment, GstStepInfo * step)
1662 {
1663   GstBaseSinkClass *bclass;
1664   GstBuffer *buffer;
1665   GstClockTime start, stop;     /* raw start/stop timestamps */
1666   gint64 cstart, cstop;         /* clipped raw timestamps */
1667   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1668   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1669   GstFormat format;
1670   GstBaseSinkPrivate *priv;
1671   gboolean step_end;
1672
1673   priv = basesink->priv;
1674
1675   /* start with nothing */
1676   start = stop = -1;
1677
1678   step_end = FALSE;
1679
1680   if (G_UNLIKELY (GST_IS_EVENT (obj))) {
1681     GstEvent *event = GST_EVENT_CAST (obj);
1682
1683     switch (GST_EVENT_TYPE (event)) {
1684         /* EOS event needs syncing */
1685       case GST_EVENT_EOS:
1686       {
1687         if (basesink->segment.rate >= 0.0) {
1688           sstart = sstop = priv->current_sstop;
1689           if (sstart == -1) {
1690             /* we have not seen a buffer yet, use the segment values */
1691             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1692                 basesink->segment.format, basesink->segment.stop);
1693           }
1694         } else {
1695           sstart = sstop = priv->current_sstart;
1696           if (sstart == -1) {
1697             /* we have not seen a buffer yet, use the segment values */
1698             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1699                 basesink->segment.format, basesink->segment.start);
1700           }
1701         }
1702
1703         rstart = rstop = priv->eos_rtime;
1704         *do_sync = rstart != -1;
1705         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1706             GST_TIME_ARGS (rstart));
1707         /* if we are stepping, we end now */
1708         step_end = step->valid;
1709         goto eos_done;
1710       }
1711       default:
1712         /* other events do not need syncing */
1713         /* FIXME, maybe NEWSEGMENT might need synchronisation
1714          * since the POSITION query depends on accumulated times and
1715          * we cannot accumulate the current segment before the previous
1716          * one completed.
1717          */
1718         return FALSE;
1719     }
1720   }
1721
1722   /* else do buffer sync code */
1723   buffer = GST_BUFFER_CAST (obj);
1724
1725   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1726
1727   /* just get the times to see if we need syncing, if the start returns -1 we
1728    * don't sync. */
1729   if (bclass->get_times)
1730     bclass->get_times (basesink, buffer, &start, &stop);
1731
1732   if (start == -1) {
1733     /* we don't need to sync but we still want to get the timestamps for
1734      * tracking the position */
1735     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1736     *do_sync = FALSE;
1737   } else {
1738     *do_sync = TRUE;
1739   }
1740
1741   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1742       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1743       GST_TIME_ARGS (stop), *do_sync);
1744
1745   /* collect segment and format for code clarity */
1746   format = segment->format;
1747
1748   /* no timestamp clipping if we did not get a TIME segment format */
1749   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
1750     cstart = start;
1751     cstop = stop;
1752     /* do running and stream time in TIME format */
1753     format = GST_FORMAT_TIME;
1754     GST_LOG_OBJECT (basesink, "not time format, don't clip");
1755     goto do_times;
1756   }
1757
1758   /* clip, only when we know about time */
1759   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
1760               (gint64) start, (gint64) stop, &cstart, &cstop)))
1761     goto out_of_segment;
1762
1763   if (G_UNLIKELY (start != cstart || stop != cstop)) {
1764     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
1765         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
1766         GST_TIME_ARGS (cstop));
1767   }
1768
1769   /* set last stop position */
1770   if (G_LIKELY (cstop != GST_CLOCK_TIME_NONE))
1771     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
1772   else
1773     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
1774
1775 do_times:
1776   rstart = gst_segment_to_running_time (segment, format, cstart);
1777   rstop = gst_segment_to_running_time (segment, format, cstop);
1778
1779   if (G_UNLIKELY (step->valid)) {
1780     if (!(step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
1781                 &rstart, &rstop))) {
1782       /* step is still busy, we discard data when we are flushing */
1783       *stepped = step->flush;
1784     }
1785   }
1786   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
1787    * upstream is behaving very badly */
1788   sstart = gst_segment_to_stream_time (segment, format, cstart);
1789   sstop = gst_segment_to_stream_time (segment, format, cstop);
1790
1791 eos_done:
1792   /* done label only called when doing EOS, we also stop stepping then */
1793   if (step_end)
1794     stop_stepping (basesink, segment, step, cstart, cstop, &rstart, &rstop);
1795
1796   /* save times */
1797   *rsstart = sstart;
1798   *rsstop = sstop;
1799   *rrstart = rstart;
1800   *rrstop = rstop;
1801
1802   /* buffers and EOS always need syncing and preroll */
1803   return TRUE;
1804
1805   /* special cases */
1806 out_of_segment:
1807   {
1808     /* we usually clip in the chain function already but stepping could cause
1809      * the segment to be updated later. we return FALSE so that we don't try
1810      * to sync on it. */
1811     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
1812     return FALSE;
1813   }
1814 }
1815
1816 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
1817  * adjust a timestamp with the latency and timestamp offset */
1818 static GstClockTime
1819 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
1820 {
1821   GstClockTimeDiff ts_offset;
1822
1823   /* don't do anything funny with invalid timestamps */
1824   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1825     return time;
1826
1827   time += basesink->priv->latency;
1828
1829   /* apply offset, be carefull for underflows */
1830   ts_offset = basesink->priv->ts_offset;
1831   if (ts_offset < 0) {
1832     ts_offset = -ts_offset;
1833     if (ts_offset < time)
1834       time -= ts_offset;
1835     else
1836       time = 0;
1837   } else
1838     time += ts_offset;
1839
1840   return time;
1841 }
1842
1843 /**
1844  * gst_base_sink_wait_clock:
1845  * @sink: the sink
1846  * @time: the running_time to be reached
1847  * @jitter: the jitter to be filled with time diff (can be NULL)
1848  *
1849  * This function will block until @time is reached. It is usually called by
1850  * subclasses that use their own internal synchronisation.
1851  *
1852  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
1853  * returned. Likewise, if synchronisation is disabled in the element or there
1854  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
1855  *
1856  * This function should only be called with the PREROLL_LOCK held, like when
1857  * receiving an EOS event in the ::event vmethod or when receiving a buffer in
1858  * the ::render vmethod.
1859  *
1860  * The @time argument should be the running_time of when this method should
1861  * return and is not adjusted with any latency or offset configured in the
1862  * sink.
1863  *
1864  * Since 0.10.20
1865  *
1866  * Returns: #GstClockReturn
1867  */
1868 GstClockReturn
1869 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
1870     GstClockTimeDiff * jitter)
1871 {
1872   GstClockID id;
1873   GstClockReturn ret;
1874   GstClock *clock;
1875
1876   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1877     goto invalid_time;
1878
1879   GST_OBJECT_LOCK (sink);
1880   if (G_UNLIKELY (!sink->sync))
1881     goto no_sync;
1882
1883   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
1884     goto no_clock;
1885
1886   /* add base_time to running_time to get the time against the clock */
1887   time += GST_ELEMENT_CAST (sink)->base_time;
1888
1889   id = gst_clock_new_single_shot_id (clock, time);
1890   GST_OBJECT_UNLOCK (sink);
1891
1892   /* A blocking wait is performed on the clock. We save the ClockID
1893    * so we can unlock the entry at any time. While we are blocking, we 
1894    * release the PREROLL_LOCK so that other threads can interrupt the
1895    * entry. */
1896   sink->clock_id = id;
1897   /* release the preroll lock while waiting */
1898   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1899
1900   ret = gst_clock_id_wait (id, jitter);
1901
1902   GST_PAD_PREROLL_LOCK (sink->sinkpad);
1903   gst_clock_id_unref (id);
1904   sink->clock_id = NULL;
1905
1906   return ret;
1907
1908   /* no syncing needed */
1909 invalid_time:
1910   {
1911     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
1912     return GST_CLOCK_BADTIME;
1913   }
1914 no_sync:
1915   {
1916     GST_DEBUG_OBJECT (sink, "sync disabled");
1917     GST_OBJECT_UNLOCK (sink);
1918     return GST_CLOCK_BADTIME;
1919   }
1920 no_clock:
1921   {
1922     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
1923     GST_OBJECT_UNLOCK (sink);
1924     return GST_CLOCK_BADTIME;
1925   }
1926 }
1927
1928 /**
1929  * gst_base_sink_wait_preroll:
1930  * @sink: the sink
1931  *
1932  * If the #GstBaseSinkClass::render method performs its own synchronisation against
1933  * the clock it must unblock when going from PLAYING to the PAUSED state and call
1934  * this method before continuing to render the remaining data.
1935  *
1936  * This function will block until a state change to PLAYING happens (in which
1937  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
1938  * to a state change to READY or a FLUSH event (in which case this function
1939  * returns #GST_FLOW_WRONG_STATE).
1940  *
1941  * This function should only be called with the PREROLL_LOCK held, like in the
1942  * render function.
1943  *
1944  * Since: 0.10.11
1945  *
1946  * Returns: #GST_FLOW_OK if the preroll completed and processing can
1947  * continue. Any other return value should be returned from the render vmethod.
1948  */
1949 GstFlowReturn
1950 gst_base_sink_wait_preroll (GstBaseSink * sink)
1951 {
1952   sink->have_preroll = TRUE;
1953   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
1954   /* block until the state changes, or we get a flush, or something */
1955   GST_PAD_PREROLL_WAIT (sink->sinkpad);
1956   sink->have_preroll = FALSE;
1957   if (G_UNLIKELY (sink->flushing))
1958     goto stopping;
1959   if (G_UNLIKELY (sink->priv->step_unlock))
1960     goto step_unlocked;
1961   GST_DEBUG_OBJECT (sink, "continue after preroll");
1962
1963   return GST_FLOW_OK;
1964
1965   /* ERRORS */
1966 stopping:
1967   {
1968     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
1969     return GST_FLOW_WRONG_STATE;
1970   }
1971 step_unlocked:
1972   {
1973     sink->priv->step_unlock = FALSE;
1974     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
1975     return GST_FLOW_STEP;
1976   }
1977 }
1978
1979 /**
1980  * gst_base_sink_do_preroll:
1981  * @sink: the sink
1982  * @obj: the object that caused the preroll
1983  *
1984  * If the @sink spawns its own thread for pulling buffers from upstream it
1985  * should call this method after it has pulled a buffer. If the element needed
1986  * to preroll, this function will perform the preroll and will then block
1987  * until the element state is changed.
1988  *
1989  * This function should be called with the PREROLL_LOCK held.
1990  *
1991  * Since 0.10.22
1992  *
1993  * Returns: #GST_FLOW_OK if the preroll completed and processing can
1994  * continue. Any other return value should be returned from the render vmethod.
1995  */
1996 GstFlowReturn
1997 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
1998 {
1999   GstFlowReturn ret;
2000
2001   while (G_UNLIKELY (sink->need_preroll)) {
2002     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2003
2004     ret = gst_base_sink_preroll_object (sink, obj);
2005     if (ret != GST_FLOW_OK)
2006       goto preroll_failed;
2007
2008     /* need to recheck here because the commit state could have
2009      * made us not need the preroll anymore */
2010     if (G_LIKELY (sink->need_preroll)) {
2011       /* block until the state changes, or we get a flush, or something */
2012       ret = gst_base_sink_wait_preroll (sink);
2013       if (ret != GST_FLOW_OK) {
2014         if (ret == GST_FLOW_STEP)
2015           ret = GST_FLOW_OK;
2016         else
2017           goto preroll_failed;
2018       }
2019     }
2020   }
2021   return GST_FLOW_OK;
2022
2023   /* ERRORS */
2024 preroll_failed:
2025   {
2026     GST_DEBUG_OBJECT (sink, "preroll failed %d", ret);
2027     return ret;
2028   }
2029 }
2030
2031 /**
2032  * gst_base_sink_wait_eos:
2033  * @sink: the sink
2034  * @time: the running_time to be reached
2035  * @jitter: the jitter to be filled with time diff (can be NULL)
2036  *
2037  * This function will block until @time is reached. It is usually called by
2038  * subclasses that use their own internal synchronisation but want to let the
2039  * EOS be handled by the base class.
2040  *
2041  * This function should only be called with the PREROLL_LOCK held, like when
2042  * receiving an EOS event in the ::event vmethod.
2043  *
2044  * The @time argument should be the running_time of when the EOS should happen
2045  * and will be adjusted with any latency and offset configured in the sink.
2046  *
2047  * Since 0.10.15
2048  *
2049  * Returns: #GstFlowReturn
2050  */
2051 GstFlowReturn
2052 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2053     GstClockTimeDiff * jitter)
2054 {
2055   GstClockReturn status;
2056   GstFlowReturn ret;
2057
2058   do {
2059     GstClockTime stime;
2060
2061     GST_DEBUG_OBJECT (sink, "checking preroll");
2062
2063     /* first wait for the playing state before we can continue */
2064     if (G_UNLIKELY (sink->need_preroll)) {
2065       ret = gst_base_sink_wait_preroll (sink);
2066       if (ret != GST_FLOW_OK) {
2067         if (ret == GST_FLOW_STEP)
2068           ret = GST_FLOW_OK;
2069         else
2070           goto flushing;
2071       }
2072     }
2073
2074     /* preroll done, we can sync since we are in PLAYING now. */
2075     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2076         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2077
2078     /* compensate for latency and ts_offset. We don't adjust for render delay
2079      * because we don't interact with the device on EOS normally. */
2080     stime = gst_base_sink_adjust_time (sink, time);
2081
2082     /* wait for the clock, this can be interrupted because we got shut down or 
2083      * we PAUSED. */
2084     status = gst_base_sink_wait_clock (sink, stime, jitter);
2085
2086     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2087
2088     /* invalid time, no clock or sync disabled, just continue then */
2089     if (status == GST_CLOCK_BADTIME)
2090       break;
2091
2092     /* waiting could have been interrupted and we can be flushing now */
2093     if (G_UNLIKELY (sink->flushing))
2094       goto flushing;
2095
2096     /* retry if we got unscheduled, which means we did not reach the timeout
2097      * yet. if some other error occures, we continue. */
2098   } while (status == GST_CLOCK_UNSCHEDULED);
2099
2100   GST_DEBUG_OBJECT (sink, "end of stream");
2101
2102   return GST_FLOW_OK;
2103
2104   /* ERRORS */
2105 flushing:
2106   {
2107     GST_DEBUG_OBJECT (sink, "we are flushing");
2108     return GST_FLOW_WRONG_STATE;
2109   }
2110 }
2111
2112 /* with STREAM_LOCK, PREROLL_LOCK
2113  *
2114  * Make sure we are in PLAYING and synchronize an object to the clock.
2115  *
2116  * If we need preroll, we are not in PLAYING. We try to commit the state
2117  * if needed and then block if we still are not PLAYING.
2118  *
2119  * We start waiting on the clock in PLAYING. If we got interrupted, we
2120  * immediatly try to re-preroll.
2121  *
2122  * Some objects do not need synchronisation (most events) and so this function
2123  * immediatly returns GST_FLOW_OK.
2124  *
2125  * for objects that arrive later than max-lateness to be synchronized to the 
2126  * clock have the @late boolean set to TRUE.
2127  *
2128  * This function keeps a running average of the jitter (the diff between the
2129  * clock time and the requested sync time). The jitter is negative for
2130  * objects that arrive in time and positive for late buffers.
2131  *
2132  * does not take ownership of obj.
2133  */
2134 static GstFlowReturn
2135 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2136     GstMiniObject * obj, gboolean * late)
2137 {
2138   GstClockTimeDiff jitter;
2139   gboolean syncable;
2140   GstClockReturn status = GST_CLOCK_OK;
2141   GstClockTime rstart, rstop, sstart, sstop, stime;
2142   gboolean do_sync;
2143   GstBaseSinkPrivate *priv;
2144   GstFlowReturn ret;
2145   GstStepInfo *current, *pending;
2146   gboolean stepped;
2147
2148   priv = basesink->priv;
2149
2150 do_step:
2151   sstart = sstop = rstart = rstop = -1;
2152   do_sync = TRUE;
2153   stepped = FALSE;
2154
2155   priv->current_rstart = -1;
2156
2157   /* get stepping info */
2158   current = &priv->current_step;
2159   pending = &priv->pending_step;
2160
2161   /* get timing information for this object against the render segment */
2162   syncable = gst_base_sink_get_sync_times (basesink, obj,
2163       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2164       current);
2165
2166   if (G_UNLIKELY (stepped))
2167     goto step_skipped;
2168
2169   /* a syncable object needs to participate in preroll and
2170    * clocking. All buffers and EOS are syncable. */
2171   if (G_UNLIKELY (!syncable))
2172     goto not_syncable;
2173
2174   /* store timing info for current object */
2175   priv->current_rstart = rstart;
2176   priv->current_rstop = (rstop != -1 ? rstop : rstart);
2177
2178   /* save sync time for eos when the previous object needed sync */
2179   priv->eos_rtime = (do_sync ? priv->current_rstop : -1);
2180
2181 again:
2182   /* first do preroll, this makes sure we commit our state
2183    * to PAUSED and can continue to PLAYING. We cannot perform
2184    * any clock sync in PAUSED because there is no clock. */
2185   ret = gst_base_sink_do_preroll (basesink, obj);
2186   if (G_UNLIKELY (ret != GST_FLOW_OK))
2187     goto preroll_failed;
2188
2189   /* After rendering we store the position of the last buffer so that we can use
2190    * it to report the position. We need to take the lock here. */
2191   GST_OBJECT_LOCK (basesink);
2192   priv->current_sstart = sstart;
2193   priv->current_sstop = (sstop != -1 ? sstop : sstart);
2194   GST_OBJECT_UNLOCK (basesink);
2195
2196   /* update the segment with a pending step if the current one is invalid and we
2197    * have a new pending one. We only accept new step updates after a preroll */
2198   if (G_UNLIKELY (pending->valid && !current->valid)) {
2199     start_stepping (basesink, &basesink->segment, pending, current);
2200     goto do_step;
2201   }
2202
2203   if (!do_sync)
2204     goto done;
2205
2206   /* adjust for latency */
2207   stime = gst_base_sink_adjust_time (basesink, rstart);
2208
2209   /* adjust for render-delay, avoid underflows */
2210   if (stime != -1) {
2211     if (stime > priv->render_delay)
2212       stime -= priv->render_delay;
2213     else
2214       stime = 0;
2215   }
2216
2217   /* preroll done, we can sync since we are in PLAYING now. */
2218   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2219       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2220       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2221
2222   /* This function will return immediatly if start == -1, no clock
2223    * or sync is disabled with GST_CLOCK_BADTIME. */
2224   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2225
2226   GST_DEBUG_OBJECT (basesink, "clock returned %d", status);
2227
2228   /* invalid time, no clock or sync disabled, just render */
2229   if (status == GST_CLOCK_BADTIME)
2230     goto done;
2231
2232   /* waiting could have been interrupted and we can be flushing now */
2233   if (G_UNLIKELY (basesink->flushing))
2234     goto flushing;
2235
2236   /* check for unlocked by a state change, we are not flushing so
2237    * we can try to preroll on the current buffer. */
2238   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2239     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2240     priv->call_preroll = TRUE;
2241     goto again;
2242   }
2243
2244   /* successful syncing done, record observation */
2245   priv->current_jitter = jitter;
2246
2247   /* check if the object should be dropped */
2248   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2249       status, jitter);
2250
2251 done:
2252   return GST_FLOW_OK;
2253
2254   /* ERRORS */
2255 step_skipped:
2256   {
2257     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2258     *late = TRUE;
2259     return GST_FLOW_OK;
2260   }
2261 not_syncable:
2262   {
2263     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2264     return GST_FLOW_OK;
2265   }
2266 flushing:
2267   {
2268     GST_DEBUG_OBJECT (basesink, "we are flushing");
2269     return GST_FLOW_WRONG_STATE;
2270   }
2271 preroll_failed:
2272   {
2273     GST_DEBUG_OBJECT (basesink, "preroll failed");
2274     return ret;
2275   }
2276 }
2277
2278 static gboolean
2279 gst_base_sink_send_qos (GstBaseSink * basesink,
2280     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2281 {
2282   GstEvent *event;
2283   gboolean res;
2284
2285   /* generate Quality-of-Service event */
2286   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2287       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2288       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (time));
2289
2290   event = gst_event_new_qos (proportion, diff, time);
2291
2292   /* send upstream */
2293   res = gst_pad_push_event (basesink->sinkpad, event);
2294
2295   return res;
2296 }
2297
2298 static void
2299 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2300 {
2301   GstBaseSinkPrivate *priv;
2302   GstClockTime start, stop;
2303   GstClockTimeDiff jitter;
2304   GstClockTime pt, entered, left;
2305   GstClockTime duration;
2306   gdouble rate;
2307
2308   priv = sink->priv;
2309
2310   start = priv->current_rstart;
2311
2312   /* if Quality-of-Service disabled, do nothing */
2313   if (!g_atomic_int_get (&priv->qos_enabled) || start == -1)
2314     return;
2315
2316   stop = priv->current_rstop;
2317   jitter = priv->current_jitter;
2318
2319   if (jitter < 0) {
2320     /* this is the time the buffer entered the sink */
2321     if (start < -jitter)
2322       entered = 0;
2323     else
2324       entered = start + jitter;
2325     left = start;
2326   } else {
2327     /* this is the time the buffer entered the sink */
2328     entered = start + jitter;
2329     /* this is the time the buffer left the sink */
2330     left = start + jitter;
2331   }
2332
2333   /* calculate duration of the buffer */
2334   if (stop != -1)
2335     duration = stop - start;
2336   else
2337     duration = -1;
2338
2339   /* if we have the time when the last buffer left us, calculate
2340    * processing time */
2341   if (priv->last_left != -1) {
2342     if (entered > priv->last_left) {
2343       pt = entered - priv->last_left;
2344     } else {
2345       pt = 0;
2346     }
2347   } else {
2348     pt = priv->avg_pt;
2349   }
2350
2351   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2352       ", entered %" GST_TIME_FORMAT ", left %" GST_TIME_FORMAT ", pt: %"
2353       GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT ",jitter %"
2354       G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (entered),
2355       GST_TIME_ARGS (left), GST_TIME_ARGS (pt), GST_TIME_ARGS (duration),
2356       jitter);
2357
2358   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2359       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2360       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2361       priv->avg_rate);
2362
2363   /* collect running averages. for first observations, we copy the
2364    * values */
2365   if (priv->avg_duration == -1)
2366     priv->avg_duration = duration;
2367   else
2368     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2369
2370   if (priv->avg_pt == -1)
2371     priv->avg_pt = pt;
2372   else
2373     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2374
2375   if (priv->avg_duration != 0)
2376     rate =
2377         gst_guint64_to_gdouble (priv->avg_pt) /
2378         gst_guint64_to_gdouble (priv->avg_duration);
2379   else
2380     rate = 0.0;
2381
2382   if (priv->last_left != -1) {
2383     if (dropped || priv->avg_rate < 0.0) {
2384       priv->avg_rate = rate;
2385     } else {
2386       if (rate > 1.0)
2387         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2388       else
2389         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2390     }
2391   }
2392
2393   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2394       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2395       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2396       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2397
2398
2399   if (priv->avg_rate >= 0.0) {
2400     /* if we have a valid rate, start sending QoS messages */
2401     if (priv->current_jitter < 0) {
2402       /* make sure we never go below 0 when adding the jitter to the
2403        * timestamp. */
2404       if (priv->current_rstart < -priv->current_jitter)
2405         priv->current_jitter = -priv->current_rstart;
2406     }
2407     gst_base_sink_send_qos (sink, priv->avg_rate, priv->current_rstart,
2408         priv->current_jitter);
2409   }
2410
2411   /* record when this buffer will leave us */
2412   priv->last_left = left;
2413 }
2414
2415 /* reset all qos measuring */
2416 static void
2417 gst_base_sink_reset_qos (GstBaseSink * sink)
2418 {
2419   GstBaseSinkPrivate *priv;
2420
2421   priv = sink->priv;
2422
2423   priv->last_in_time = -1;
2424   priv->last_left = -1;
2425   priv->avg_duration = -1;
2426   priv->avg_pt = -1;
2427   priv->avg_rate = -1.0;
2428   priv->avg_render = -1;
2429   priv->rendered = 0;
2430   priv->dropped = 0;
2431
2432 }
2433
2434 /* Checks if the object was scheduled too late.
2435  *
2436  * start/stop contain the raw timestamp start and stop values
2437  * of the object.
2438  *
2439  * status and jitter contain the return values from the clock wait.
2440  *
2441  * returns TRUE if the buffer was too late.
2442  */
2443 static gboolean
2444 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2445     GstClockTime start, GstClockTime stop,
2446     GstClockReturn status, GstClockTimeDiff jitter)
2447 {
2448   gboolean late;
2449   gint64 max_lateness;
2450   GstBaseSinkPrivate *priv;
2451
2452   priv = basesink->priv;
2453
2454   late = FALSE;
2455
2456   /* only for objects that were too late */
2457   if (G_LIKELY (status != GST_CLOCK_EARLY))
2458     goto in_time;
2459
2460   max_lateness = basesink->abidata.ABI.max_lateness;
2461
2462   /* check if frame dropping is enabled */
2463   if (max_lateness == -1)
2464     goto no_drop;
2465
2466   /* only check for buffers */
2467   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2468     goto not_buffer;
2469
2470   /* can't do check if we don't have a timestamp */
2471   if (G_UNLIKELY (start == -1))
2472     goto no_timestamp;
2473
2474   /* we can add a valid stop time */
2475   if (stop != -1)
2476     max_lateness += stop;
2477   else
2478     max_lateness += start;
2479
2480   /* if the jitter bigger than duration and lateness we are too late */
2481   if ((late = start + jitter > max_lateness)) {
2482     GST_DEBUG_OBJECT (basesink, "buffer is too late %" GST_TIME_FORMAT
2483         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (start + jitter),
2484         GST_TIME_ARGS (max_lateness));
2485     /* !!emergency!!, if we did not receive anything valid for more than a 
2486      * second, render it anyway so the user sees something */
2487     if (priv->last_in_time != -1 && start - priv->last_in_time > GST_SECOND) {
2488       late = FALSE;
2489       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2490           (_("A lot of buffers are being dropped.")),
2491           ("There may be a timestamping problem, or this computer is too slow."));
2492       GST_DEBUG_OBJECT (basesink,
2493           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2494           GST_TIME_ARGS (priv->last_in_time));
2495     }
2496   }
2497
2498 done:
2499   if (!late) {
2500     priv->last_in_time = start;
2501   }
2502   return late;
2503
2504   /* all is fine */
2505 in_time:
2506   {
2507     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2508     goto done;
2509   }
2510 no_drop:
2511   {
2512     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2513     goto done;
2514   }
2515 not_buffer:
2516   {
2517     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2518     return FALSE;
2519   }
2520 no_timestamp:
2521   {
2522     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2523     return FALSE;
2524   }
2525 }
2526
2527 /* called before and after calling the render vmethod. It keeps track of how
2528  * much time was spent in the render method and is used to check if we are
2529  * flooded */
2530 static void
2531 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2532 {
2533   GstBaseSinkPrivate *priv;
2534
2535   priv = basesink->priv;
2536
2537   if (start) {
2538     priv->start = gst_util_get_timestamp ();
2539   } else {
2540     GstClockTime elapsed;
2541
2542     priv->stop = gst_util_get_timestamp ();
2543
2544     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2545
2546     if (priv->avg_render == -1)
2547       priv->avg_render = elapsed;
2548     else
2549       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2550
2551     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2552         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2553   }
2554 }
2555
2556 /* with STREAM_LOCK, PREROLL_LOCK,
2557  *
2558  * Synchronize the object on the clock and then render it.
2559  *
2560  * takes ownership of obj.
2561  */
2562 static GstFlowReturn
2563 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2564     GstMiniObject * obj)
2565 {
2566   GstFlowReturn ret;
2567   GstBaseSinkClass *bclass;
2568   gboolean late;
2569
2570   GstBaseSinkPrivate *priv;
2571
2572   priv = basesink->priv;
2573
2574 again:
2575   late = FALSE;
2576   ret = GST_FLOW_OK;
2577
2578   /* synchronize this object, non syncable objects return OK
2579    * immediatly. */
2580   ret = gst_base_sink_do_sync (basesink, pad, obj, &late);
2581   if (G_UNLIKELY (ret != GST_FLOW_OK))
2582     goto sync_failed;
2583
2584   /* and now render, event or buffer. */
2585   if (G_LIKELY (GST_IS_BUFFER (obj))) {
2586     GstBuffer *buf;
2587
2588     /* drop late buffers unconditionally, let's hope it's unlikely */
2589     if (G_UNLIKELY (late))
2590       goto dropped;
2591
2592     buf = GST_BUFFER_CAST (obj);
2593
2594     gst_base_sink_set_last_buffer (basesink, buf);
2595
2596     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2597
2598     if (G_LIKELY (bclass->render)) {
2599       gint do_qos;
2600
2601       /* read once, to get same value before and after */
2602       do_qos = g_atomic_int_get (&priv->qos_enabled);
2603
2604       GST_DEBUG_OBJECT (basesink, "rendering buffer %p", obj);
2605
2606       /* record rendering time for QoS and stats */
2607       if (do_qos)
2608         gst_base_sink_do_render_stats (basesink, TRUE);
2609
2610       ret = bclass->render (basesink, buf);
2611
2612       if (do_qos)
2613         gst_base_sink_do_render_stats (basesink, FALSE);
2614
2615       if (ret == GST_FLOW_STEP)
2616         goto again;
2617
2618       priv->rendered++;
2619     }
2620   } else {
2621     GstEvent *event = GST_EVENT_CAST (obj);
2622     gboolean event_res = TRUE;
2623     GstEventType type;
2624
2625     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2626
2627     type = GST_EVENT_TYPE (event);
2628
2629     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
2630         gst_event_type_get_name (type));
2631
2632     if (bclass->event)
2633       event_res = bclass->event (basesink, event);
2634
2635     /* when we get here we could be flushing again when the event handler calls
2636      * _wait_eos(). We have to ignore this object in that case. */
2637     if (G_UNLIKELY (basesink->flushing))
2638       goto flushing;
2639
2640     if (G_LIKELY (event_res)) {
2641       guint32 seqnum;
2642
2643       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
2644       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
2645
2646       switch (type) {
2647         case GST_EVENT_EOS:
2648         {
2649           GstMessage *message;
2650
2651           /* the EOS event is completely handled so we mark
2652            * ourselves as being in the EOS state. eos is also 
2653            * protected by the object lock so we can read it when 
2654            * answering the POSITION query. */
2655           GST_OBJECT_LOCK (basesink);
2656           basesink->eos = TRUE;
2657           GST_OBJECT_UNLOCK (basesink);
2658
2659           /* ok, now we can post the message */
2660           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
2661
2662           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
2663           gst_message_set_seqnum (message, seqnum);
2664           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
2665           break;
2666         }
2667         case GST_EVENT_NEWSEGMENT:
2668           /* configure the segment */
2669           gst_base_sink_configure_segment (basesink, pad, event,
2670               &basesink->segment);
2671           break;
2672         default:
2673           break;
2674       }
2675     }
2676   }
2677
2678 done:
2679   gst_base_sink_perform_qos (basesink, late);
2680
2681   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
2682   gst_mini_object_unref (obj);
2683
2684   return ret;
2685
2686   /* ERRORS */
2687 sync_failed:
2688   {
2689     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
2690     goto done;
2691   }
2692 dropped:
2693   {
2694     priv->dropped++;
2695     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
2696     goto done;
2697   }
2698 flushing:
2699   {
2700     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
2701     gst_mini_object_unref (obj);
2702     return GST_FLOW_WRONG_STATE;
2703   }
2704 }
2705
2706 /* with STREAM_LOCK, PREROLL_LOCK
2707  *
2708  * Perform preroll on the given object. For buffers this means 
2709  * calling the preroll subclass method. 
2710  * If that succeeds, the state will be commited.
2711  *
2712  * function does not take ownership of obj.
2713  */
2714 static GstFlowReturn
2715 gst_base_sink_preroll_object (GstBaseSink * basesink, GstMiniObject * obj)
2716 {
2717   GstFlowReturn ret;
2718
2719   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
2720
2721   /* if it's a buffer, we need to call the preroll method */
2722   if (G_LIKELY (GST_IS_BUFFER (obj)) && basesink->priv->call_preroll) {
2723     GstBaseSinkClass *bclass;
2724     GstBuffer *buf;
2725     GstClockTime timestamp;
2726
2727     buf = GST_BUFFER_CAST (obj);
2728     timestamp = GST_BUFFER_TIMESTAMP (buf);
2729
2730     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
2731         GST_TIME_ARGS (timestamp));
2732
2733     gst_base_sink_set_last_buffer (basesink, buf);
2734
2735     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2736     if (bclass->preroll)
2737       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
2738         goto preroll_failed;
2739
2740     basesink->priv->call_preroll = FALSE;
2741   }
2742
2743   /* commit state */
2744   if (G_LIKELY (basesink->playing_async)) {
2745     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
2746       goto stopping;
2747   }
2748
2749   return GST_FLOW_OK;
2750
2751   /* ERRORS */
2752 preroll_failed:
2753   {
2754     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
2755     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
2756     return ret;
2757   }
2758 stopping:
2759   {
2760     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
2761     return GST_FLOW_WRONG_STATE;
2762   }
2763 }
2764
2765 /* with STREAM_LOCK, PREROLL_LOCK 
2766  *
2767  * Queue an object for rendering.
2768  * The first prerollable object queued will complete the preroll. If the
2769  * preroll queue if filled, we render all the objects in the queue.
2770  *
2771  * This function takes ownership of the object.
2772  */
2773 static GstFlowReturn
2774 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
2775     GstMiniObject * obj, gboolean prerollable)
2776 {
2777   GstFlowReturn ret = GST_FLOW_OK;
2778   gint length;
2779   GQueue *q;
2780
2781   if (G_UNLIKELY (basesink->need_preroll)) {
2782     if (G_LIKELY (prerollable))
2783       basesink->preroll_queued++;
2784
2785     length = basesink->preroll_queued;
2786
2787     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
2788
2789     /* first prerollable item needs to finish the preroll */
2790     if (length == 1) {
2791       ret = gst_base_sink_preroll_object (basesink, obj);
2792       if (G_UNLIKELY (ret != GST_FLOW_OK))
2793         goto preroll_failed;
2794     }
2795     /* need to recheck if we need preroll, commmit state during preroll 
2796      * could have made us not need more preroll. */
2797     if (G_UNLIKELY (basesink->need_preroll)) {
2798       /* see if we can render now, if we can't add the object to the preroll
2799        * queue. */
2800       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
2801         goto more_preroll;
2802     }
2803   }
2804
2805   /* we can start rendering (or blocking) the queued object
2806    * if any. */
2807   q = basesink->preroll_queue;
2808   while (G_UNLIKELY (!g_queue_is_empty (q))) {
2809     GstMiniObject *o;
2810
2811     o = g_queue_pop_head (q);
2812     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
2813
2814     /* do something with the return value */
2815     ret = gst_base_sink_render_object (basesink, pad, o);
2816     if (ret != GST_FLOW_OK)
2817       goto dequeue_failed;
2818   }
2819
2820   /* now render the object */
2821   ret = gst_base_sink_render_object (basesink, pad, obj);
2822   basesink->preroll_queued = 0;
2823
2824   return ret;
2825
2826   /* special cases */
2827 preroll_failed:
2828   {
2829     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
2830         gst_flow_get_name (ret));
2831     gst_mini_object_unref (obj);
2832     return ret;
2833   }
2834 more_preroll:
2835   {
2836     /* add object to the queue and return */
2837     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
2838         length, basesink->preroll_queue_max_len);
2839     g_queue_push_tail (basesink->preroll_queue, obj);
2840     return GST_FLOW_OK;
2841   }
2842 dequeue_failed:
2843   {
2844     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
2845         gst_flow_get_name (ret));
2846     gst_mini_object_unref (obj);
2847     return ret;
2848   }
2849 }
2850
2851 /* with STREAM_LOCK
2852  *
2853  * This function grabs the PREROLL_LOCK and adds the object to
2854  * the queue.
2855  *
2856  * This function takes ownership of obj.
2857  */
2858 static GstFlowReturn
2859 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
2860     GstMiniObject * obj, gboolean prerollable)
2861 {
2862   GstFlowReturn ret;
2863
2864   GST_PAD_PREROLL_LOCK (pad);
2865   if (G_UNLIKELY (basesink->flushing))
2866     goto flushing;
2867
2868   if (G_UNLIKELY (basesink->priv->received_eos))
2869     goto was_eos;
2870
2871   ret = gst_base_sink_queue_object_unlocked (basesink, pad, obj, prerollable);
2872   GST_PAD_PREROLL_UNLOCK (pad);
2873
2874   return ret;
2875
2876   /* ERRORS */
2877 flushing:
2878   {
2879     GST_DEBUG_OBJECT (basesink, "sink is flushing");
2880     GST_PAD_PREROLL_UNLOCK (pad);
2881     gst_mini_object_unref (obj);
2882     return GST_FLOW_WRONG_STATE;
2883   }
2884 was_eos:
2885   {
2886     GST_DEBUG_OBJECT (basesink,
2887         "we are EOS, dropping object, return UNEXPECTED");
2888     GST_PAD_PREROLL_UNLOCK (pad);
2889     gst_mini_object_unref (obj);
2890     return GST_FLOW_UNEXPECTED;
2891   }
2892 }
2893
2894 static void
2895 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
2896 {
2897   /* make sure we are not blocked on the clock also clear any pending
2898    * eos state. */
2899   gst_base_sink_set_flushing (basesink, pad, TRUE);
2900
2901   /* we grab the stream lock but that is not needed since setting the
2902    * sink to flushing would make sure no state commit is being done
2903    * anymore */
2904   GST_PAD_STREAM_LOCK (pad);
2905   gst_base_sink_reset_qos (basesink);
2906   if (basesink->priv->async_enabled) {
2907     /* and we need to commit our state again on the next
2908      * prerolled buffer */
2909     basesink->playing_async = TRUE;
2910     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
2911   } else {
2912     basesink->priv->have_latency = TRUE;
2913     basesink->need_preroll = FALSE;
2914   }
2915   gst_base_sink_set_last_buffer (basesink, NULL);
2916   GST_PAD_STREAM_UNLOCK (pad);
2917 }
2918
2919 static void
2920 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
2921 {
2922   /* unset flushing so we can accept new data, this also flushes out any EOS
2923    * event. */
2924   gst_base_sink_set_flushing (basesink, pad, FALSE);
2925
2926   /* for position reporting */
2927   GST_OBJECT_LOCK (basesink);
2928   basesink->priv->current_sstart = -1;
2929   basesink->priv->current_sstop = -1;
2930   basesink->priv->eos_rtime = -1;
2931   basesink->priv->call_preroll = TRUE;
2932   basesink->priv->current_step.valid = FALSE;
2933   basesink->priv->pending_step.valid = FALSE;
2934   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
2935     /* we need new segment info after the flush. */
2936     basesink->have_newsegment = FALSE;
2937     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
2938     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
2939   }
2940   GST_OBJECT_UNLOCK (basesink);
2941 }
2942
2943 static gboolean
2944 gst_base_sink_event (GstPad * pad, GstEvent * event)
2945 {
2946   GstBaseSink *basesink;
2947   gboolean result = TRUE;
2948   GstBaseSinkClass *bclass;
2949
2950   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2951
2952   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2953
2954   GST_DEBUG_OBJECT (basesink, "event %p (%s)", event,
2955       GST_EVENT_TYPE_NAME (event));
2956
2957   switch (GST_EVENT_TYPE (event)) {
2958     case GST_EVENT_EOS:
2959     {
2960       GstFlowReturn ret;
2961
2962       GST_PAD_PREROLL_LOCK (pad);
2963       if (G_UNLIKELY (basesink->flushing))
2964         goto flushing;
2965
2966       if (G_UNLIKELY (basesink->priv->received_eos)) {
2967         /* we can't accept anything when we are EOS */
2968         result = FALSE;
2969         gst_event_unref (event);
2970       } else {
2971         /* we set the received EOS flag here so that we can use it when testing if
2972          * we are prerolled and to refuse more buffers. */
2973         basesink->priv->received_eos = TRUE;
2974
2975         /* EOS is a prerollable object, we call the unlocked version because it
2976          * does not check the received_eos flag. */
2977         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
2978             GST_MINI_OBJECT_CAST (event), TRUE);
2979         if (G_UNLIKELY (ret != GST_FLOW_OK))
2980           result = FALSE;
2981       }
2982       GST_PAD_PREROLL_UNLOCK (pad);
2983       break;
2984     }
2985     case GST_EVENT_NEWSEGMENT:
2986     {
2987       GstFlowReturn ret;
2988
2989       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
2990
2991       GST_PAD_PREROLL_LOCK (pad);
2992       if (G_UNLIKELY (basesink->flushing))
2993         goto flushing;
2994
2995       if (G_UNLIKELY (basesink->priv->received_eos)) {
2996         /* we can't accept anything when we are EOS */
2997         result = FALSE;
2998         gst_event_unref (event);
2999       } else {
3000         /* the new segment is a non prerollable item and does not block anything,
3001          * we need to configure the current clipping segment and insert the event 
3002          * in the queue to serialize it with the buffers for rendering. */
3003         gst_base_sink_configure_segment (basesink, pad, event,
3004             basesink->abidata.ABI.clip_segment);
3005
3006         ret =
3007             gst_base_sink_queue_object_unlocked (basesink, pad,
3008             GST_MINI_OBJECT_CAST (event), FALSE);
3009         if (G_UNLIKELY (ret != GST_FLOW_OK))
3010           result = FALSE;
3011         else {
3012           GST_OBJECT_LOCK (basesink);
3013           basesink->have_newsegment = TRUE;
3014           GST_OBJECT_UNLOCK (basesink);
3015         }
3016       }
3017       GST_PAD_PREROLL_UNLOCK (pad);
3018       break;
3019     }
3020     case GST_EVENT_FLUSH_START:
3021       if (bclass->event)
3022         bclass->event (basesink, event);
3023
3024       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3025
3026       gst_base_sink_flush_start (basesink, pad);
3027
3028       gst_event_unref (event);
3029       break;
3030     case GST_EVENT_FLUSH_STOP:
3031       if (bclass->event)
3032         bclass->event (basesink, event);
3033
3034       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3035
3036       gst_base_sink_flush_stop (basesink, pad);
3037
3038       gst_event_unref (event);
3039       break;
3040     default:
3041       /* other events are sent to queue or subclass depending on if they
3042        * are serialized. */
3043       if (GST_EVENT_IS_SERIALIZED (event)) {
3044         gst_base_sink_queue_object (basesink, pad,
3045             GST_MINI_OBJECT_CAST (event), FALSE);
3046       } else {
3047         if (bclass->event)
3048           bclass->event (basesink, event);
3049         gst_event_unref (event);
3050       }
3051       break;
3052   }
3053 done:
3054   gst_object_unref (basesink);
3055
3056   return result;
3057
3058   /* ERRORS */
3059 flushing:
3060   {
3061     GST_DEBUG_OBJECT (basesink, "we are flushing");
3062     GST_PAD_PREROLL_UNLOCK (pad);
3063     result = FALSE;
3064     gst_event_unref (event);
3065     goto done;
3066   }
3067 }
3068
3069 /* default implementation to calculate the start and end
3070  * timestamps on a buffer, subclasses can override
3071  */
3072 static void
3073 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3074     GstClockTime * start, GstClockTime * end)
3075 {
3076   GstClockTime timestamp, duration;
3077
3078   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3079   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3080
3081     /* get duration to calculate end time */
3082     duration = GST_BUFFER_DURATION (buffer);
3083     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3084       *end = timestamp + duration;
3085     }
3086     *start = timestamp;
3087   }
3088 }
3089
3090 /* must be called with PREROLL_LOCK */
3091 static gboolean
3092 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3093 {
3094   gboolean is_prerolled, res;
3095
3096   /* we have 2 cases where the PREROLL_LOCK is released:
3097    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3098    *  2) we are syncing on the clock
3099    */
3100   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3101   res = !is_prerolled;
3102
3103   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3104       basesink->have_preroll, basesink->priv->received_eos, res);
3105
3106   return res;
3107 }
3108
3109 /* with STREAM_LOCK, PREROLL_LOCK 
3110  *
3111  * Takes a buffer and compare the timestamps with the last segment.
3112  * If the buffer falls outside of the segment boundaries, drop it.
3113  * Else queue the buffer for preroll and rendering.
3114  *
3115  * This function takes ownership of the buffer.
3116  */
3117 static GstFlowReturn
3118 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3119     GstBuffer * buf)
3120 {
3121   GstBaseSinkClass *bclass;
3122   GstFlowReturn result;
3123   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3124   GstSegment *clip_segment;
3125
3126   if (G_UNLIKELY (basesink->flushing))
3127     goto flushing;
3128
3129   if (G_UNLIKELY (basesink->priv->received_eos))
3130     goto was_eos;
3131
3132   /* for code clarity */
3133   clip_segment = basesink->abidata.ABI.clip_segment;
3134
3135   if (G_UNLIKELY (!basesink->have_newsegment)) {
3136     gboolean sync;
3137
3138     sync = gst_base_sink_get_sync (basesink);
3139     if (sync) {
3140       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3141           (_("Internal data flow problem.")),
3142           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3143     }
3144
3145     /* this means this sink will assume timestamps start from 0 */
3146     GST_OBJECT_LOCK (basesink);
3147     clip_segment->start = 0;
3148     clip_segment->stop = -1;
3149     basesink->segment.start = 0;
3150     basesink->segment.stop = -1;
3151     basesink->have_newsegment = TRUE;
3152     GST_OBJECT_UNLOCK (basesink);
3153   }
3154
3155   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3156
3157   /* check if the buffer needs to be dropped, we first ask the subclass for the
3158    * start and end */
3159   if (bclass->get_times)
3160     bclass->get_times (basesink, buf, &start, &end);
3161
3162   if (start == -1) {
3163     /* if the subclass does not want sync, we use our own values so that we at
3164      * least clip the buffer to the segment */
3165     gst_base_sink_get_times (basesink, buf, &start, &end);
3166   }
3167
3168   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3169       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3170
3171   /* a dropped buffer does not participate in anything */
3172   if (GST_CLOCK_TIME_IS_VALID (start) &&
3173       (clip_segment->format == GST_FORMAT_TIME)) {
3174     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3175                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3176       goto out_of_segment;
3177   }
3178
3179   /* now we can process the buffer in the queue, this function takes ownership
3180    * of the buffer */
3181   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3182       GST_MINI_OBJECT_CAST (buf), TRUE);
3183
3184   return result;
3185
3186   /* ERRORS */
3187 flushing:
3188   {
3189     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3190     gst_buffer_unref (buf);
3191     return GST_FLOW_WRONG_STATE;
3192   }
3193 was_eos:
3194   {
3195     GST_DEBUG_OBJECT (basesink,
3196         "we are EOS, dropping object, return UNEXPECTED");
3197     gst_buffer_unref (buf);
3198     return GST_FLOW_UNEXPECTED;
3199   }
3200 out_of_segment:
3201   {
3202     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3203     gst_buffer_unref (buf);
3204     return GST_FLOW_OK;
3205   }
3206 }
3207
3208 /* with STREAM_LOCK
3209  */
3210 static GstFlowReturn
3211 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3212 {
3213   GstBaseSink *basesink;
3214   GstFlowReturn result;
3215
3216   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3217
3218   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3219     goto wrong_mode;
3220
3221   GST_PAD_PREROLL_LOCK (pad);
3222   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
3223   GST_PAD_PREROLL_UNLOCK (pad);
3224
3225 done:
3226   return result;
3227
3228   /* ERRORS */
3229 wrong_mode:
3230   {
3231     GST_OBJECT_LOCK (pad);
3232     GST_WARNING_OBJECT (basesink,
3233         "Push on pad %s:%s, but it was not activated in push mode",
3234         GST_DEBUG_PAD_NAME (pad));
3235     GST_OBJECT_UNLOCK (pad);
3236     gst_buffer_unref (buf);
3237     /* we don't post an error message this will signal to the peer
3238      * pushing that EOS is reached. */
3239     result = GST_FLOW_UNEXPECTED;
3240     goto done;
3241   }
3242 }
3243
3244 static gboolean
3245 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3246 {
3247   gboolean res = TRUE;
3248
3249   /* update our offset if the start/stop position was updated */
3250   if (segment->format == GST_FORMAT_BYTES) {
3251     segment->time = segment->start;
3252   } else if (segment->start == 0) {
3253     /* seek to start, we can implement a default for this. */
3254     segment->time = 0;
3255   } else {
3256     res = FALSE;
3257     GST_INFO_OBJECT (sink, "Can't do a default seek");
3258   }
3259
3260   return res;
3261 }
3262
3263 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3264
3265 static gboolean
3266 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3267     GstEvent * event, GstSegment * segment)
3268 {
3269   /* By default, we try one of 2 things:
3270    *   - For absolute seek positions, convert the requested position to our 
3271    *     configured processing format and place it in the output segment \
3272    *   - For relative seek positions, convert our current (input) values to the
3273    *     seek format, adjust by the relative seek offset and then convert back to
3274    *     the processing format
3275    */
3276   GstSeekType cur_type, stop_type;
3277   gint64 cur, stop;
3278   GstSeekFlags flags;
3279   GstFormat seek_format, dest_format;
3280   gdouble rate;
3281   gboolean update;
3282   gboolean res = TRUE;
3283
3284   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3285       &cur_type, &cur, &stop_type, &stop);
3286   dest_format = segment->format;
3287
3288   if (seek_format == dest_format) {
3289     gst_segment_set_seek (segment, rate, seek_format, flags,
3290         cur_type, cur, stop_type, stop, &update);
3291     return TRUE;
3292   }
3293
3294   if (cur_type != GST_SEEK_TYPE_NONE) {
3295     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3296     res =
3297         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3298         &cur);
3299     cur_type = GST_SEEK_TYPE_SET;
3300   }
3301
3302   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3303     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3304     res =
3305         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3306         &stop);
3307     stop_type = GST_SEEK_TYPE_SET;
3308   }
3309
3310   /* And finally, configure our output segment in the desired format */
3311   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3312       stop_type, stop, &update);
3313
3314   if (!res)
3315     goto no_format;
3316
3317   return res;
3318
3319 no_format:
3320   {
3321     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3322     return FALSE;
3323   }
3324 }
3325
3326 /* perform a seek, only executed in pull mode */
3327 static gboolean
3328 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3329 {
3330   gboolean flush;
3331   gdouble rate;
3332   GstFormat seek_format, dest_format;
3333   GstSeekFlags flags;
3334   GstSeekType cur_type, stop_type;
3335   gboolean seekseg_configured = FALSE;
3336   gint64 cur, stop;
3337   gboolean update, res = TRUE;
3338   GstSegment seeksegment;
3339
3340   dest_format = sink->segment.format;
3341
3342   if (event) {
3343     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3344     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3345         &cur_type, &cur, &stop_type, &stop);
3346
3347     flush = flags & GST_SEEK_FLAG_FLUSH;
3348   } else {
3349     GST_DEBUG_OBJECT (sink, "performing seek without event");
3350     flush = FALSE;
3351   }
3352
3353   if (flush) {
3354     GST_DEBUG_OBJECT (sink, "flushing upstream");
3355     gst_pad_push_event (pad, gst_event_new_flush_start ());
3356     gst_base_sink_flush_start (sink, pad);
3357   } else {
3358     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3359   }
3360
3361   GST_PAD_STREAM_LOCK (pad);
3362
3363   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3364    * copy the current segment info into the temp segment that we can actually
3365    * attempt the seek with. We only update the real segment if the seek suceeds. */
3366   if (!seekseg_configured) {
3367     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3368
3369     /* now configure the final seek segment */
3370     if (event) {
3371       if (sink->segment.format != seek_format) {
3372         /* OK, here's where we give the subclass a chance to convert the relative
3373          * seek into an absolute one in the processing format. We set up any
3374          * absolute seek above, before taking the stream lock. */
3375         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3376                 &seeksegment)) {
3377           GST_DEBUG_OBJECT (sink,
3378               "Preparing the seek failed after flushing. " "Aborting seek");
3379           res = FALSE;
3380         }
3381       } else {
3382         /* The seek format matches our processing format, no need to ask the
3383          * the subclass to configure the segment. */
3384         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3385             cur_type, cur, stop_type, stop, &update);
3386       }
3387     }
3388     /* Else, no seek event passed, so we're just (re)starting the 
3389        current segment. */
3390   }
3391
3392   if (res) {
3393     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3394         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3395         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3396
3397     /* do the seek, segment.last_stop contains the new position. */
3398     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3399   }
3400
3401
3402   if (flush) {
3403     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3404     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3405     gst_base_sink_flush_stop (sink, pad);
3406   } else if (res && sink->abidata.ABI.running) {
3407     /* we are running the current segment and doing a non-flushing seek, 
3408      * close the segment first based on the last_stop. */
3409     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3410         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3411   }
3412
3413   /* The subclass must have converted the segment to the processing format 
3414    * by now */
3415   if (res && seeksegment.format != dest_format) {
3416     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3417         "in the correct format. Aborting seek.");
3418     res = FALSE;
3419   }
3420
3421   /* if successfull seek, we update our real segment and push
3422    * out the new segment. */
3423   if (res) {
3424     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3425
3426     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3427       gst_element_post_message (GST_ELEMENT (sink),
3428           gst_message_new_segment_start (GST_OBJECT (sink),
3429               sink->segment.format, sink->segment.last_stop));
3430     }
3431   }
3432
3433   sink->priv->discont = TRUE;
3434   sink->abidata.ABI.running = TRUE;
3435
3436   GST_PAD_STREAM_UNLOCK (pad);
3437
3438   return res;
3439 }
3440
3441 static void
3442 set_step_info (GstBaseSink * sink, GstStepInfo * info, guint seqnum,
3443     GstFormat format, guint64 amount, gdouble rate, gboolean flush,
3444     gboolean intermediate)
3445 {
3446   GST_OBJECT_LOCK (sink);
3447   info->seqnum = seqnum;
3448   info->format = format;
3449   info->amount = amount;
3450   info->position = 0;
3451   info->rate = rate;
3452   info->flush = flush;
3453   info->intermediate = intermediate;
3454   info->valid = TRUE;
3455   GST_OBJECT_UNLOCK (sink);
3456 }
3457
3458 static gboolean
3459 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3460 {
3461   GstBaseSinkPrivate *priv;
3462   GstBaseSinkClass *bclass;
3463   gboolean flush, intermediate;
3464   gdouble rate;
3465   GstFormat format;
3466   guint64 amount;
3467   guint seqnum;
3468   GstStepInfo *pending;
3469
3470   bclass = GST_BASE_SINK_GET_CLASS (sink);
3471   priv = sink->priv;
3472
3473   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
3474
3475   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
3476   seqnum = gst_event_get_seqnum (event);
3477
3478   pending = &priv->pending_step;
3479
3480   if (flush) {
3481     /* we need to call ::unlock before locking PREROLL_LOCK
3482      * since we lock it before going into ::render */
3483     if (bclass->unlock)
3484       bclass->unlock (sink);
3485
3486     GST_PAD_PREROLL_LOCK (sink->sinkpad);
3487     /* now that we have the PREROLL lock, clear our unlock request */
3488     if (bclass->unlock_stop)
3489       bclass->unlock_stop (sink);
3490
3491     /* update the stepinfo and make it valid */
3492     set_step_info (sink, pending, seqnum, format, amount, rate, flush,
3493         intermediate);
3494
3495     if (sink->priv->async_enabled) {
3496       /* and we need to commit our state again on the next
3497        * prerolled buffer */
3498       sink->playing_async = TRUE;
3499       priv->pending_step.need_preroll = TRUE;
3500       sink->need_preroll = FALSE;
3501       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
3502     } else {
3503       sink->priv->have_latency = TRUE;
3504       sink->need_preroll = FALSE;
3505     }
3506     priv->current_sstart = -1;
3507     priv->current_sstop = -1;
3508     priv->eos_rtime = -1;
3509     priv->call_preroll = TRUE;
3510     gst_base_sink_set_last_buffer (sink, NULL);
3511     gst_base_sink_reset_qos (sink);
3512
3513     if (sink->clock_id) {
3514       gst_clock_id_unschedule (sink->clock_id);
3515     }
3516
3517     if (sink->have_preroll) {
3518       GST_DEBUG_OBJECT (sink, "signal waiter");
3519       priv->step_unlock = TRUE;
3520       GST_PAD_PREROLL_SIGNAL (sink->sinkpad);
3521     }
3522     GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
3523   } else {
3524     /* update the stepinfo and make it valid */
3525     set_step_info (sink, pending, seqnum, format, amount, rate, flush,
3526         intermediate);
3527   }
3528
3529   return TRUE;
3530 }
3531
3532 /* with STREAM_LOCK
3533  */
3534 static void
3535 gst_base_sink_loop (GstPad * pad)
3536 {
3537   GstBaseSink *basesink;
3538   GstBuffer *buf = NULL;
3539   GstFlowReturn result;
3540   guint blocksize;
3541   guint64 offset;
3542
3543   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3544
3545   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
3546
3547   if ((blocksize = basesink->priv->blocksize) == 0)
3548     blocksize = -1;
3549
3550   offset = basesink->segment.last_stop;
3551
3552   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
3553       offset, blocksize);
3554
3555   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
3556   if (G_UNLIKELY (result != GST_FLOW_OK))
3557     goto paused;
3558
3559   if (G_UNLIKELY (buf == NULL))
3560     goto no_buffer;
3561
3562   offset += GST_BUFFER_SIZE (buf);
3563
3564   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
3565
3566   GST_PAD_PREROLL_LOCK (pad);
3567   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
3568   GST_PAD_PREROLL_UNLOCK (pad);
3569   if (G_UNLIKELY (result != GST_FLOW_OK))
3570     goto paused;
3571
3572   return;
3573
3574   /* ERRORS */
3575 paused:
3576   {
3577     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
3578         gst_flow_get_name (result));
3579     gst_pad_pause_task (pad);
3580     /* fatal errors and NOT_LINKED cause EOS */
3581     if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) {
3582       if (result == GST_FLOW_UNEXPECTED) {
3583         /* perform EOS logic */
3584         if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3585           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3586               gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
3587                   basesink->segment.format, basesink->segment.last_stop));
3588         } else {
3589           gst_base_sink_event (pad, gst_event_new_eos ());
3590         }
3591       } else {
3592         /* for fatal errors we post an error message, post the error
3593          * first so the app knows about the error first. */
3594         GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3595             (_("Internal data stream error.")),
3596             ("stream stopped, reason %s", gst_flow_get_name (result)));
3597         gst_base_sink_event (pad, gst_event_new_eos ());
3598       }
3599     }
3600     return;
3601   }
3602 no_buffer:
3603   {
3604     GST_LOG_OBJECT (basesink, "no buffer, pausing");
3605     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3606         (_("Internal data flow error.")), ("element returned NULL buffer"));
3607     result = GST_FLOW_ERROR;
3608     goto paused;
3609   }
3610 }
3611
3612 static gboolean
3613 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
3614     gboolean flushing)
3615 {
3616   GstBaseSinkClass *bclass;
3617
3618   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3619
3620   if (flushing) {
3621     /* unlock any subclasses, we need to do this before grabbing the
3622      * PREROLL_LOCK since we hold this lock before going into ::render. */
3623     if (bclass->unlock)
3624       bclass->unlock (basesink);
3625   }
3626
3627   GST_PAD_PREROLL_LOCK (pad);
3628   basesink->flushing = flushing;
3629   if (flushing) {
3630     /* step 1, now that we have the PREROLL lock, clear our unlock request */
3631     if (bclass->unlock_stop)
3632       bclass->unlock_stop (basesink);
3633
3634     /* set need_preroll before we unblock the clock. If the clock is unblocked
3635      * before timing out, we can reuse the buffer for preroll. */
3636     basesink->need_preroll = TRUE;
3637
3638     /* step 2, unblock clock sync (if any) or any other blocking thing */
3639     if (basesink->clock_id) {
3640       gst_clock_id_unschedule (basesink->clock_id);
3641     }
3642
3643     /* flush out the data thread if it's locked in finish_preroll, this will
3644      * also flush out the EOS state */
3645     GST_DEBUG_OBJECT (basesink,
3646         "flushing out data thread, need preroll to TRUE");
3647     gst_base_sink_preroll_queue_flush (basesink, pad);
3648   }
3649   GST_PAD_PREROLL_UNLOCK (pad);
3650
3651   return TRUE;
3652 }
3653
3654 static gboolean
3655 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
3656 {
3657   gboolean result;
3658
3659   if (active) {
3660     /* start task */
3661     result = gst_pad_start_task (basesink->sinkpad,
3662         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
3663   } else {
3664     /* step 2, make sure streaming finishes */
3665     result = gst_pad_stop_task (basesink->sinkpad);
3666   }
3667
3668   return result;
3669 }
3670
3671 static gboolean
3672 gst_base_sink_pad_activate (GstPad * pad)
3673 {
3674   gboolean result = FALSE;
3675   GstBaseSink *basesink;
3676
3677   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3678
3679   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
3680
3681   gst_base_sink_set_flushing (basesink, pad, FALSE);
3682
3683   /* we need to have the pull mode enabled */
3684   if (!basesink->can_activate_pull) {
3685     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
3686     goto fallback;
3687   }
3688
3689   /* check if downstreams supports pull mode at all */
3690   if (!gst_pad_check_pull_range (pad)) {
3691     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
3692     goto fallback;
3693   }
3694
3695   /* set the pad mode before starting the task so that it's in the
3696    * correct state for the new thread. also the sink set_caps and get_caps
3697    * function checks this */
3698   basesink->pad_mode = GST_ACTIVATE_PULL;
3699
3700   /* we first try to negotiate a format so that when we try to activate
3701    * downstream, it knows about our format */
3702   if (!gst_base_sink_negotiate_pull (basesink)) {
3703     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
3704     goto fallback;
3705   }
3706
3707   /* ok activate now */
3708   if (!gst_pad_activate_pull (pad, TRUE)) {
3709     /* clear any pending caps */
3710     GST_OBJECT_LOCK (basesink);
3711     gst_caps_replace (&basesink->priv->pull_caps, NULL);
3712     GST_OBJECT_UNLOCK (basesink);
3713     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
3714     goto fallback;
3715   }
3716
3717   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
3718   result = TRUE;
3719   goto done;
3720
3721   /* push mode fallback */
3722 fallback:
3723   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
3724   if ((result = gst_pad_activate_push (pad, TRUE))) {
3725     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
3726   }
3727
3728 done:
3729   if (!result) {
3730     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
3731     gst_base_sink_set_flushing (basesink, pad, TRUE);
3732   }
3733
3734   gst_object_unref (basesink);
3735
3736   return result;
3737 }
3738
3739 static gboolean
3740 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
3741 {
3742   gboolean result;
3743   GstBaseSink *basesink;
3744
3745   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3746
3747   if (active) {
3748     if (!basesink->can_activate_push) {
3749       result = FALSE;
3750       basesink->pad_mode = GST_ACTIVATE_NONE;
3751     } else {
3752       result = TRUE;
3753       basesink->pad_mode = GST_ACTIVATE_PUSH;
3754     }
3755   } else {
3756     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
3757       g_warning ("Internal GStreamer activation error!!!");
3758       result = FALSE;
3759     } else {
3760       gst_base_sink_set_flushing (basesink, pad, TRUE);
3761       result = TRUE;
3762       basesink->pad_mode = GST_ACTIVATE_NONE;
3763     }
3764   }
3765
3766   gst_object_unref (basesink);
3767
3768   return result;
3769 }
3770
3771 static gboolean
3772 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
3773 {
3774   GstCaps *caps;
3775   gboolean result;
3776
3777   result = FALSE;
3778
3779   /* this returns the intersection between our caps and the peer caps. If there
3780    * is no peer, it returns NULL and we can't operate in pull mode so we can
3781    * fail the negotiation. */
3782   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
3783   if (caps == NULL || gst_caps_is_empty (caps))
3784     goto no_caps_possible;
3785
3786   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
3787
3788   caps = gst_caps_make_writable (caps);
3789   /* get the first (prefered) format */
3790   gst_caps_truncate (caps);
3791   /* try to fixate */
3792   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
3793
3794   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
3795
3796   if (gst_caps_is_any (caps)) {
3797     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
3798         "allowing pull()");
3799     /* neither side has template caps in this case, so they are prepared for
3800        pull() without setcaps() */
3801     result = TRUE;
3802   } else if (gst_caps_is_fixed (caps)) {
3803     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
3804       goto could_not_set_caps;
3805
3806     GST_OBJECT_LOCK (basesink);
3807     gst_caps_replace (&basesink->priv->pull_caps, caps);
3808     GST_OBJECT_UNLOCK (basesink);
3809
3810     result = TRUE;
3811   }
3812
3813   gst_caps_unref (caps);
3814
3815   return result;
3816
3817 no_caps_possible:
3818   {
3819     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
3820     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
3821     if (caps)
3822       gst_caps_unref (caps);
3823     return FALSE;
3824   }
3825 could_not_set_caps:
3826   {
3827     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
3828     gst_caps_unref (caps);
3829     return FALSE;
3830   }
3831 }
3832
3833 /* this won't get called until we implement an activate function */
3834 static gboolean
3835 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
3836 {
3837   gboolean result = FALSE;
3838   GstBaseSink *basesink;
3839   GstBaseSinkClass *bclass;
3840
3841   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3842   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3843
3844   if (active) {
3845     GstFormat format;
3846     gint64 duration;
3847
3848     /* we mark we have a newsegment here because pull based
3849      * mode works just fine without having a newsegment before the
3850      * first buffer */
3851     format = GST_FORMAT_BYTES;
3852
3853     gst_segment_init (&basesink->segment, format);
3854     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
3855     GST_OBJECT_LOCK (basesink);
3856     basesink->have_newsegment = TRUE;
3857     GST_OBJECT_UNLOCK (basesink);
3858
3859     /* get the peer duration in bytes */
3860     result = gst_pad_query_peer_duration (pad, &format, &duration);
3861     if (result) {
3862       GST_DEBUG_OBJECT (basesink,
3863           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
3864       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
3865           duration);
3866       gst_segment_set_duration (&basesink->segment, format, duration);
3867     } else {
3868       GST_DEBUG_OBJECT (basesink, "unknown duration");
3869     }
3870
3871     if (bclass->activate_pull)
3872       result = bclass->activate_pull (basesink, TRUE);
3873     else
3874       result = FALSE;
3875
3876     if (!result)
3877       goto activate_failed;
3878
3879   } else {
3880     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
3881       g_warning ("Internal GStreamer activation error!!!");
3882       result = FALSE;
3883     } else {
3884       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
3885       if (bclass->activate_pull)
3886         result &= bclass->activate_pull (basesink, FALSE);
3887       basesink->pad_mode = GST_ACTIVATE_NONE;
3888       /* clear any pending caps */
3889       GST_OBJECT_LOCK (basesink);
3890       gst_caps_replace (&basesink->priv->pull_caps, NULL);
3891       GST_OBJECT_UNLOCK (basesink);
3892     }
3893   }
3894   gst_object_unref (basesink);
3895
3896   return result;
3897
3898   /* ERRORS */
3899 activate_failed:
3900   {
3901     /* reset, as starting the thread failed */
3902     basesink->pad_mode = GST_ACTIVATE_NONE;
3903
3904     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
3905     return FALSE;
3906   }
3907 }
3908
3909 /* send an event to our sinkpad peer. */
3910 static gboolean
3911 gst_base_sink_send_event (GstElement * element, GstEvent * event)
3912 {
3913   GstPad *pad;
3914   GstBaseSink *basesink = GST_BASE_SINK (element);
3915   gboolean forward, result = TRUE;
3916   GstActivateMode mode;
3917
3918   GST_OBJECT_LOCK (element);
3919   /* get the pad and the scheduling mode */
3920   pad = gst_object_ref (basesink->sinkpad);
3921   mode = basesink->pad_mode;
3922   GST_OBJECT_UNLOCK (element);
3923
3924   /* only push UPSTREAM events upstream */
3925   forward = GST_EVENT_IS_UPSTREAM (event);
3926
3927   switch (GST_EVENT_TYPE (event)) {
3928     case GST_EVENT_LATENCY:
3929     {
3930       GstClockTime latency;
3931
3932       gst_event_parse_latency (event, &latency);
3933
3934       /* store the latency. We use this to adjust the running_time before syncing
3935        * it to the clock. */
3936       GST_OBJECT_LOCK (element);
3937       basesink->priv->latency = latency;
3938       if (!basesink->priv->have_latency)
3939         forward = FALSE;
3940       GST_OBJECT_UNLOCK (element);
3941       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
3942           GST_TIME_ARGS (latency));
3943
3944       /* We forward this event so that all elements know about the global pipeline
3945        * latency. This is interesting for an element when it wants to figure out
3946        * when a particular piece of data will be rendered. */
3947       break;
3948     }
3949     case GST_EVENT_SEEK:
3950       /* in pull mode we will execute the seek */
3951       if (mode == GST_ACTIVATE_PULL)
3952         result = gst_base_sink_perform_seek (basesink, pad, event);
3953       break;
3954     case GST_EVENT_STEP:
3955       result = gst_base_sink_perform_step (basesink, pad, event);
3956       forward = FALSE;
3957       break;
3958     default:
3959       break;
3960   }
3961
3962   if (forward) {
3963     result = gst_pad_push_event (pad, event);
3964   } else {
3965     /* not forwarded, unref the event */
3966     gst_event_unref (event);
3967   }
3968
3969   gst_object_unref (pad);
3970   return result;
3971 }
3972
3973 static gboolean
3974 gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query)
3975 {
3976   GstPad *peer;
3977   gboolean res = FALSE;
3978
3979   if ((peer = gst_pad_get_peer (sink->sinkpad))) {
3980     res = gst_pad_query (peer, query);
3981     gst_object_unref (peer);
3982   }
3983   return res;
3984 }
3985
3986 /* get the end position of the last seen object, this is used
3987  * for EOS and for making sure that we don't report a position we
3988  * have not reached yet. With LOCK. */
3989 static gboolean
3990 gst_base_sink_get_position_last (GstBaseSink * basesink, GstFormat format,
3991     gint64 * cur)
3992 {
3993   GstFormat oformat;
3994   GstSegment *segment;
3995   gboolean ret = TRUE;
3996
3997   segment = &basesink->segment;
3998   oformat = segment->format;
3999
4000   if (oformat == GST_FORMAT_TIME) {
4001     /* return last observed stream time, we keep the stream time around in the
4002      * time format. */
4003     *cur = basesink->priv->current_sstop;
4004   } else {
4005     /* convert last stop to stream time */
4006     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4007   }
4008
4009   if (*cur != -1 && oformat != format) {
4010     GST_OBJECT_UNLOCK (basesink);
4011     /* convert to the target format if we need to, release lock first */
4012     ret =
4013         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
4014     if (!ret)
4015       *cur = -1;
4016     GST_OBJECT_LOCK (basesink);
4017   }
4018
4019   GST_DEBUG_OBJECT (basesink, "POSITION: %" GST_TIME_FORMAT,
4020       GST_TIME_ARGS (*cur));
4021
4022   return ret;
4023 }
4024
4025 /* get the position when we are PAUSED, this is the stream time of the buffer
4026  * that prerolled. If no buffer is prerolled (we are still flushing), this
4027  * value will be -1. With LOCK. */
4028 static gboolean
4029 gst_base_sink_get_position_paused (GstBaseSink * basesink, GstFormat format,
4030     gint64 * cur)
4031 {
4032   gboolean res;
4033   gint64 time;
4034   GstSegment *segment;
4035   GstFormat oformat;
4036
4037   /* we don't use the clip segment in pull mode, when seeking we update the
4038    * main segment directly with the new segment values without it having to be
4039    * activated by the rendering after preroll */
4040   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4041     segment = basesink->abidata.ABI.clip_segment;
4042   else
4043     segment = &basesink->segment;
4044   oformat = segment->format;
4045
4046   if (oformat == GST_FORMAT_TIME) {
4047     *cur = basesink->priv->current_sstart;
4048     if (segment->rate < 0.0 && basesink->priv->current_sstop != -1) {
4049       /* for reverse playback we prefer the stream time stop position if we have
4050        * one */
4051       *cur = basesink->priv->current_sstop;
4052     }
4053   } else {
4054     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4055   }
4056
4057   time = segment->time;
4058
4059   if (*cur != -1) {
4060     *cur = MAX (*cur, time);
4061     GST_DEBUG_OBJECT (basesink, "POSITION as max: %" GST_TIME_FORMAT
4062         ", time %" GST_TIME_FORMAT, GST_TIME_ARGS (*cur), GST_TIME_ARGS (time));
4063   } else {
4064     /* we have no buffer, use the segment times. */
4065     if (segment->rate >= 0.0) {
4066       /* forward, next position is always the time of the segment */
4067       *cur = time;
4068       GST_DEBUG_OBJECT (basesink, "POSITION as time: %" GST_TIME_FORMAT,
4069           GST_TIME_ARGS (*cur));
4070     } else {
4071       /* reverse, next expected timestamp is segment->stop. We use the function
4072        * to get things right for negative applied_rates. */
4073       *cur = gst_segment_to_stream_time (segment, oformat, segment->stop);
4074       GST_DEBUG_OBJECT (basesink, "reverse POSITION: %" GST_TIME_FORMAT,
4075           GST_TIME_ARGS (*cur));
4076     }
4077   }
4078
4079   res = (*cur != -1);
4080   if (res && oformat != format) {
4081     GST_OBJECT_UNLOCK (basesink);
4082     res =
4083         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
4084     if (!res)
4085       *cur = -1;
4086     GST_OBJECT_LOCK (basesink);
4087   }
4088
4089   return res;
4090 }
4091
4092 static gboolean
4093 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4094     gint64 * cur, gboolean * upstream)
4095 {
4096   GstClock *clock;
4097   gboolean res = FALSE;
4098   GstFormat oformat, tformat;
4099   GstClockTime now, base, latency;
4100   gint64 time, accum, duration;
4101   gdouble rate;
4102   gint64 last;
4103
4104   GST_OBJECT_LOCK (basesink);
4105   /* our intermediate time format */
4106   tformat = GST_FORMAT_TIME;
4107   /* get the format in the segment */
4108   oformat = basesink->segment.format;
4109
4110   /* can only give answer based on the clock if not EOS */
4111   if (G_UNLIKELY (basesink->eos))
4112     goto in_eos;
4113
4114   /* we can only get the segment when we are not NULL or READY */
4115   if (!basesink->have_newsegment)
4116     goto wrong_state;
4117
4118   /* when not in PLAYING or when we're busy with a state change, we
4119    * cannot read from the clock so we report time based on the
4120    * last seen timestamp. */
4121   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4122       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING)
4123     goto in_pause;
4124
4125   /* we need to sync on the clock. */
4126   if (basesink->sync == FALSE)
4127     goto no_sync;
4128
4129   /* and we need a clock */
4130   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4131     goto no_sync;
4132
4133   /* collect all data we need holding the lock */
4134   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time))
4135     time = basesink->segment.time;
4136   else
4137     time = 0;
4138
4139   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.stop))
4140     duration = basesink->segment.stop - basesink->segment.start;
4141   else
4142     duration = 0;
4143
4144   base = GST_ELEMENT_CAST (basesink)->base_time;
4145   accum = basesink->segment.accum;
4146   rate = basesink->segment.rate * basesink->segment.applied_rate;
4147   latency = basesink->priv->latency;
4148
4149   gst_object_ref (clock);
4150
4151   /* this function might release the LOCK */
4152   gst_base_sink_get_position_last (basesink, format, &last);
4153
4154   /* need to release the object lock before we can get the time, 
4155    * a clock might take the LOCK of the provider, which could be
4156    * a basesink subclass. */
4157   GST_OBJECT_UNLOCK (basesink);
4158
4159   now = gst_clock_get_time (clock);
4160
4161   if (oformat != tformat) {
4162     /* convert accum, time and duration to time */
4163     if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4164             &accum))
4165       goto convert_failed;
4166     if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration, &tformat,
4167             &duration))
4168       goto convert_failed;
4169     if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4170             &time))
4171       goto convert_failed;
4172   }
4173
4174   /* subtract base time and accumulated time from the clock time. 
4175    * Make sure we don't go negative. This is the current time in
4176    * the segment which we need to scale with the combined 
4177    * rate and applied rate. */
4178   base += accum;
4179   base += latency;
4180   base = MIN (now, base);
4181
4182   /* for negative rates we need to count back from from the segment
4183    * duration. */
4184   if (rate < 0.0)
4185     time += duration;
4186
4187   *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4188
4189   /* never report more than last seen position */
4190   if (last != -1)
4191     *cur = MIN (last, *cur);
4192
4193   gst_object_unref (clock);
4194
4195   GST_DEBUG_OBJECT (basesink,
4196       "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4197       GST_TIME_FORMAT " + time %" GST_TIME_FORMAT,
4198       GST_TIME_ARGS (now), GST_TIME_ARGS (base),
4199       GST_TIME_ARGS (accum), GST_TIME_ARGS (time));
4200
4201   if (oformat != format) {
4202     /* convert time to final format */
4203     if (!gst_pad_query_convert (basesink->sinkpad, tformat, *cur, &format, cur))
4204       goto convert_failed;
4205   }
4206
4207   res = TRUE;
4208
4209 done:
4210   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4211       res, GST_TIME_ARGS (*cur));
4212   return res;
4213
4214   /* special cases */
4215 in_eos:
4216   {
4217     GST_DEBUG_OBJECT (basesink, "position in EOS");
4218     res = gst_base_sink_get_position_last (basesink, format, cur);
4219     GST_OBJECT_UNLOCK (basesink);
4220     goto done;
4221   }
4222 in_pause:
4223   {
4224     GST_DEBUG_OBJECT (basesink, "position in PAUSED");
4225     res = gst_base_sink_get_position_paused (basesink, format, cur);
4226     GST_OBJECT_UNLOCK (basesink);
4227     goto done;
4228   }
4229 wrong_state:
4230   {
4231     /* in NULL or READY we always return FALSE and -1 */
4232     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4233     res = FALSE;
4234     *cur = -1;
4235     GST_OBJECT_UNLOCK (basesink);
4236     goto done;
4237   }
4238 no_sync:
4239   {
4240     /* report last seen timestamp if any, else ask upstream to answer */
4241     if ((*cur = basesink->priv->current_sstart) != -1)
4242       res = TRUE;
4243     else
4244       *upstream = TRUE;
4245
4246     GST_DEBUG_OBJECT (basesink, "no sync, res %d, POSITION %" GST_TIME_FORMAT,
4247         res, GST_TIME_ARGS (*cur));
4248     GST_OBJECT_UNLOCK (basesink);
4249     return res;
4250   }
4251 convert_failed:
4252   {
4253     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4254     *upstream = TRUE;
4255     return FALSE;
4256   }
4257 }
4258
4259 static gboolean
4260 gst_base_sink_query (GstElement * element, GstQuery * query)
4261 {
4262   gboolean res = FALSE;
4263
4264   GstBaseSink *basesink = GST_BASE_SINK (element);
4265
4266   switch (GST_QUERY_TYPE (query)) {
4267     case GST_QUERY_POSITION:
4268     {
4269       gint64 cur = 0;
4270       GstFormat format;
4271       gboolean upstream = FALSE;
4272
4273       gst_query_parse_position (query, &format, NULL);
4274
4275       GST_DEBUG_OBJECT (basesink, "position format %d", format);
4276
4277       /* first try to get the position based on the clock */
4278       if ((res =
4279               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4280         gst_query_set_position (query, format, cur);
4281       } else if (upstream) {
4282         /* fallback to peer query */
4283         res = gst_base_sink_peer_query (basesink, query);
4284       }
4285       break;
4286     }
4287     case GST_QUERY_DURATION:
4288     {
4289       GstFormat format, uformat;
4290       gint64 duration, uduration;
4291
4292       gst_query_parse_duration (query, &format, NULL);
4293
4294       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4295           gst_format_get_name (format));
4296
4297       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4298         uformat = GST_FORMAT_BYTES;
4299
4300         /* get the duration in bytes, in pull mode that's all we are sure to
4301          * know. We have to explicitly get this value from upstream instead of
4302          * using our cached value because it might change. Duration caching
4303          * should be done at a higher level. */
4304         res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4305             &uduration);
4306         if (res) {
4307           gst_segment_set_duration (&basesink->segment, uformat, uduration);
4308           if (format != uformat) {
4309             /* convert to the requested format */
4310             res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4311                 &format, &duration);
4312           } else {
4313             duration = uduration;
4314           }
4315           if (res) {
4316             /* set the result */
4317             gst_query_set_duration (query, format, duration);
4318           }
4319         }
4320       } else {
4321         /* in push mode we simply forward upstream */
4322         res = gst_base_sink_peer_query (basesink, query);
4323       }
4324       break;
4325     }
4326     case GST_QUERY_LATENCY:
4327     {
4328       gboolean live, us_live;
4329       GstClockTime min, max;
4330
4331       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4332                   &max))) {
4333         gst_query_set_latency (query, live, min, max);
4334       }
4335       break;
4336     }
4337     case GST_QUERY_JITTER:
4338       break;
4339     case GST_QUERY_RATE:
4340       /* gst_query_set_rate (query, basesink->segment_rate); */
4341       res = TRUE;
4342       break;
4343     case GST_QUERY_SEGMENT:
4344     {
4345       /* FIXME, bring start/stop to stream time */
4346       gst_query_set_segment (query, basesink->segment.rate,
4347           GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4348       break;
4349     }
4350     case GST_QUERY_SEEKING:
4351     case GST_QUERY_CONVERT:
4352     case GST_QUERY_FORMATS:
4353     default:
4354       res = gst_base_sink_peer_query (basesink, query);
4355       break;
4356   }
4357   return res;
4358 }
4359
4360 static GstStateChangeReturn
4361 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4362 {
4363   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4364   GstBaseSink *basesink = GST_BASE_SINK (element);
4365   GstBaseSinkClass *bclass;
4366   GstBaseSinkPrivate *priv;
4367
4368   priv = basesink->priv;
4369
4370   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4371
4372   switch (transition) {
4373     case GST_STATE_CHANGE_NULL_TO_READY:
4374       if (bclass->start)
4375         if (!bclass->start (basesink))
4376           goto start_failed;
4377       break;
4378     case GST_STATE_CHANGE_READY_TO_PAUSED:
4379       /* need to complete preroll before this state change completes, there
4380        * is no data flow in READY so we can safely assume we need to preroll. */
4381       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4382       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4383       basesink->have_newsegment = FALSE;
4384       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4385       gst_segment_init (basesink->abidata.ABI.clip_segment,
4386           GST_FORMAT_UNDEFINED);
4387       basesink->offset = 0;
4388       basesink->have_preroll = FALSE;
4389       priv->step_unlock = FALSE;
4390       basesink->need_preroll = TRUE;
4391       basesink->playing_async = TRUE;
4392       priv->current_sstart = -1;
4393       priv->current_sstop = -1;
4394       priv->eos_rtime = -1;
4395       priv->latency = 0;
4396       basesink->eos = FALSE;
4397       priv->received_eos = FALSE;
4398       gst_base_sink_reset_qos (basesink);
4399       priv->commited = FALSE;
4400       priv->call_preroll = TRUE;
4401       priv->current_step.valid = FALSE;
4402       priv->pending_step.valid = FALSE;
4403       if (priv->async_enabled) {
4404         GST_DEBUG_OBJECT (basesink, "doing async state change");
4405         /* when async enabled, post async-start message and return ASYNC from
4406          * the state change function */
4407         ret = GST_STATE_CHANGE_ASYNC;
4408         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4409             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4410       } else {
4411         priv->have_latency = TRUE;
4412       }
4413       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4414       break;
4415     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4416       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4417       if (!gst_base_sink_needs_preroll (basesink)) {
4418         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4419         /* no preroll needed anymore now. */
4420         basesink->playing_async = FALSE;
4421         basesink->need_preroll = FALSE;
4422         if (basesink->eos) {
4423           GstMessage *message;
4424
4425           /* need to post EOS message here */
4426           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4427           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
4428           gst_message_set_seqnum (message, basesink->priv->seqnum);
4429           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
4430         } else {
4431           GST_DEBUG_OBJECT (basesink, "signal preroll");
4432           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
4433         }
4434       } else {
4435         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
4436         basesink->need_preroll = TRUE;
4437         basesink->playing_async = TRUE;
4438         priv->call_preroll = TRUE;
4439         priv->commited = FALSE;
4440         if (priv->async_enabled) {
4441           GST_DEBUG_OBJECT (basesink, "doing async state change");
4442           ret = GST_STATE_CHANGE_ASYNC;
4443           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4444               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4445         }
4446       }
4447       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4448       break;
4449     default:
4450       break;
4451   }
4452
4453   {
4454     GstStateChangeReturn bret;
4455
4456     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4457     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
4458       goto activate_failed;
4459   }
4460
4461   switch (transition) {
4462     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4463       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
4464       /* FIXME, make sure we cannot enter _render first */
4465
4466       /* we need to call ::unlock before locking PREROLL_LOCK
4467        * since we lock it before going into ::render */
4468       if (bclass->unlock)
4469         bclass->unlock (basesink);
4470
4471       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4472       /* now that we have the PREROLL lock, clear our unlock request */
4473       if (bclass->unlock_stop)
4474         bclass->unlock_stop (basesink);
4475
4476       /* we need preroll again and we set the flag before unlocking the clockid
4477        * because if the clockid is unlocked before a current buffer expired, we
4478        * can use that buffer to preroll with */
4479       basesink->need_preroll = TRUE;
4480
4481       if (basesink->clock_id) {
4482         gst_clock_id_unschedule (basesink->clock_id);
4483       }
4484
4485       /* if we don't have a preroll buffer we need to wait for a preroll and
4486        * return ASYNC. */
4487       if (!gst_base_sink_needs_preroll (basesink)) {
4488         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
4489         basesink->playing_async = FALSE;
4490       } else {
4491         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
4492           ret = GST_STATE_CHANGE_SUCCESS;
4493         } else {
4494           GST_DEBUG_OBJECT (basesink,
4495               "PLAYING to PAUSED, we are not prerolled");
4496           basesink->playing_async = TRUE;
4497           priv->commited = FALSE;
4498           priv->call_preroll = TRUE;
4499           if (priv->async_enabled) {
4500             GST_DEBUG_OBJECT (basesink, "doing async state change");
4501             ret = GST_STATE_CHANGE_ASYNC;
4502             gst_element_post_message (GST_ELEMENT_CAST (basesink),
4503                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
4504                     FALSE));
4505           }
4506         }
4507       }
4508       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
4509           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
4510
4511       gst_base_sink_reset_qos (basesink);
4512       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4513       break;
4514     case GST_STATE_CHANGE_PAUSED_TO_READY:
4515       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4516       /* start by reseting our position state with the object lock so that the
4517        * position query gets the right idea. We do this before we post the
4518        * messages so that the message handlers pick this up. */
4519       GST_OBJECT_LOCK (basesink);
4520       basesink->have_newsegment = FALSE;
4521       priv->current_sstart = -1;
4522       priv->current_sstop = -1;
4523       priv->have_latency = FALSE;
4524       GST_OBJECT_UNLOCK (basesink);
4525
4526       gst_base_sink_set_last_buffer (basesink, NULL);
4527       priv->call_preroll = FALSE;
4528
4529       if (!priv->commited) {
4530         if (priv->async_enabled) {
4531           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
4532
4533           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4534               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
4535                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
4536
4537           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4538               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
4539         }
4540         priv->commited = TRUE;
4541       } else {
4542         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
4543       }
4544       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4545       break;
4546     case GST_STATE_CHANGE_READY_TO_NULL:
4547       if (bclass->stop) {
4548         if (!bclass->stop (basesink)) {
4549           GST_WARNING_OBJECT (basesink, "failed to stop");
4550         }
4551       }
4552       gst_base_sink_set_last_buffer (basesink, NULL);
4553       priv->call_preroll = FALSE;
4554       break;
4555     default:
4556       break;
4557   }
4558
4559   return ret;
4560
4561   /* ERRORS */
4562 start_failed:
4563   {
4564     GST_DEBUG_OBJECT (basesink, "failed to start");
4565     return GST_STATE_CHANGE_FAILURE;
4566   }
4567 activate_failed:
4568   {
4569     GST_DEBUG_OBJECT (basesink,
4570         "element failed to change states -- activation problem?");
4571     return GST_STATE_CHANGE_FAILURE;
4572   }
4573 }