basesink: Pass along miniobject type through various functions
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSrc
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * |[
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * ]|
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSinkClass.preroll() vmethod with this preroll buffer and will then
59  * commit the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from #GstBaseSinkClass.get_times(). If this
63  * function returns #GST_CLOCK_TIME_NONE for the start time, no synchronisation
64  * will be done. Synchronisation can be disabled entirely by setting the object
65  * #GstBaseSink:sync property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSinkClass.render() will be
68  * called. Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the
71  * #GstBaseSinkClass.render() method are supported as well. These classes
72  * typically receive a buffer in the render method and can then potentially
73  * block on the clock while rendering. A typical example is an audiosink.
74  * Since 0.10.11 these subclasses can use gst_base_sink_wait_preroll() to
75  * perform the blocking wait.
76  *
77  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
78  * for the clock to reach the time indicated by the stop time of the last
79  * #GstBaseSinkClass.get_times() call before posting an EOS message. When the
80  * element receives EOS in PAUSED, preroll completes, the event is queued and an
81  * EOS message is posted when going to PLAYING.
82  *
83  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
84  * synchronisation and clipping of buffers. Buffers that fall completely outside
85  * of the current segment are dropped. Buffers that fall partially in the
86  * segment are rendered (and prerolled). Subclasses should do any subbuffer
87  * clipping themselves when needed.
88  *
89  * #GstBaseSink will by default report the current playback position in
90  * #GST_FORMAT_TIME based on the current clock time and segment information.
91  * If no clock has been set on the element, the query will be forwarded
92  * upstream.
93  *
94  * The #GstBaseSinkClass.set_caps() function will be called when the subclass
95  * should configure itself to process a specific media type.
96  *
97  * The #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() virtual methods
98  * will be called when resources should be allocated. Any 
99  * #GstBaseSinkClass.preroll(), #GstBaseSinkClass.render() and
100  * #GstBaseSinkClass.set_caps() function will be called between the
101  * #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() calls.
102  *
103  * The #GstBaseSinkClass.event() virtual method will be called when an event is
104  * received by #GstBaseSink. Normally this method should only be overriden by
105  * very specific elements (such as file sinks) which need to handle the
106  * newsegment event specially.
107  *
108  * #GstBaseSink provides an overridable #GstBaseSinkClass.buffer_alloc()
109  * function that can be used by sinks that want to do reverse negotiation or to
110  * provide custom buffers (hardware buffers for example) to upstream elements.
111  *
112  * The #GstBaseSinkClass.unlock() method is called when the elements should
113  * unblock any blocking operations they perform in the
114  * #GstBaseSinkClass.render() method. This is mostly useful when the
115  * #GstBaseSinkClass.render() method performs a blocking write on a file
116  * descriptor, for example.
117  *
118  * The #GstBaseSink:max-lateness property affects how the sink deals with
119  * buffers that arrive too late in the sink. A buffer arrives too late in the
120  * sink when the presentation time (as a combination of the last segment, buffer
121  * timestamp and element base_time) plus the duration is before the current
122  * time of the clock.
123  * If the frame is later than max-lateness, the sink will drop the buffer
124  * without calling the render method.
125  * This feature is disabled if sync is disabled, the
126  * #GstBaseSinkClass.get_times() method does not return a valid start time or
127  * max-lateness is set to -1 (the default).
128  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
129  * max-lateness value.
130  *
131  * The #GstBaseSink:qos property will enable the quality-of-service features of
132  * the basesink which gather statistics about the real-time performance of the
133  * clock synchronisation. For each buffer received in the sink, statistics are
134  * gathered and a QOS event is sent upstream with these numbers. This
135  * information can then be used by upstream elements to reduce their processing
136  * rate, for example.
137  *
138  * Since 0.10.15 the #GstBaseSink:async property can be used to instruct the
139  * sink to never perform an ASYNC state change. This feature is mostly usable
140  * when dealing with non-synchronized streams or sparse streams.
141  *
142  * Last reviewed on 2007-08-29 (0.10.15)
143  */
144
145 #ifdef HAVE_CONFIG_H
146 #  include "config.h"
147 #endif
148
149 #include <gst/gst_private.h>
150
151 #include "gstbasesink.h"
152 #include <gst/gstmarshal.h>
153 #include <gst/gst-i18n-lib.h>
154
155 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
156 #define GST_CAT_DEFAULT gst_base_sink_debug
157
158 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
159    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
160
161 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
162
163 typedef struct
164 {
165   gboolean valid;               /* if this info is valid */
166   guint32 seqnum;               /* the seqnum of the STEP event */
167   GstFormat format;             /* the format of the amount */
168   guint64 amount;               /* the total amount of data to skip */
169   guint64 position;             /* the position in the stepped data */
170   guint64 duration;             /* the duration in time of the skipped data */
171   guint64 start;                /* running_time of the start */
172   gdouble rate;                 /* rate of skipping */
173   gdouble start_rate;           /* rate before skipping */
174   guint64 start_start;          /* start position skipping */
175   guint64 start_stop;           /* stop position skipping */
176   gboolean flush;               /* if this was a flushing step */
177   gboolean intermediate;        /* if this is an intermediate step */
178   gboolean need_preroll;        /* if we need preroll after this step */
179 } GstStepInfo;
180
181 /* FIXME, some stuff in ABI.data and other in Private...
182  * Make up your mind please.
183  */
184 struct _GstBaseSinkPrivate
185 {
186   gint qos_enabled;             /* ATOMIC */
187   gboolean async_enabled;
188   GstClockTimeDiff ts_offset;
189   GstClockTime render_delay;
190
191   /* start, stop of current buffer, stream time, used to report position */
192   GstClockTime current_sstart;
193   GstClockTime current_sstop;
194
195   /* start, stop and jitter of current buffer, running time */
196   GstClockTime current_rstart;
197   GstClockTime current_rstop;
198   GstClockTimeDiff current_jitter;
199
200   /* EOS sync time in running time */
201   GstClockTime eos_rtime;
202
203   /* last buffer that arrived in time, running time */
204   GstClockTime last_in_time;
205   /* when the last buffer left the sink, running time */
206   GstClockTime last_left;
207
208   /* running averages go here these are done on running time */
209   GstClockTime avg_pt;
210   GstClockTime avg_duration;
211   gdouble avg_rate;
212
213   /* these are done on system time. avg_jitter and avg_render are
214    * compared to eachother to see if the rendering time takes a
215    * huge amount of the processing, If so we are flooded with
216    * buffers. */
217   GstClockTime last_left_systime;
218   GstClockTime avg_jitter;
219   GstClockTime start, stop;
220   GstClockTime avg_render;
221
222   /* number of rendered and dropped frames */
223   guint64 rendered;
224   guint64 dropped;
225
226   /* latency stuff */
227   GstClockTime latency;
228
229   /* if we already commited the state */
230   gboolean commited;
231
232   /* when we received EOS */
233   gboolean received_eos;
234
235   /* when we are prerolled and able to report latency */
236   gboolean have_latency;
237
238   /* the last buffer we prerolled or rendered. Useful for making snapshots */
239   gint enable_last_buffer;      /* atomic */
240   GstBuffer *last_buffer;
241
242   /* caps for pull based scheduling */
243   GstCaps *pull_caps;
244
245   /* blocksize for pulling */
246   guint blocksize;
247
248   gboolean discont;
249
250   /* seqnum of the stream */
251   guint32 seqnum;
252
253   gboolean call_preroll;
254   gboolean step_unlock;
255
256   /* we have a pending and a current step operation */
257   GstStepInfo current_step;
258   GstStepInfo pending_step;
259 };
260
261 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
262
263 /* generic running average, this has a neutral window size */
264 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
265
266 /* the windows for these running averages are experimentally obtained.
267  * possitive values get averaged more while negative values use a small
268  * window so we can react faster to badness. */
269 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
270 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
271
272 enum
273 {
274   _PR_IS_NOTHING = 1 << 0,
275   _PR_IS_BUFFER = 1 << 1,
276   _PR_IS_BUFFERLIST = 1 << 2,
277   _PR_IS_EVENT = 1 << 3
278 } PrivateObjectType;
279
280 #define OBJ_IS_BUFFER(a) ((a) & _PR_IS_BUFFER)
281 #define OBJ_IS_BUFFERLIST(a) ((a) & _PR_IS_BUFFERLIST)
282 #define OBJ_IS_EVENT(a) ((a) & _PR_IS_EVENT)
283 #define OBJ_IS_BUFFERFULL(a) ((a) & (_PR_IS_BUFFER | _PR_IS_BUFFERLIST))
284
285 /* BaseSink properties */
286
287 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
288 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
289
290 #define DEFAULT_PREROLL_QUEUE_LEN   0
291 #define DEFAULT_SYNC                TRUE
292 #define DEFAULT_MAX_LATENESS        -1
293 #define DEFAULT_QOS                 FALSE
294 #define DEFAULT_ASYNC               TRUE
295 #define DEFAULT_TS_OFFSET           0
296 #define DEFAULT_BLOCKSIZE           4096
297 #define DEFAULT_RENDER_DELAY        0
298 #define DEFAULT_ENABLE_LAST_BUFFER  TRUE
299
300 enum
301 {
302   PROP_0,
303   PROP_PREROLL_QUEUE_LEN,
304   PROP_SYNC,
305   PROP_MAX_LATENESS,
306   PROP_QOS,
307   PROP_ASYNC,
308   PROP_TS_OFFSET,
309   PROP_ENABLE_LAST_BUFFER,
310   PROP_LAST_BUFFER,
311   PROP_BLOCKSIZE,
312   PROP_RENDER_DELAY,
313   PROP_LAST
314 };
315
316 static GstElementClass *parent_class = NULL;
317
318 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
319 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
320 static void gst_base_sink_finalize (GObject * object);
321
322 GType
323 gst_base_sink_get_type (void)
324 {
325   static volatile gsize base_sink_type = 0;
326
327   if (g_once_init_enter (&base_sink_type)) {
328     GType _type;
329     static const GTypeInfo base_sink_info = {
330       sizeof (GstBaseSinkClass),
331       NULL,
332       NULL,
333       (GClassInitFunc) gst_base_sink_class_init,
334       NULL,
335       NULL,
336       sizeof (GstBaseSink),
337       0,
338       (GInstanceInitFunc) gst_base_sink_init,
339     };
340
341     _type = g_type_register_static (GST_TYPE_ELEMENT,
342         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
343     g_once_init_leave (&base_sink_type, _type);
344   }
345   return base_sink_type;
346 }
347
348 static void gst_base_sink_set_property (GObject * object, guint prop_id,
349     const GValue * value, GParamSpec * pspec);
350 static void gst_base_sink_get_property (GObject * object, guint prop_id,
351     GValue * value, GParamSpec * pspec);
352
353 static gboolean gst_base_sink_send_event (GstElement * element,
354     GstEvent * event);
355 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
356 static const GstQueryType *gst_base_sink_get_query_types (GstElement * element);
357
358 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
359 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
360 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
361     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
362 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
363     GstClockTime * start, GstClockTime * end);
364 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
365     GstPad * pad, gboolean flushing);
366 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
367     gboolean active);
368 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
369     GstSegment * segment);
370 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
371     GstEvent * event, GstSegment * segment);
372
373 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
374     GstStateChange transition);
375
376 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
377 static GstFlowReturn gst_base_sink_chain_list (GstPad * pad,
378     GstBufferList * list);
379
380 static void gst_base_sink_loop (GstPad * pad);
381 static gboolean gst_base_sink_pad_activate (GstPad * pad);
382 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
383 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
384 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
385
386 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
387 static GstCaps *gst_base_sink_pad_getcaps (GstPad * pad);
388 static gboolean gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps);
389 static void gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps);
390 static GstFlowReturn gst_base_sink_pad_buffer_alloc (GstPad * pad,
391     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
392
393
394 /* check if an object was too late */
395 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
396     GstMiniObject * obj, GstClockTime start, GstClockTime stop,
397     GstClockReturn status, GstClockTimeDiff jitter);
398 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
399     guint8 obj_type, GstMiniObject * obj);
400
401 static void
402 gst_base_sink_class_init (GstBaseSinkClass * klass)
403 {
404   GObjectClass *gobject_class;
405   GstElementClass *gstelement_class;
406
407   gobject_class = G_OBJECT_CLASS (klass);
408   gstelement_class = GST_ELEMENT_CLASS (klass);
409
410   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
411       "basesink element");
412
413   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
414
415   parent_class = g_type_class_peek_parent (klass);
416
417   gobject_class->finalize = gst_base_sink_finalize;
418   gobject_class->set_property = gst_base_sink_set_property;
419   gobject_class->get_property = gst_base_sink_get_property;
420
421   /* FIXME, this next value should be configured using an event from the
422    * upstream element, ie, the BUFFER_SIZE event. */
423   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
424       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
425           "Number of buffers to queue during preroll", 0, G_MAXUINT,
426           DEFAULT_PREROLL_QUEUE_LEN,
427           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
428
429   g_object_class_install_property (gobject_class, PROP_SYNC,
430       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
431           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
432
433   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
434       g_param_spec_int64 ("max-lateness", "Max Lateness",
435           "Maximum number of nanoseconds that a buffer can be late before it "
436           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
437           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
438
439   g_object_class_install_property (gobject_class, PROP_QOS,
440       g_param_spec_boolean ("qos", "Qos",
441           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
442           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
443   /**
444    * GstBaseSink:async
445    *
446    * If set to #TRUE, the basesink will perform asynchronous state changes.
447    * When set to #FALSE, the sink will not signal the parent when it prerolls.
448    * Use this option when dealing with sparse streams or when synchronisation is
449    * not required.
450    *
451    * Since: 0.10.15
452    */
453   g_object_class_install_property (gobject_class, PROP_ASYNC,
454       g_param_spec_boolean ("async", "Async",
455           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
456           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
457   /**
458    * GstBaseSink:ts-offset
459    *
460    * Controls the final synchronisation, a negative value will render the buffer
461    * earlier while a positive value delays playback. This property can be
462    * used to fix synchronisation in bad files.
463    *
464    * Since: 0.10.15
465    */
466   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
467       g_param_spec_int64 ("ts-offset", "TS Offset",
468           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
469           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
470
471   /**
472    * GstBaseSink:enable-last-buffer
473    *
474    * Enable the last-buffer property. If FALSE, basesink doesn't keep a
475    * reference to the last buffer arrived and the last-buffer property is always
476    * set to NULL. This can be useful if you need buffers to be released as soon
477    * as possible, eg. if you're using a buffer pool.
478    *
479    * Since: 0.10.30
480    */
481   g_object_class_install_property (gobject_class, PROP_ENABLE_LAST_BUFFER,
482       g_param_spec_boolean ("enable-last-buffer", "Enable Last Buffer",
483           "Enable the last-buffer property", DEFAULT_ENABLE_LAST_BUFFER,
484           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
485
486   /**
487    * GstBaseSink:last-buffer
488    *
489    * The last buffer that arrived in the sink and was used for preroll or for
490    * rendering. This property can be used to generate thumbnails. This property
491    * can be NULL when the sink has not yet received a bufer.
492    *
493    * Since: 0.10.15
494    */
495   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
496       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
497           "The last buffer received in the sink", GST_TYPE_BUFFER,
498           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
499   /**
500    * GstBaseSink:blocksize
501    *
502    * The amount of bytes to pull when operating in pull mode.
503    *
504    * Since: 0.10.22
505    */
506   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
507       g_param_spec_uint ("blocksize", "Block size",
508           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
509           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
510   /**
511    * GstBaseSink:render-delay
512    *
513    * The additional delay between synchronisation and actual rendering of the
514    * media. This property will add additional latency to the device in order to
515    * make other sinks compensate for the delay.
516    *
517    * Since: 0.10.22
518    */
519   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
520       g_param_spec_uint64 ("render-delay", "Render Delay",
521           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
522           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
523
524   gstelement_class->change_state =
525       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
526   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
527   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
528   gstelement_class->get_query_types =
529       GST_DEBUG_FUNCPTR (gst_base_sink_get_query_types);
530
531   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
532   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
533   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
534   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
535   klass->activate_pull =
536       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
537
538   /* Registering debug symbols for function pointers */
539   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_getcaps);
540   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_setcaps);
541   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_fixate);
542   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_buffer_alloc);
543   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate);
544   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_push);
545   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_pull);
546   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_event);
547   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain);
548   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain_list);
549 }
550
551 static GstCaps *
552 gst_base_sink_pad_getcaps (GstPad * pad)
553 {
554   GstBaseSinkClass *bclass;
555   GstBaseSink *bsink;
556   GstCaps *caps = NULL;
557
558   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
559   bclass = GST_BASE_SINK_GET_CLASS (bsink);
560
561   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
562     /* if we are operating in pull mode we only accept the negotiated caps */
563     GST_OBJECT_LOCK (pad);
564     if ((caps = GST_PAD_CAPS (pad)))
565       gst_caps_ref (caps);
566     GST_OBJECT_UNLOCK (pad);
567   }
568   if (caps == NULL) {
569     if (bclass->get_caps)
570       caps = bclass->get_caps (bsink);
571
572     if (caps == NULL) {
573       GstPadTemplate *pad_template;
574
575       pad_template =
576           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
577           "sink");
578       if (pad_template != NULL) {
579         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
580       }
581     }
582   }
583   gst_object_unref (bsink);
584
585   return caps;
586 }
587
588 static gboolean
589 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
590 {
591   GstBaseSinkClass *bclass;
592   GstBaseSink *bsink;
593   gboolean res = TRUE;
594
595   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
596   bclass = GST_BASE_SINK_GET_CLASS (bsink);
597
598   if (res && bclass->set_caps)
599     res = bclass->set_caps (bsink, caps);
600
601   gst_object_unref (bsink);
602
603   return res;
604 }
605
606 static void
607 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
608 {
609   GstBaseSinkClass *bclass;
610   GstBaseSink *bsink;
611
612   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
613   bclass = GST_BASE_SINK_GET_CLASS (bsink);
614
615   if (bclass->fixate)
616     bclass->fixate (bsink, caps);
617
618   gst_object_unref (bsink);
619 }
620
621 static GstFlowReturn
622 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
623     GstCaps * caps, GstBuffer ** buf)
624 {
625   GstBaseSinkClass *bclass;
626   GstBaseSink *bsink;
627   GstFlowReturn result = GST_FLOW_OK;
628
629   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
630   bclass = GST_BASE_SINK_GET_CLASS (bsink);
631
632   if (bclass->buffer_alloc)
633     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
634   else
635     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
636
637   gst_object_unref (bsink);
638
639   return result;
640 }
641
642 static void
643 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
644 {
645   GstPadTemplate *pad_template;
646   GstBaseSinkPrivate *priv;
647
648   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
649
650   pad_template =
651       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
652   g_return_if_fail (pad_template != NULL);
653
654   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
655
656   gst_pad_set_getcaps_function (basesink->sinkpad, gst_base_sink_pad_getcaps);
657   gst_pad_set_setcaps_function (basesink->sinkpad, gst_base_sink_pad_setcaps);
658   gst_pad_set_fixatecaps_function (basesink->sinkpad, gst_base_sink_pad_fixate);
659   gst_pad_set_bufferalloc_function (basesink->sinkpad,
660       gst_base_sink_pad_buffer_alloc);
661   gst_pad_set_activate_function (basesink->sinkpad, gst_base_sink_pad_activate);
662   gst_pad_set_activatepush_function (basesink->sinkpad,
663       gst_base_sink_pad_activate_push);
664   gst_pad_set_activatepull_function (basesink->sinkpad,
665       gst_base_sink_pad_activate_pull);
666   gst_pad_set_event_function (basesink->sinkpad, gst_base_sink_event);
667   gst_pad_set_chain_function (basesink->sinkpad, gst_base_sink_chain);
668   gst_pad_set_chain_list_function (basesink->sinkpad, gst_base_sink_chain_list);
669   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
670
671   basesink->pad_mode = GST_ACTIVATE_NONE;
672   basesink->preroll_queue = g_queue_new ();
673   basesink->abidata.ABI.clip_segment = gst_segment_new ();
674   priv->have_latency = FALSE;
675
676   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
677   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
678
679   basesink->sync = DEFAULT_SYNC;
680   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
681   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
682   priv->async_enabled = DEFAULT_ASYNC;
683   priv->ts_offset = DEFAULT_TS_OFFSET;
684   priv->render_delay = DEFAULT_RENDER_DELAY;
685   priv->blocksize = DEFAULT_BLOCKSIZE;
686   g_atomic_int_set (&priv->enable_last_buffer, DEFAULT_ENABLE_LAST_BUFFER);
687
688   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
689 }
690
691 static void
692 gst_base_sink_finalize (GObject * object)
693 {
694   GstBaseSink *basesink;
695
696   basesink = GST_BASE_SINK (object);
697
698   g_queue_free (basesink->preroll_queue);
699   gst_segment_free (basesink->abidata.ABI.clip_segment);
700
701   G_OBJECT_CLASS (parent_class)->finalize (object);
702 }
703
704 /**
705  * gst_base_sink_set_sync:
706  * @sink: the sink
707  * @sync: the new sync value.
708  *
709  * Configures @sink to synchronize on the clock or not. When
710  * @sync is FALSE, incomming samples will be played as fast as
711  * possible. If @sync is TRUE, the timestamps of the incomming
712  * buffers will be used to schedule the exact render time of its
713  * contents.
714  *
715  * Since: 0.10.4
716  */
717 void
718 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
719 {
720   g_return_if_fail (GST_IS_BASE_SINK (sink));
721
722   GST_OBJECT_LOCK (sink);
723   sink->sync = sync;
724   GST_OBJECT_UNLOCK (sink);
725 }
726
727 /**
728  * gst_base_sink_get_sync:
729  * @sink: the sink
730  *
731  * Checks if @sink is currently configured to synchronize against the
732  * clock.
733  *
734  * Returns: TRUE if the sink is configured to synchronize against the clock.
735  *
736  * Since: 0.10.4
737  */
738 gboolean
739 gst_base_sink_get_sync (GstBaseSink * sink)
740 {
741   gboolean res;
742
743   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
744
745   GST_OBJECT_LOCK (sink);
746   res = sink->sync;
747   GST_OBJECT_UNLOCK (sink);
748
749   return res;
750 }
751
752 /**
753  * gst_base_sink_set_max_lateness:
754  * @sink: the sink
755  * @max_lateness: the new max lateness value.
756  *
757  * Sets the new max lateness value to @max_lateness. This value is
758  * used to decide if a buffer should be dropped or not based on the
759  * buffer timestamp and the current clock time. A value of -1 means
760  * an unlimited time.
761  *
762  * Since: 0.10.4
763  */
764 void
765 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
766 {
767   g_return_if_fail (GST_IS_BASE_SINK (sink));
768
769   GST_OBJECT_LOCK (sink);
770   sink->abidata.ABI.max_lateness = max_lateness;
771   GST_OBJECT_UNLOCK (sink);
772 }
773
774 /**
775  * gst_base_sink_get_max_lateness:
776  * @sink: the sink
777  *
778  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
779  * more details.
780  *
781  * Returns: The maximum time in nanoseconds that a buffer can be late
782  * before it is dropped and not rendered. A value of -1 means an
783  * unlimited time.
784  *
785  * Since: 0.10.4
786  */
787 gint64
788 gst_base_sink_get_max_lateness (GstBaseSink * sink)
789 {
790   gint64 res;
791
792   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
793
794   GST_OBJECT_LOCK (sink);
795   res = sink->abidata.ABI.max_lateness;
796   GST_OBJECT_UNLOCK (sink);
797
798   return res;
799 }
800
801 /**
802  * gst_base_sink_set_qos_enabled:
803  * @sink: the sink
804  * @enabled: the new qos value.
805  *
806  * Configures @sink to send Quality-of-Service events upstream.
807  *
808  * Since: 0.10.5
809  */
810 void
811 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
812 {
813   g_return_if_fail (GST_IS_BASE_SINK (sink));
814
815   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
816 }
817
818 /**
819  * gst_base_sink_is_qos_enabled:
820  * @sink: the sink
821  *
822  * Checks if @sink is currently configured to send Quality-of-Service events
823  * upstream.
824  *
825  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
826  *
827  * Since: 0.10.5
828  */
829 gboolean
830 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
831 {
832   gboolean res;
833
834   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
835
836   res = g_atomic_int_get (&sink->priv->qos_enabled);
837
838   return res;
839 }
840
841 /**
842  * gst_base_sink_set_async_enabled:
843  * @sink: the sink
844  * @enabled: the new async value.
845  *
846  * Configures @sink to perform all state changes asynchronusly. When async is
847  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
848  * preroll buffer. This feature is usefull if the sink does not synchronize
849  * against the clock or when it is dealing with sparse streams.
850  *
851  * Since: 0.10.15
852  */
853 void
854 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
855 {
856   g_return_if_fail (GST_IS_BASE_SINK (sink));
857
858   GST_PAD_PREROLL_LOCK (sink->sinkpad);
859   g_atomic_int_set (&sink->priv->async_enabled, enabled);
860   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
861   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
862 }
863
864 /**
865  * gst_base_sink_is_async_enabled:
866  * @sink: the sink
867  *
868  * Checks if @sink is currently configured to perform asynchronous state
869  * changes to PAUSED.
870  *
871  * Returns: TRUE if the sink is configured to perform asynchronous state
872  * changes.
873  *
874  * Since: 0.10.15
875  */
876 gboolean
877 gst_base_sink_is_async_enabled (GstBaseSink * sink)
878 {
879   gboolean res;
880
881   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
882
883   res = g_atomic_int_get (&sink->priv->async_enabled);
884
885   return res;
886 }
887
888 /**
889  * gst_base_sink_set_ts_offset:
890  * @sink: the sink
891  * @offset: the new offset
892  *
893  * Adjust the synchronisation of @sink with @offset. A negative value will
894  * render buffers earlier than their timestamp. A positive value will delay
895  * rendering. This function can be used to fix playback of badly timestamped
896  * buffers.
897  *
898  * Since: 0.10.15
899  */
900 void
901 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
902 {
903   g_return_if_fail (GST_IS_BASE_SINK (sink));
904
905   GST_OBJECT_LOCK (sink);
906   sink->priv->ts_offset = offset;
907   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
908   GST_OBJECT_UNLOCK (sink);
909 }
910
911 /**
912  * gst_base_sink_get_ts_offset:
913  * @sink: the sink
914  *
915  * Get the synchronisation offset of @sink.
916  *
917  * Returns: The synchronisation offset.
918  *
919  * Since: 0.10.15
920  */
921 GstClockTimeDiff
922 gst_base_sink_get_ts_offset (GstBaseSink * sink)
923 {
924   GstClockTimeDiff res;
925
926   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
927
928   GST_OBJECT_LOCK (sink);
929   res = sink->priv->ts_offset;
930   GST_OBJECT_UNLOCK (sink);
931
932   return res;
933 }
934
935 /**
936  * gst_base_sink_get_last_buffer:
937  * @sink: the sink
938  *
939  * Get the last buffer that arrived in the sink and was used for preroll or for
940  * rendering. This property can be used to generate thumbnails.
941  *
942  * The #GstCaps on the buffer can be used to determine the type of the buffer.
943  *
944  * Returns: a #GstBuffer. gst_buffer_unref() after usage. This function returns
945  * NULL when no buffer has arrived in the sink yet or when the sink is not in
946  * PAUSED or PLAYING.
947  *
948  * Since: 0.10.15
949  */
950 GstBuffer *
951 gst_base_sink_get_last_buffer (GstBaseSink * sink)
952 {
953   GstBuffer *res;
954
955   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
956
957   GST_OBJECT_LOCK (sink);
958   if ((res = sink->priv->last_buffer))
959     gst_buffer_ref (res);
960   GST_OBJECT_UNLOCK (sink);
961
962   return res;
963 }
964
965 /* with OBJECT_LOCK */
966 static void
967 gst_base_sink_set_last_buffer_unlocked (GstBaseSink * sink, GstBuffer * buffer)
968 {
969   GstBuffer *old;
970
971   old = sink->priv->last_buffer;
972   if (G_LIKELY (old != buffer)) {
973     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
974     if (G_LIKELY (buffer))
975       gst_buffer_ref (buffer);
976     sink->priv->last_buffer = buffer;
977   } else {
978     old = NULL;
979   }
980   /* avoid unreffing with the lock because cleanup code might want to take the
981    * lock too */
982   if (G_LIKELY (old)) {
983     GST_OBJECT_UNLOCK (sink);
984     gst_buffer_unref (old);
985     GST_OBJECT_LOCK (sink);
986   }
987 }
988
989 static void
990 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
991 {
992   if (!g_atomic_int_get (&sink->priv->enable_last_buffer))
993     return;
994
995   GST_OBJECT_LOCK (sink);
996   gst_base_sink_set_last_buffer_unlocked (sink, buffer);
997   GST_OBJECT_UNLOCK (sink);
998 }
999
1000 /**
1001  * gst_base_sink_set_last_buffer_enabled:
1002  * @sink: the sink
1003  * @enabled: the new enable-last-buffer value.
1004  *
1005  * Configures @sink to store the last received buffer in the last-buffer
1006  * property.
1007  *
1008  * Since: 0.10.30
1009  */
1010 void
1011 gst_base_sink_set_last_buffer_enabled (GstBaseSink * sink, gboolean enabled)
1012 {
1013   g_return_if_fail (GST_IS_BASE_SINK (sink));
1014
1015   /* Only take lock if we change the value */
1016   if (g_atomic_int_compare_and_exchange (&sink->priv->enable_last_buffer,
1017           !enabled, enabled) && !enabled) {
1018     GST_OBJECT_LOCK (sink);
1019     gst_base_sink_set_last_buffer_unlocked (sink, NULL);
1020     GST_OBJECT_UNLOCK (sink);
1021   }
1022 }
1023
1024 /**
1025  * gst_base_sink_is_last_buffer_enabled:
1026  * @sink: the sink
1027  *
1028  * Checks if @sink is currently configured to store the last received buffer in
1029  * the last-buffer property.
1030  *
1031  * Returns: TRUE if the sink is configured to store the last received buffer.
1032  *
1033  * Since: 0.10.30
1034  */
1035 gboolean
1036 gst_base_sink_is_last_buffer_enabled (GstBaseSink * sink)
1037 {
1038   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
1039
1040   return g_atomic_int_get (&sink->priv->enable_last_buffer);
1041 }
1042
1043 /**
1044  * gst_base_sink_get_latency:
1045  * @sink: the sink
1046  *
1047  * Get the currently configured latency.
1048  *
1049  * Returns: The configured latency.
1050  *
1051  * Since: 0.10.12
1052  */
1053 GstClockTime
1054 gst_base_sink_get_latency (GstBaseSink * sink)
1055 {
1056   GstClockTime res;
1057
1058   GST_OBJECT_LOCK (sink);
1059   res = sink->priv->latency;
1060   GST_OBJECT_UNLOCK (sink);
1061
1062   return res;
1063 }
1064
1065 /**
1066  * gst_base_sink_query_latency:
1067  * @sink: the sink
1068  * @live: if the sink is live
1069  * @upstream_live: if an upstream element is live
1070  * @min_latency: the min latency of the upstream elements
1071  * @max_latency: the max latency of the upstream elements
1072  *
1073  * Query the sink for the latency parameters. The latency will be queried from
1074  * the upstream elements. @live will be TRUE if @sink is configured to
1075  * synchronize against the clock. @upstream_live will be TRUE if an upstream
1076  * element is live.
1077  *
1078  * If both @live and @upstream_live are TRUE, the sink will want to compensate
1079  * for the latency introduced by the upstream elements by setting the
1080  * @min_latency to a strictly possitive value.
1081  *
1082  * This function is mostly used by subclasses.
1083  *
1084  * Returns: TRUE if the query succeeded.
1085  *
1086  * Since: 0.10.12
1087  */
1088 gboolean
1089 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
1090     gboolean * upstream_live, GstClockTime * min_latency,
1091     GstClockTime * max_latency)
1092 {
1093   gboolean l, us_live, res, have_latency;
1094   GstClockTime min, max, render_delay;
1095   GstQuery *query;
1096   GstClockTime us_min, us_max;
1097
1098   /* we are live when we sync to the clock */
1099   GST_OBJECT_LOCK (sink);
1100   l = sink->sync;
1101   have_latency = sink->priv->have_latency;
1102   render_delay = sink->priv->render_delay;
1103   GST_OBJECT_UNLOCK (sink);
1104
1105   /* assume no latency */
1106   min = 0;
1107   max = -1;
1108   us_live = FALSE;
1109
1110   if (have_latency) {
1111     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1112     /* we are ready for a latency query this is when we preroll or when we are
1113      * not async. */
1114     query = gst_query_new_latency ();
1115
1116     /* ask the peer for the latency */
1117     if ((res = gst_pad_peer_query (sink->sinkpad, query))) {
1118       /* get upstream min and max latency */
1119       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1120
1121       if (us_live) {
1122         /* upstream live, use its latency, subclasses should use these
1123          * values to create the complete latency. */
1124         min = us_min;
1125         max = us_max;
1126       }
1127       if (l) {
1128         /* we need to add the render delay if we are live */
1129         if (min != -1)
1130           min += render_delay;
1131         if (max != -1)
1132           max += render_delay;
1133       }
1134     }
1135     gst_query_unref (query);
1136   } else {
1137     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1138     res = FALSE;
1139   }
1140
1141   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1142   if (!res) {
1143     if (!l) {
1144       res = TRUE;
1145       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1146     } else {
1147       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1148     }
1149   }
1150
1151   if (res) {
1152     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1153         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1154         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1155
1156     if (live)
1157       *live = l;
1158     if (upstream_live)
1159       *upstream_live = us_live;
1160     if (min_latency)
1161       *min_latency = min;
1162     if (max_latency)
1163       *max_latency = max;
1164   }
1165   return res;
1166 }
1167
1168 /**
1169  * gst_base_sink_set_render_delay:
1170  * @sink: a #GstBaseSink
1171  * @delay: the new delay
1172  *
1173  * Set the render delay in @sink to @delay. The render delay is the time
1174  * between actual rendering of a buffer and its synchronisation time. Some
1175  * devices might delay media rendering which can be compensated for with this
1176  * function.
1177  *
1178  * After calling this function, this sink will report additional latency and
1179  * other sinks will adjust their latency to delay the rendering of their media.
1180  *
1181  * This function is usually called by subclasses.
1182  *
1183  * Since: 0.10.21
1184  */
1185 void
1186 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1187 {
1188   GstClockTime old_render_delay;
1189
1190   g_return_if_fail (GST_IS_BASE_SINK (sink));
1191
1192   GST_OBJECT_LOCK (sink);
1193   old_render_delay = sink->priv->render_delay;
1194   sink->priv->render_delay = delay;
1195   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1196       GST_TIME_ARGS (delay));
1197   GST_OBJECT_UNLOCK (sink);
1198
1199   if (delay != old_render_delay) {
1200     GST_DEBUG_OBJECT (sink, "posting latency changed");
1201     gst_element_post_message (GST_ELEMENT_CAST (sink),
1202         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1203   }
1204 }
1205
1206 /**
1207  * gst_base_sink_get_render_delay:
1208  * @sink: a #GstBaseSink
1209  *
1210  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1211  * information about the render delay.
1212  *
1213  * Returns: the render delay of @sink.
1214  *
1215  * Since: 0.10.21
1216  */
1217 GstClockTime
1218 gst_base_sink_get_render_delay (GstBaseSink * sink)
1219 {
1220   GstClockTimeDiff res;
1221
1222   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1223
1224   GST_OBJECT_LOCK (sink);
1225   res = sink->priv->render_delay;
1226   GST_OBJECT_UNLOCK (sink);
1227
1228   return res;
1229 }
1230
1231 /**
1232  * gst_base_sink_set_blocksize:
1233  * @sink: a #GstBaseSink
1234  * @blocksize: the blocksize in bytes
1235  *
1236  * Set the number of bytes that the sink will pull when it is operating in pull
1237  * mode.
1238  *
1239  * Since: 0.10.22
1240  */
1241 void
1242 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1243 {
1244   g_return_if_fail (GST_IS_BASE_SINK (sink));
1245
1246   GST_OBJECT_LOCK (sink);
1247   sink->priv->blocksize = blocksize;
1248   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1249   GST_OBJECT_UNLOCK (sink);
1250 }
1251
1252 /**
1253  * gst_base_sink_get_blocksize:
1254  * @sink: a #GstBaseSink
1255  *
1256  * Get the number of bytes that the sink will pull when it is operating in pull
1257  * mode.
1258  *
1259  * Returns: the number of bytes @sink will pull in pull mode.
1260  *
1261  * Since: 0.10.22
1262  */
1263 guint
1264 gst_base_sink_get_blocksize (GstBaseSink * sink)
1265 {
1266   guint res;
1267
1268   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1269
1270   GST_OBJECT_LOCK (sink);
1271   res = sink->priv->blocksize;
1272   GST_OBJECT_UNLOCK (sink);
1273
1274   return res;
1275 }
1276
1277 static void
1278 gst_base_sink_set_property (GObject * object, guint prop_id,
1279     const GValue * value, GParamSpec * pspec)
1280 {
1281   GstBaseSink *sink = GST_BASE_SINK (object);
1282
1283   switch (prop_id) {
1284     case PROP_PREROLL_QUEUE_LEN:
1285       /* preroll lock necessary to serialize with finish_preroll */
1286       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1287       g_atomic_int_set (&sink->preroll_queue_max_len, g_value_get_uint (value));
1288       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1289       break;
1290     case PROP_SYNC:
1291       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1292       break;
1293     case PROP_MAX_LATENESS:
1294       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1295       break;
1296     case PROP_QOS:
1297       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1298       break;
1299     case PROP_ASYNC:
1300       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1301       break;
1302     case PROP_TS_OFFSET:
1303       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1304       break;
1305     case PROP_BLOCKSIZE:
1306       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1307       break;
1308     case PROP_RENDER_DELAY:
1309       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1310       break;
1311     case PROP_ENABLE_LAST_BUFFER:
1312       gst_base_sink_set_last_buffer_enabled (sink, g_value_get_boolean (value));
1313       break;
1314     default:
1315       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1316       break;
1317   }
1318 }
1319
1320 static void
1321 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1322     GParamSpec * pspec)
1323 {
1324   GstBaseSink *sink = GST_BASE_SINK (object);
1325
1326   switch (prop_id) {
1327     case PROP_PREROLL_QUEUE_LEN:
1328       g_value_set_uint (value, g_atomic_int_get (&sink->preroll_queue_max_len));
1329       break;
1330     case PROP_SYNC:
1331       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1332       break;
1333     case PROP_MAX_LATENESS:
1334       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1335       break;
1336     case PROP_QOS:
1337       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1338       break;
1339     case PROP_ASYNC:
1340       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1341       break;
1342     case PROP_TS_OFFSET:
1343       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1344       break;
1345     case PROP_LAST_BUFFER:
1346       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1347       break;
1348     case PROP_ENABLE_LAST_BUFFER:
1349       g_value_set_boolean (value, gst_base_sink_is_last_buffer_enabled (sink));
1350       break;
1351     case PROP_BLOCKSIZE:
1352       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1353       break;
1354     case PROP_RENDER_DELAY:
1355       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1356       break;
1357     default:
1358       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1359       break;
1360   }
1361 }
1362
1363
1364 static GstCaps *
1365 gst_base_sink_get_caps (GstBaseSink * sink)
1366 {
1367   return NULL;
1368 }
1369
1370 static gboolean
1371 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1372 {
1373   return TRUE;
1374 }
1375
1376 static GstFlowReturn
1377 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1378     GstCaps * caps, GstBuffer ** buf)
1379 {
1380   *buf = NULL;
1381   return GST_FLOW_OK;
1382 }
1383
1384 /* with PREROLL_LOCK, STREAM_LOCK */
1385 static void
1386 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1387 {
1388   GstMiniObject *obj;
1389
1390   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1391   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1392     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1393     gst_mini_object_unref (obj);
1394   }
1395   /* we can't have EOS anymore now */
1396   basesink->eos = FALSE;
1397   basesink->priv->received_eos = FALSE;
1398   basesink->have_preroll = FALSE;
1399   basesink->priv->step_unlock = FALSE;
1400   basesink->eos_queued = FALSE;
1401   basesink->preroll_queued = 0;
1402   basesink->buffers_queued = 0;
1403   basesink->events_queued = 0;
1404   /* can't report latency anymore until we preroll again */
1405   if (basesink->priv->async_enabled) {
1406     GST_OBJECT_LOCK (basesink);
1407     basesink->priv->have_latency = FALSE;
1408     GST_OBJECT_UNLOCK (basesink);
1409   }
1410   /* and signal any waiters now */
1411   GST_PAD_PREROLL_SIGNAL (pad);
1412 }
1413
1414 /* with STREAM_LOCK, configures given segment with the event information. */
1415 static void
1416 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1417     GstEvent * event, GstSegment * segment)
1418 {
1419   gboolean update;
1420   gdouble rate, arate;
1421   GstFormat format;
1422   gint64 start;
1423   gint64 stop;
1424   gint64 time;
1425
1426   /* the newsegment event is needed to bring the buffer timestamps to the
1427    * stream time and to drop samples outside of the playback segment. */
1428   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1429       &start, &stop, &time);
1430
1431   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1432    * We protect with the OBJECT_LOCK so that we can use the values to
1433    * safely answer a POSITION query. */
1434   GST_OBJECT_LOCK (basesink);
1435   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1436       stop, time);
1437
1438   if (format == GST_FORMAT_TIME) {
1439     GST_DEBUG_OBJECT (basesink,
1440         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1441         "format GST_FORMAT_TIME, "
1442         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1443         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1444         update, rate, arate, GST_TIME_ARGS (segment->start),
1445         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1446         GST_TIME_ARGS (segment->accum));
1447   } else {
1448     GST_DEBUG_OBJECT (basesink,
1449         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1450         "format %d, "
1451         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1452         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1453         segment->format, segment->start, segment->stop, segment->time,
1454         segment->accum);
1455   }
1456   GST_OBJECT_UNLOCK (basesink);
1457 }
1458
1459 /* with PREROLL_LOCK, STREAM_LOCK */
1460 static gboolean
1461 gst_base_sink_commit_state (GstBaseSink * basesink)
1462 {
1463   /* commit state and proceed to next pending state */
1464   GstState current, next, pending, post_pending;
1465   gboolean post_paused = FALSE;
1466   gboolean post_async_done = FALSE;
1467   gboolean post_playing = FALSE;
1468
1469   /* we are certainly not playing async anymore now */
1470   basesink->playing_async = FALSE;
1471
1472   GST_OBJECT_LOCK (basesink);
1473   current = GST_STATE (basesink);
1474   next = GST_STATE_NEXT (basesink);
1475   pending = GST_STATE_PENDING (basesink);
1476   post_pending = pending;
1477
1478   switch (pending) {
1479     case GST_STATE_PLAYING:
1480     {
1481       GstBaseSinkClass *bclass;
1482       GstStateChangeReturn ret;
1483
1484       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1485
1486       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1487
1488       basesink->need_preroll = FALSE;
1489       post_async_done = TRUE;
1490       basesink->priv->commited = TRUE;
1491       post_playing = TRUE;
1492       /* post PAUSED too when we were READY */
1493       if (current == GST_STATE_READY) {
1494         post_paused = TRUE;
1495       }
1496
1497       /* make sure we notify the subclass of async playing */
1498       if (bclass->async_play) {
1499         GST_WARNING_OBJECT (basesink, "deprecated async_play");
1500         ret = bclass->async_play (basesink);
1501         if (ret == GST_STATE_CHANGE_FAILURE)
1502           goto async_failed;
1503       }
1504       break;
1505     }
1506     case GST_STATE_PAUSED:
1507       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1508       post_paused = TRUE;
1509       post_async_done = TRUE;
1510       basesink->priv->commited = TRUE;
1511       post_pending = GST_STATE_VOID_PENDING;
1512       break;
1513     case GST_STATE_READY:
1514     case GST_STATE_NULL:
1515       goto stopping;
1516     case GST_STATE_VOID_PENDING:
1517       goto nothing_pending;
1518     default:
1519       break;
1520   }
1521
1522   /* we can report latency queries now */
1523   basesink->priv->have_latency = TRUE;
1524
1525   GST_STATE (basesink) = pending;
1526   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1527   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1528   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1529   GST_OBJECT_UNLOCK (basesink);
1530
1531   if (post_paused) {
1532     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1533     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1534         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1535             current, next, post_pending));
1536   }
1537   if (post_async_done) {
1538     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1539     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1540         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1541   }
1542   if (post_playing) {
1543     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1544     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1545         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1546             next, pending, GST_STATE_VOID_PENDING));
1547   }
1548
1549   GST_STATE_BROADCAST (basesink);
1550
1551   return TRUE;
1552
1553 nothing_pending:
1554   {
1555     /* Depending on the state, set our vars. We get in this situation when the
1556      * state change function got a change to update the state vars before the
1557      * streaming thread did. This is fine but we need to make sure that we
1558      * update the need_preroll var since it was TRUE when we got here and might
1559      * become FALSE if we got to PLAYING. */
1560     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1561         gst_element_state_get_name (current));
1562     switch (current) {
1563       case GST_STATE_PLAYING:
1564         basesink->need_preroll = FALSE;
1565         break;
1566       case GST_STATE_PAUSED:
1567         basesink->need_preroll = TRUE;
1568         break;
1569       default:
1570         basesink->need_preroll = FALSE;
1571         basesink->flushing = TRUE;
1572         break;
1573     }
1574     /* we can report latency queries now */
1575     basesink->priv->have_latency = TRUE;
1576     GST_OBJECT_UNLOCK (basesink);
1577     return TRUE;
1578   }
1579 stopping:
1580   {
1581     /* app is going to READY */
1582     GST_DEBUG_OBJECT (basesink, "stopping");
1583     basesink->need_preroll = FALSE;
1584     basesink->flushing = TRUE;
1585     GST_OBJECT_UNLOCK (basesink);
1586     return FALSE;
1587   }
1588 async_failed:
1589   {
1590     GST_DEBUG_OBJECT (basesink, "async commit failed");
1591     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1592     GST_OBJECT_UNLOCK (basesink);
1593     return FALSE;
1594   }
1595 }
1596
1597 static void
1598 start_stepping (GstBaseSink * sink, GstSegment * segment,
1599     GstStepInfo * pending, GstStepInfo * current)
1600 {
1601   gint64 end;
1602   GstMessage *message;
1603
1604   GST_DEBUG_OBJECT (sink, "update pending step");
1605
1606   GST_OBJECT_LOCK (sink);
1607   memcpy (current, pending, sizeof (GstStepInfo));
1608   pending->valid = FALSE;
1609   GST_OBJECT_UNLOCK (sink);
1610
1611   /* post message first */
1612   message =
1613       gst_message_new_step_start (GST_OBJECT (sink), TRUE, current->format,
1614       current->amount, current->rate, current->flush, current->intermediate);
1615   gst_message_set_seqnum (message, current->seqnum);
1616   gst_element_post_message (GST_ELEMENT (sink), message);
1617
1618   /* get the running time of where we paused and remember it */
1619   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1620   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1621
1622   /* set the new rate for the remainder of the segment */
1623   current->start_rate = segment->rate;
1624   segment->rate *= current->rate;
1625   segment->abs_rate = ABS (segment->rate);
1626
1627   /* save values */
1628   if (segment->rate > 0.0)
1629     current->start_stop = segment->stop;
1630   else
1631     current->start_start = segment->start;
1632
1633   if (current->format == GST_FORMAT_TIME) {
1634     end = current->start + current->amount;
1635     if (!current->flush) {
1636       /* update the segment clipping regions for non-flushing seeks */
1637       if (segment->rate > 0.0) {
1638         segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1639         segment->last_stop = segment->stop;
1640       } else {
1641         gint64 position;
1642
1643         position = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1644         segment->time = position;
1645         segment->start = position;
1646         segment->last_stop = position;
1647       }
1648     }
1649   }
1650
1651   GST_DEBUG_OBJECT (sink,
1652       "segment now rate %lf, applied rate %lf, "
1653       "format GST_FORMAT_TIME, "
1654       "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1655       ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1656       segment->rate, segment->applied_rate, GST_TIME_ARGS (segment->start),
1657       GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1658       GST_TIME_ARGS (segment->accum));
1659
1660   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1661       GST_TIME_ARGS (current->start));
1662
1663   if (current->amount == -1) {
1664     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1665     current->valid = FALSE;
1666   } else {
1667     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1668         "rate: %f", current->amount, gst_format_get_name (current->format),
1669         current->rate);
1670   }
1671 }
1672
1673 static void
1674 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1675     GstStepInfo * current, gint64 rstart, gint64 rstop, gboolean eos)
1676 {
1677   gint64 stop, position;
1678   GstMessage *message;
1679
1680   GST_DEBUG_OBJECT (sink, "step complete");
1681
1682   if (segment->rate > 0.0)
1683     stop = rstart;
1684   else
1685     stop = rstop;
1686
1687   GST_DEBUG_OBJECT (sink,
1688       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1689
1690   if (stop == -1)
1691     current->duration = current->position;
1692   else
1693     current->duration = stop - current->start;
1694
1695   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1696       GST_TIME_ARGS (current->duration));
1697
1698   position = current->start + current->duration;
1699
1700   /* now move the segment to the new running time */
1701   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1702
1703   if (current->flush) {
1704     /* and remove the accumulated time we flushed, start time did not change */
1705     segment->accum = current->start;
1706   } else {
1707     /* start time is now the stepped position */
1708     gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1709   }
1710
1711   /* restore the previous rate */
1712   segment->rate = current->start_rate;
1713   segment->abs_rate = ABS (segment->rate);
1714
1715   if (segment->rate > 0.0)
1716     segment->stop = current->start_stop;
1717   else
1718     segment->start = current->start_start;
1719
1720   /* the clip segment is used for position report in paused... */
1721   memcpy (sink->abidata.ABI.clip_segment, segment, sizeof (GstSegment));
1722
1723   /* post the step done when we know the stepped duration in TIME */
1724   message =
1725       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1726       current->amount, current->rate, current->flush, current->intermediate,
1727       current->duration, eos);
1728   gst_message_set_seqnum (message, current->seqnum);
1729   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1730
1731   if (!current->intermediate)
1732     sink->need_preroll = current->need_preroll;
1733
1734   /* and the current step info finished and becomes invalid */
1735   current->valid = FALSE;
1736 }
1737
1738 static gboolean
1739 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1740     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1741     gint64 * rstop)
1742 {
1743   gboolean step_end = FALSE;
1744
1745   /* see if we need to skip this buffer because of stepping */
1746   switch (current->format) {
1747     case GST_FORMAT_TIME:
1748     {
1749       guint64 end;
1750       gint64 first, last;
1751
1752       if (segment->rate > 0.0) {
1753         if (segment->stop == *cstop)
1754           *rstop = *rstart + current->amount;
1755
1756         first = *rstart;
1757         last = *rstop;
1758       } else {
1759         if (segment->start == *cstart)
1760           *rstart = *rstop + current->amount;
1761
1762         first = *rstop;
1763         last = *rstart;
1764       }
1765
1766       end = current->start + current->amount;
1767       current->position = first - current->start;
1768
1769       if (G_UNLIKELY (segment->abs_rate != 1.0))
1770         current->position /= segment->abs_rate;
1771
1772       GST_DEBUG_OBJECT (sink,
1773           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1774           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1775       GST_DEBUG_OBJECT (sink,
1776           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1777           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1778           GST_TIME_ARGS (last - current->start),
1779           GST_TIME_ARGS (current->amount));
1780
1781       if ((current->flush && current->position >= current->amount)
1782           || last >= end) {
1783         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1784         step_end = TRUE;
1785         if (segment->rate > 0.0) {
1786           *rstart = end;
1787           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1788         } else {
1789           *rstop = end;
1790           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1791         }
1792       }
1793       GST_DEBUG_OBJECT (sink,
1794           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1795           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1796       GST_DEBUG_OBJECT (sink,
1797           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1798           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1799       break;
1800     }
1801     case GST_FORMAT_BUFFERS:
1802       GST_DEBUG_OBJECT (sink,
1803           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1804           current->position, current->amount);
1805
1806       if (current->position < current->amount) {
1807         current->position++;
1808       } else {
1809         step_end = TRUE;
1810       }
1811       break;
1812     case GST_FORMAT_DEFAULT:
1813     default:
1814       GST_DEBUG_OBJECT (sink,
1815           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1816           current->position, current->amount);
1817       break;
1818   }
1819   return step_end;
1820 }
1821
1822 /* with STREAM_LOCK, PREROLL_LOCK
1823  *
1824  * Returns TRUE if the object needs synchronisation and takes therefore
1825  * part in prerolling.
1826  *
1827  * rsstart/rsstop contain the start/stop in stream time.
1828  * rrstart/rrstop contain the start/stop in running time.
1829  */
1830 static gboolean
1831 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1832     GstClockTime * rsstart, GstClockTime * rsstop,
1833     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1834     gboolean * stepped, GstSegment * segment, GstStepInfo * step,
1835     gboolean * step_end, guint8 obj_type)
1836 {
1837   GstBaseSinkClass *bclass;
1838   GstBuffer *buffer;
1839   GstClockTime start, stop;     /* raw start/stop timestamps */
1840   gint64 cstart, cstop;         /* clipped raw timestamps */
1841   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1842   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1843   GstFormat format;
1844   GstBaseSinkPrivate *priv;
1845   gboolean eos;
1846
1847   priv = basesink->priv;
1848
1849   /* start with nothing */
1850   start = stop = GST_CLOCK_TIME_NONE;
1851
1852   if (G_UNLIKELY (OBJ_IS_EVENT (obj_type))) {
1853     GstEvent *event = GST_EVENT_CAST (obj);
1854
1855     switch (GST_EVENT_TYPE (event)) {
1856         /* EOS event needs syncing */
1857       case GST_EVENT_EOS:
1858       {
1859         if (basesink->segment.rate >= 0.0) {
1860           sstart = sstop = priv->current_sstop;
1861           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1862             /* we have not seen a buffer yet, use the segment values */
1863             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1864                 basesink->segment.format, basesink->segment.stop);
1865           }
1866         } else {
1867           sstart = sstop = priv->current_sstart;
1868           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1869             /* we have not seen a buffer yet, use the segment values */
1870             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1871                 basesink->segment.format, basesink->segment.start);
1872           }
1873         }
1874
1875         rstart = rstop = priv->eos_rtime;
1876         *do_sync = rstart != -1;
1877         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1878             GST_TIME_ARGS (rstart));
1879         /* if we are stepping, we end now */
1880         *step_end = step->valid;
1881         eos = TRUE;
1882         goto eos_done;
1883       }
1884       default:
1885         /* other events do not need syncing */
1886         /* FIXME, maybe NEWSEGMENT might need synchronisation
1887          * since the POSITION query depends on accumulated times and
1888          * we cannot accumulate the current segment before the previous
1889          * one completed.
1890          */
1891         return FALSE;
1892     }
1893   }
1894
1895   eos = FALSE;
1896
1897 again:
1898   /* else do buffer sync code */
1899   buffer = GST_BUFFER_CAST (obj);
1900
1901   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1902
1903   /* just get the times to see if we need syncing, if the start returns -1 we
1904    * don't sync. */
1905   if (bclass->get_times)
1906     bclass->get_times (basesink, buffer, &start, &stop);
1907
1908   if (!GST_CLOCK_TIME_IS_VALID (start)) {
1909     /* we don't need to sync but we still want to get the timestamps for
1910      * tracking the position */
1911     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1912     *do_sync = FALSE;
1913   } else {
1914     *do_sync = TRUE;
1915   }
1916
1917   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1918       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1919       GST_TIME_ARGS (stop), *do_sync);
1920
1921   /* collect segment and format for code clarity */
1922   format = segment->format;
1923
1924   /* no timestamp clipping if we did not get a TIME segment format */
1925   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
1926     cstart = start;
1927     cstop = stop;
1928     /* do running and stream time in TIME format */
1929     format = GST_FORMAT_TIME;
1930     GST_LOG_OBJECT (basesink, "not time format, don't clip");
1931     goto do_times;
1932   }
1933
1934   /* clip, only when we know about time */
1935   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
1936               (gint64) start, (gint64) stop, &cstart, &cstop))) {
1937     if (step->valid) {
1938       GST_DEBUG_OBJECT (basesink, "step out of segment");
1939       /* when we are stepping, pretend we're at the end of the segment */
1940       if (segment->rate > 0.0) {
1941         cstart = segment->stop;
1942         cstop = segment->stop;
1943       } else {
1944         cstart = segment->start;
1945         cstop = segment->start;
1946       }
1947       goto do_times;
1948     }
1949     goto out_of_segment;
1950   }
1951
1952   if (G_UNLIKELY (start != cstart || stop != cstop)) {
1953     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
1954         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
1955         GST_TIME_ARGS (cstop));
1956   }
1957
1958   /* set last stop position */
1959   if (G_LIKELY (cstop != GST_CLOCK_TIME_NONE))
1960     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
1961   else
1962     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
1963
1964 do_times:
1965   rstart = gst_segment_to_running_time (segment, format, cstart);
1966   rstop = gst_segment_to_running_time (segment, format, cstop);
1967
1968   if (G_UNLIKELY (step->valid)) {
1969     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
1970                 &rstart, &rstop))) {
1971       /* step is still busy, we discard data when we are flushing */
1972       *stepped = step->flush;
1973       GST_DEBUG_OBJECT (basesink, "stepping busy");
1974     }
1975   }
1976   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
1977    * upstream is behaving very badly */
1978   sstart = gst_segment_to_stream_time (segment, format, cstart);
1979   sstop = gst_segment_to_stream_time (segment, format, cstop);
1980
1981 eos_done:
1982   /* eos_done label only called when doing EOS, we also stop stepping then */
1983   if (*step_end && step->flush) {
1984     GST_DEBUG_OBJECT (basesink, "flushing step ended");
1985     stop_stepping (basesink, segment, step, rstart, rstop, eos);
1986     *step_end = FALSE;
1987     /* re-determine running start times for adjusted segment
1988      * (which has a flushed amount of running/accumulated time removed) */
1989     if (!GST_IS_EVENT (obj)) {
1990       GST_DEBUG_OBJECT (basesink, "refresh sync times");
1991       goto again;
1992     }
1993   }
1994
1995   /* save times */
1996   *rsstart = sstart;
1997   *rsstop = sstop;
1998   *rrstart = rstart;
1999   *rrstop = rstop;
2000
2001   /* buffers and EOS always need syncing and preroll */
2002   return TRUE;
2003
2004   /* special cases */
2005 out_of_segment:
2006   {
2007     /* we usually clip in the chain function already but stepping could cause
2008      * the segment to be updated later. we return FALSE so that we don't try
2009      * to sync on it. */
2010     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
2011     return FALSE;
2012   }
2013 }
2014
2015 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
2016  * adjust a timestamp with the latency and timestamp offset. This function does
2017  * not adjust for the render delay. */
2018 static GstClockTime
2019 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
2020 {
2021   GstClockTimeDiff ts_offset;
2022
2023   /* don't do anything funny with invalid timestamps */
2024   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2025     return time;
2026
2027   time += basesink->priv->latency;
2028
2029   /* apply offset, be carefull for underflows */
2030   ts_offset = basesink->priv->ts_offset;
2031   if (ts_offset < 0) {
2032     ts_offset = -ts_offset;
2033     if (ts_offset < time)
2034       time -= ts_offset;
2035     else
2036       time = 0;
2037   } else
2038     time += ts_offset;
2039
2040   /* subtract the render delay again, which was included in the latency */
2041   if (time > basesink->priv->render_delay)
2042     time -= basesink->priv->render_delay;
2043   else
2044     time = 0;
2045
2046   return time;
2047 }
2048
2049 /**
2050  * gst_base_sink_wait_clock:
2051  * @sink: the sink
2052  * @time: the running_time to be reached
2053  * @jitter: the jitter to be filled with time diff (can be NULL)
2054  *
2055  * This function will block until @time is reached. It is usually called by
2056  * subclasses that use their own internal synchronisation.
2057  *
2058  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
2059  * returned. Likewise, if synchronisation is disabled in the element or there
2060  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
2061  *
2062  * This function should only be called with the PREROLL_LOCK held, like when
2063  * receiving an EOS event in the #GstBaseSinkClass.event() vmethod or when
2064  * receiving a buffer in
2065  * the #GstBaseSinkClass.render() vmethod.
2066  *
2067  * The @time argument should be the running_time of when this method should
2068  * return and is not adjusted with any latency or offset configured in the
2069  * sink.
2070  *
2071  * Since 0.10.20
2072  *
2073  * Returns: #GstClockReturn
2074  */
2075 GstClockReturn
2076 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
2077     GstClockTimeDiff * jitter)
2078 {
2079   GstClockID id;
2080   GstClockReturn ret;
2081   GstClock *clock;
2082   GstClockTime base_time;
2083
2084   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2085     goto invalid_time;
2086
2087   GST_OBJECT_LOCK (sink);
2088   if (G_UNLIKELY (!sink->sync))
2089     goto no_sync;
2090
2091   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
2092     goto no_clock;
2093
2094   base_time = GST_ELEMENT_CAST (sink)->base_time;
2095   GST_LOG_OBJECT (sink,
2096       "time %" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT,
2097       GST_TIME_ARGS (time), GST_TIME_ARGS (base_time));
2098
2099   /* add base_time to running_time to get the time against the clock */
2100   time += base_time;
2101
2102   id = gst_clock_new_single_shot_id (clock, time);
2103   GST_OBJECT_UNLOCK (sink);
2104
2105   /* A blocking wait is performed on the clock. We save the ClockID
2106    * so we can unlock the entry at any time. While we are blocking, we
2107    * release the PREROLL_LOCK so that other threads can interrupt the
2108    * entry. */
2109   sink->clock_id = id;
2110   /* release the preroll lock while waiting */
2111   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
2112
2113   ret = gst_clock_id_wait (id, jitter);
2114
2115   GST_PAD_PREROLL_LOCK (sink->sinkpad);
2116   gst_clock_id_unref (id);
2117   sink->clock_id = NULL;
2118
2119   return ret;
2120
2121   /* no syncing needed */
2122 invalid_time:
2123   {
2124     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
2125     return GST_CLOCK_BADTIME;
2126   }
2127 no_sync:
2128   {
2129     GST_DEBUG_OBJECT (sink, "sync disabled");
2130     GST_OBJECT_UNLOCK (sink);
2131     return GST_CLOCK_BADTIME;
2132   }
2133 no_clock:
2134   {
2135     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
2136     GST_OBJECT_UNLOCK (sink);
2137     return GST_CLOCK_BADTIME;
2138   }
2139 }
2140
2141 /**
2142  * gst_base_sink_wait_preroll:
2143  * @sink: the sink
2144  *
2145  * If the #GstBaseSinkClass.render() method performs its own synchronisation
2146  * against the clock it must unblock when going from PLAYING to the PAUSED state
2147  * and call this method before continuing to render the remaining data.
2148  *
2149  * This function will block until a state change to PLAYING happens (in which
2150  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
2151  * to a state change to READY or a FLUSH event (in which case this function
2152  * returns #GST_FLOW_WRONG_STATE).
2153  *
2154  * This function should only be called with the PREROLL_LOCK held, like in the
2155  * render function.
2156  *
2157  * Since: 0.10.11
2158  *
2159  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2160  * continue. Any other return value should be returned from the render vmethod.
2161  */
2162 GstFlowReturn
2163 gst_base_sink_wait_preroll (GstBaseSink * sink)
2164 {
2165   sink->have_preroll = TRUE;
2166   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
2167   /* block until the state changes, or we get a flush, or something */
2168   GST_PAD_PREROLL_WAIT (sink->sinkpad);
2169   sink->have_preroll = FALSE;
2170   if (G_UNLIKELY (sink->flushing))
2171     goto stopping;
2172   if (G_UNLIKELY (sink->priv->step_unlock))
2173     goto step_unlocked;
2174   GST_DEBUG_OBJECT (sink, "continue after preroll");
2175
2176   return GST_FLOW_OK;
2177
2178   /* ERRORS */
2179 stopping:
2180   {
2181     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
2182     return GST_FLOW_WRONG_STATE;
2183   }
2184 step_unlocked:
2185   {
2186     sink->priv->step_unlock = FALSE;
2187     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
2188     return GST_FLOW_STEP;
2189   }
2190 }
2191
2192 /**
2193  * gst_base_sink_do_preroll:
2194  * @sink: the sink
2195  * @obj: the object that caused the preroll
2196  *
2197  * If the @sink spawns its own thread for pulling buffers from upstream it
2198  * should call this method after it has pulled a buffer. If the element needed
2199  * to preroll, this function will perform the preroll and will then block
2200  * until the element state is changed.
2201  *
2202  * This function should be called with the PREROLL_LOCK held.
2203  *
2204  * Since 0.10.22
2205  *
2206  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2207  * continue. Any other return value should be returned from the render vmethod.
2208  */
2209 GstFlowReturn
2210 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2211 {
2212   GstFlowReturn ret;
2213
2214   while (G_UNLIKELY (sink->need_preroll)) {
2215     guint8 obj_type = _PR_IS_NOTHING;
2216     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2217
2218     if (G_LIKELY (GST_IS_BUFFER (obj)))
2219       obj_type = _PR_IS_BUFFER;
2220     else if (GST_IS_EVENT (obj))
2221       obj_type = _PR_IS_EVENT;
2222     else if (GST_IS_BUFFER_LIST (obj))
2223       obj_type = _PR_IS_BUFFERLIST;
2224
2225     ret = gst_base_sink_preroll_object (sink, obj_type, obj);
2226     if (ret != GST_FLOW_OK)
2227       goto preroll_failed;
2228
2229     /* need to recheck here because the commit state could have
2230      * made us not need the preroll anymore */
2231     if (G_LIKELY (sink->need_preroll)) {
2232       /* block until the state changes, or we get a flush, or something */
2233       ret = gst_base_sink_wait_preroll (sink);
2234       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2235         goto preroll_failed;
2236     }
2237   }
2238   return GST_FLOW_OK;
2239
2240   /* ERRORS */
2241 preroll_failed:
2242   {
2243     GST_DEBUG_OBJECT (sink, "preroll failed %d", ret);
2244     return ret;
2245   }
2246 }
2247
2248 /**
2249  * gst_base_sink_wait_eos:
2250  * @sink: the sink
2251  * @time: the running_time to be reached
2252  * @jitter: the jitter to be filled with time diff (can be NULL)
2253  *
2254  * This function will block until @time is reached. It is usually called by
2255  * subclasses that use their own internal synchronisation but want to let the
2256  * EOS be handled by the base class.
2257  *
2258  * This function should only be called with the PREROLL_LOCK held, like when
2259  * receiving an EOS event in the ::event vmethod.
2260  *
2261  * The @time argument should be the running_time of when the EOS should happen
2262  * and will be adjusted with any latency and offset configured in the sink.
2263  *
2264  * Since 0.10.15
2265  *
2266  * Returns: #GstFlowReturn
2267  */
2268 GstFlowReturn
2269 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2270     GstClockTimeDiff * jitter)
2271 {
2272   GstClockReturn status;
2273   GstFlowReturn ret;
2274
2275   do {
2276     GstClockTime stime;
2277
2278     GST_DEBUG_OBJECT (sink, "checking preroll");
2279
2280     /* first wait for the playing state before we can continue */
2281     if (G_UNLIKELY (sink->need_preroll)) {
2282       ret = gst_base_sink_wait_preroll (sink);
2283       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2284         goto flushing;
2285     }
2286
2287     /* preroll done, we can sync since we are in PLAYING now. */
2288     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2289         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2290
2291     /* compensate for latency and ts_offset. We don't adjust for render delay
2292      * because we don't interact with the device on EOS normally. */
2293     stime = gst_base_sink_adjust_time (sink, time);
2294
2295     /* wait for the clock, this can be interrupted because we got shut down or
2296      * we PAUSED. */
2297     status = gst_base_sink_wait_clock (sink, stime, jitter);
2298
2299     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2300
2301     /* invalid time, no clock or sync disabled, just continue then */
2302     if (status == GST_CLOCK_BADTIME)
2303       break;
2304
2305     /* waiting could have been interrupted and we can be flushing now */
2306     if (G_UNLIKELY (sink->flushing))
2307       goto flushing;
2308
2309     /* retry if we got unscheduled, which means we did not reach the timeout
2310      * yet. if some other error occures, we continue. */
2311   } while (status == GST_CLOCK_UNSCHEDULED);
2312
2313   GST_DEBUG_OBJECT (sink, "end of stream");
2314
2315   return GST_FLOW_OK;
2316
2317   /* ERRORS */
2318 flushing:
2319   {
2320     GST_DEBUG_OBJECT (sink, "we are flushing");
2321     return GST_FLOW_WRONG_STATE;
2322   }
2323 }
2324
2325 /* with STREAM_LOCK, PREROLL_LOCK
2326  *
2327  * Make sure we are in PLAYING and synchronize an object to the clock.
2328  *
2329  * If we need preroll, we are not in PLAYING. We try to commit the state
2330  * if needed and then block if we still are not PLAYING.
2331  *
2332  * We start waiting on the clock in PLAYING. If we got interrupted, we
2333  * immediatly try to re-preroll.
2334  *
2335  * Some objects do not need synchronisation (most events) and so this function
2336  * immediatly returns GST_FLOW_OK.
2337  *
2338  * for objects that arrive later than max-lateness to be synchronized to the
2339  * clock have the @late boolean set to TRUE.
2340  *
2341  * This function keeps a running average of the jitter (the diff between the
2342  * clock time and the requested sync time). The jitter is negative for
2343  * objects that arrive in time and positive for late buffers.
2344  *
2345  * does not take ownership of obj.
2346  */
2347 static GstFlowReturn
2348 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2349     GstMiniObject * obj, gboolean * late, gboolean * step_end, guint8 obj_type)
2350 {
2351   GstClockTimeDiff jitter = 0;
2352   gboolean syncable;
2353   GstClockReturn status = GST_CLOCK_OK;
2354   GstClockTime rstart, rstop, sstart, sstop, stime;
2355   gboolean do_sync;
2356   GstBaseSinkPrivate *priv;
2357   GstFlowReturn ret;
2358   GstStepInfo *current, *pending;
2359   gboolean stepped;
2360
2361   priv = basesink->priv;
2362
2363 do_step:
2364   sstart = sstop = rstart = rstop = GST_CLOCK_TIME_NONE;
2365   do_sync = TRUE;
2366   stepped = FALSE;
2367
2368   priv->current_rstart = GST_CLOCK_TIME_NONE;
2369
2370   /* get stepping info */
2371   current = &priv->current_step;
2372   pending = &priv->pending_step;
2373
2374   /* get timing information for this object against the render segment */
2375   syncable = gst_base_sink_get_sync_times (basesink, obj,
2376       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2377       current, step_end, obj_type);
2378
2379   if (G_UNLIKELY (stepped))
2380     goto step_skipped;
2381
2382   /* a syncable object needs to participate in preroll and
2383    * clocking. All buffers and EOS are syncable. */
2384   if (G_UNLIKELY (!syncable))
2385     goto not_syncable;
2386
2387   /* store timing info for current object */
2388   priv->current_rstart = rstart;
2389   priv->current_rstop = (GST_CLOCK_TIME_IS_VALID (rstop) ? rstop : rstart);
2390
2391   /* save sync time for eos when the previous object needed sync */
2392   priv->eos_rtime = (do_sync ? priv->current_rstop : GST_CLOCK_TIME_NONE);
2393
2394 again:
2395   /* first do preroll, this makes sure we commit our state
2396    * to PAUSED and can continue to PLAYING. We cannot perform
2397    * any clock sync in PAUSED because there is no clock. */
2398   ret = gst_base_sink_do_preroll (basesink, obj);
2399   if (G_UNLIKELY (ret != GST_FLOW_OK))
2400     goto preroll_failed;
2401
2402   /* update the segment with a pending step if the current one is invalid and we
2403    * have a new pending one. We only accept new step updates after a preroll */
2404   if (G_UNLIKELY (pending->valid && !current->valid)) {
2405     start_stepping (basesink, &basesink->segment, pending, current);
2406     goto do_step;
2407   }
2408
2409   /* After rendering we store the position of the last buffer so that we can use
2410    * it to report the position. We need to take the lock here. */
2411   GST_OBJECT_LOCK (basesink);
2412   priv->current_sstart = sstart;
2413   priv->current_sstop = (GST_CLOCK_TIME_IS_VALID (sstop) ? sstop : sstart);
2414   GST_OBJECT_UNLOCK (basesink);
2415
2416   if (!do_sync)
2417     goto done;
2418
2419   /* adjust for latency */
2420   stime = gst_base_sink_adjust_time (basesink, rstart);
2421
2422   /* adjust for render-delay, avoid underflows */
2423   if (GST_CLOCK_TIME_IS_VALID (stime)) {
2424     if (stime > priv->render_delay)
2425       stime -= priv->render_delay;
2426     else
2427       stime = 0;
2428   }
2429
2430   /* preroll done, we can sync since we are in PLAYING now. */
2431   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2432       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2433       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2434
2435   /* This function will return immediatly if start == -1, no clock
2436    * or sync is disabled with GST_CLOCK_BADTIME. */
2437   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2438
2439   GST_DEBUG_OBJECT (basesink, "clock returned %d, jitter %c%" GST_TIME_FORMAT,
2440       status, (jitter < 0 ? '-' : ' '), GST_TIME_ARGS (ABS (jitter)));
2441
2442   /* invalid time, no clock or sync disabled, just render */
2443   if (status == GST_CLOCK_BADTIME)
2444     goto done;
2445
2446   /* waiting could have been interrupted and we can be flushing now */
2447   if (G_UNLIKELY (basesink->flushing))
2448     goto flushing;
2449
2450   /* check for unlocked by a state change, we are not flushing so
2451    * we can try to preroll on the current buffer. */
2452   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2453     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2454     priv->call_preroll = TRUE;
2455     goto again;
2456   }
2457
2458   /* successful syncing done, record observation */
2459   priv->current_jitter = jitter;
2460
2461   /* check if the object should be dropped */
2462   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2463       status, jitter);
2464
2465 done:
2466   return GST_FLOW_OK;
2467
2468   /* ERRORS */
2469 step_skipped:
2470   {
2471     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2472     *late = TRUE;
2473     return GST_FLOW_OK;
2474   }
2475 not_syncable:
2476   {
2477     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2478     return GST_FLOW_OK;
2479   }
2480 flushing:
2481   {
2482     GST_DEBUG_OBJECT (basesink, "we are flushing");
2483     return GST_FLOW_WRONG_STATE;
2484   }
2485 preroll_failed:
2486   {
2487     GST_DEBUG_OBJECT (basesink, "preroll failed");
2488     *step_end = FALSE;
2489     return ret;
2490   }
2491 }
2492
2493 static gboolean
2494 gst_base_sink_send_qos (GstBaseSink * basesink,
2495     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2496 {
2497   GstEvent *event;
2498   gboolean res;
2499
2500   /* generate Quality-of-Service event */
2501   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2502       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2503       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (time));
2504
2505   event = gst_event_new_qos (proportion, diff, time);
2506
2507   /* send upstream */
2508   res = gst_pad_push_event (basesink->sinkpad, event);
2509
2510   return res;
2511 }
2512
2513 static void
2514 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2515 {
2516   GstBaseSinkPrivate *priv;
2517   GstClockTime start, stop;
2518   GstClockTimeDiff jitter;
2519   GstClockTime pt, entered, left;
2520   GstClockTime duration;
2521   gdouble rate;
2522
2523   priv = sink->priv;
2524
2525   start = priv->current_rstart;
2526
2527   if (priv->current_step.valid)
2528     return;
2529
2530   /* if Quality-of-Service disabled, do nothing */
2531   if (!g_atomic_int_get (&priv->qos_enabled) ||
2532       !GST_CLOCK_TIME_IS_VALID (start))
2533     return;
2534
2535   stop = priv->current_rstop;
2536   jitter = priv->current_jitter;
2537
2538   if (jitter < 0) {
2539     /* this is the time the buffer entered the sink */
2540     if (start < -jitter)
2541       entered = 0;
2542     else
2543       entered = start + jitter;
2544     left = start;
2545   } else {
2546     /* this is the time the buffer entered the sink */
2547     entered = start + jitter;
2548     /* this is the time the buffer left the sink */
2549     left = start + jitter;
2550   }
2551
2552   /* calculate duration of the buffer */
2553   if (GST_CLOCK_TIME_IS_VALID (stop))
2554     duration = stop - start;
2555   else
2556     duration = GST_CLOCK_TIME_NONE;
2557
2558   /* if we have the time when the last buffer left us, calculate
2559    * processing time */
2560   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2561     if (entered > priv->last_left) {
2562       pt = entered - priv->last_left;
2563     } else {
2564       pt = 0;
2565     }
2566   } else {
2567     pt = priv->avg_pt;
2568   }
2569
2570   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2571       ", entered %" GST_TIME_FORMAT ", left %" GST_TIME_FORMAT ", pt: %"
2572       GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT ",jitter %"
2573       G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (entered),
2574       GST_TIME_ARGS (left), GST_TIME_ARGS (pt), GST_TIME_ARGS (duration),
2575       jitter);
2576
2577   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2578       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2579       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2580       priv->avg_rate);
2581
2582   /* collect running averages. for first observations, we copy the
2583    * values */
2584   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_duration))
2585     priv->avg_duration = duration;
2586   else
2587     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2588
2589   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_pt))
2590     priv->avg_pt = pt;
2591   else
2592     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2593
2594   if (priv->avg_duration != 0)
2595     rate =
2596         gst_guint64_to_gdouble (priv->avg_pt) /
2597         gst_guint64_to_gdouble (priv->avg_duration);
2598   else
2599     rate = 0.0;
2600
2601   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2602     if (dropped || priv->avg_rate < 0.0) {
2603       priv->avg_rate = rate;
2604     } else {
2605       if (rate > 1.0)
2606         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2607       else
2608         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2609     }
2610   }
2611
2612   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2613       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2614       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2615       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2616
2617
2618   if (priv->avg_rate >= 0.0) {
2619     /* if we have a valid rate, start sending QoS messages */
2620     if (priv->current_jitter < 0) {
2621       /* make sure we never go below 0 when adding the jitter to the
2622        * timestamp. */
2623       if (priv->current_rstart < -priv->current_jitter)
2624         priv->current_jitter = -priv->current_rstart;
2625     }
2626     gst_base_sink_send_qos (sink, priv->avg_rate, priv->current_rstart,
2627         priv->current_jitter);
2628   }
2629
2630   /* record when this buffer will leave us */
2631   priv->last_left = left;
2632 }
2633
2634 /* reset all qos measuring */
2635 static void
2636 gst_base_sink_reset_qos (GstBaseSink * sink)
2637 {
2638   GstBaseSinkPrivate *priv;
2639
2640   priv = sink->priv;
2641
2642   priv->last_in_time = GST_CLOCK_TIME_NONE;
2643   priv->last_left = GST_CLOCK_TIME_NONE;
2644   priv->avg_duration = GST_CLOCK_TIME_NONE;
2645   priv->avg_pt = GST_CLOCK_TIME_NONE;
2646   priv->avg_rate = -1.0;
2647   priv->avg_render = GST_CLOCK_TIME_NONE;
2648   priv->rendered = 0;
2649   priv->dropped = 0;
2650
2651 }
2652
2653 /* Checks if the object was scheduled too late.
2654  *
2655  * start/stop contain the raw timestamp start and stop values
2656  * of the object.
2657  *
2658  * status and jitter contain the return values from the clock wait.
2659  *
2660  * returns TRUE if the buffer was too late.
2661  */
2662 static gboolean
2663 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2664     GstClockTime start, GstClockTime stop,
2665     GstClockReturn status, GstClockTimeDiff jitter)
2666 {
2667   gboolean late;
2668   gint64 max_lateness;
2669   GstBaseSinkPrivate *priv;
2670
2671   priv = basesink->priv;
2672
2673   late = FALSE;
2674
2675   /* only for objects that were too late */
2676   if (G_LIKELY (status != GST_CLOCK_EARLY))
2677     goto in_time;
2678
2679   max_lateness = basesink->abidata.ABI.max_lateness;
2680
2681   /* check if frame dropping is enabled */
2682   if (max_lateness == -1)
2683     goto no_drop;
2684
2685   /* only check for buffers */
2686   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2687     goto not_buffer;
2688
2689   /* can't do check if we don't have a timestamp */
2690   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (start)))
2691     goto no_timestamp;
2692
2693   /* we can add a valid stop time */
2694   if (GST_CLOCK_TIME_IS_VALID (stop))
2695     max_lateness += stop;
2696   else
2697     max_lateness += start;
2698
2699   /* if the jitter bigger than duration and lateness we are too late */
2700   if ((late = start + jitter > max_lateness)) {
2701     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2702         "buffer is too late %" GST_TIME_FORMAT
2703         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (start + jitter),
2704         GST_TIME_ARGS (max_lateness));
2705     /* !!emergency!!, if we did not receive anything valid for more than a
2706      * second, render it anyway so the user sees something */
2707     if (GST_CLOCK_TIME_IS_VALID (priv->last_in_time) &&
2708         start - priv->last_in_time > GST_SECOND) {
2709       late = FALSE;
2710       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2711           (_("A lot of buffers are being dropped.")),
2712           ("There may be a timestamping problem, or this computer is too slow."));
2713       GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2714           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2715           GST_TIME_ARGS (priv->last_in_time));
2716     }
2717   }
2718
2719 done:
2720   if (!late || !GST_CLOCK_TIME_IS_VALID (priv->last_in_time)) {
2721     priv->last_in_time = start;
2722   }
2723   return late;
2724
2725   /* all is fine */
2726 in_time:
2727   {
2728     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2729     goto done;
2730   }
2731 no_drop:
2732   {
2733     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2734     goto done;
2735   }
2736 not_buffer:
2737   {
2738     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2739     return FALSE;
2740   }
2741 no_timestamp:
2742   {
2743     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2744     return FALSE;
2745   }
2746 }
2747
2748 /* called before and after calling the render vmethod. It keeps track of how
2749  * much time was spent in the render method and is used to check if we are
2750  * flooded */
2751 static void
2752 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2753 {
2754   GstBaseSinkPrivate *priv;
2755
2756   priv = basesink->priv;
2757
2758   if (start) {
2759     priv->start = gst_util_get_timestamp ();
2760   } else {
2761     GstClockTime elapsed;
2762
2763     priv->stop = gst_util_get_timestamp ();
2764
2765     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2766
2767     if (!GST_CLOCK_TIME_IS_VALID (priv->avg_render))
2768       priv->avg_render = elapsed;
2769     else
2770       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2771
2772     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2773         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2774   }
2775 }
2776
2777 /* with STREAM_LOCK, PREROLL_LOCK,
2778  *
2779  * Synchronize the object on the clock and then render it.
2780  *
2781  * takes ownership of obj.
2782  */
2783 static GstFlowReturn
2784 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2785     guint8 obj_type, gpointer obj)
2786 {
2787   GstFlowReturn ret;
2788   GstBaseSinkClass *bclass;
2789   gboolean late, step_end;
2790   gpointer sync_obj;
2791   GstBaseSinkPrivate *priv;
2792
2793   priv = basesink->priv;
2794
2795   if (OBJ_IS_BUFFERLIST (obj_type)) {
2796     /*
2797      * If buffer list, use the first group buffer within the list
2798      * for syncing
2799      */
2800     sync_obj = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
2801     g_assert (NULL != sync_obj);
2802   } else {
2803     sync_obj = obj;
2804   }
2805
2806 again:
2807   late = FALSE;
2808   step_end = FALSE;
2809
2810   /* synchronize this object, non syncable objects return OK
2811    * immediatly. */
2812   ret =
2813       gst_base_sink_do_sync (basesink, pad, sync_obj, &late, &step_end,
2814       obj_type);
2815   if (G_UNLIKELY (ret != GST_FLOW_OK))
2816     goto sync_failed;
2817
2818   /* and now render, event or buffer/buffer list. */
2819   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type))) {
2820     /* drop late buffers unconditionally, let's hope it's unlikely */
2821     if (G_UNLIKELY (late))
2822       goto dropped;
2823
2824     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2825
2826     if (G_LIKELY ((OBJ_IS_BUFFERLIST (obj_type) && bclass->render_list) ||
2827             (!OBJ_IS_BUFFERLIST (obj_type) && bclass->render))) {
2828       gint do_qos;
2829
2830       /* read once, to get same value before and after */
2831       do_qos = g_atomic_int_get (&priv->qos_enabled);
2832
2833       GST_DEBUG_OBJECT (basesink, "rendering object %p", obj);
2834
2835       /* record rendering time for QoS and stats */
2836       if (do_qos)
2837         gst_base_sink_do_render_stats (basesink, TRUE);
2838
2839       if (!OBJ_IS_BUFFERLIST (obj_type)) {
2840         GstBuffer *buf;
2841
2842         /* For buffer lists do not set last buffer. Creating buffer
2843          * with meaningful data can be done only with memcpy which will
2844          * significantly affect performance */
2845         buf = GST_BUFFER_CAST (obj);
2846         gst_base_sink_set_last_buffer (basesink, buf);
2847
2848         ret = bclass->render (basesink, buf);
2849       } else {
2850         GstBufferList *buflist;
2851
2852         buflist = GST_BUFFER_LIST_CAST (obj);
2853
2854         ret = bclass->render_list (basesink, buflist);
2855       }
2856
2857       if (do_qos)
2858         gst_base_sink_do_render_stats (basesink, FALSE);
2859
2860       if (ret == GST_FLOW_STEP)
2861         goto again;
2862
2863       if (G_UNLIKELY (basesink->flushing))
2864         goto flushing;
2865
2866       priv->rendered++;
2867     }
2868   } else if (G_LIKELY (OBJ_IS_EVENT (obj_type))) {
2869     GstEvent *event = GST_EVENT_CAST (obj);
2870     gboolean event_res = TRUE;
2871     GstEventType type;
2872
2873     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2874
2875     type = GST_EVENT_TYPE (event);
2876
2877     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
2878         gst_event_type_get_name (type));
2879
2880     if (bclass->event)
2881       event_res = bclass->event (basesink, event);
2882
2883     /* when we get here we could be flushing again when the event handler calls
2884      * _wait_eos(). We have to ignore this object in that case. */
2885     if (G_UNLIKELY (basesink->flushing))
2886       goto flushing;
2887
2888     if (G_LIKELY (event_res)) {
2889       guint32 seqnum;
2890
2891       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
2892       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
2893
2894       switch (type) {
2895         case GST_EVENT_EOS:
2896         {
2897           GstMessage *message;
2898
2899           /* the EOS event is completely handled so we mark
2900            * ourselves as being in the EOS state. eos is also
2901            * protected by the object lock so we can read it when
2902            * answering the POSITION query. */
2903           GST_OBJECT_LOCK (basesink);
2904           basesink->eos = TRUE;
2905           GST_OBJECT_UNLOCK (basesink);
2906
2907           /* ok, now we can post the message */
2908           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
2909
2910           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
2911           gst_message_set_seqnum (message, seqnum);
2912           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
2913           break;
2914         }
2915         case GST_EVENT_NEWSEGMENT:
2916           /* configure the segment */
2917           gst_base_sink_configure_segment (basesink, pad, event,
2918               &basesink->segment);
2919           break;
2920         case GST_EVENT_SINK_MESSAGE:{
2921           GstMessage *msg = NULL;
2922
2923           gst_event_parse_sink_message (event, &msg);
2924
2925           if (msg)
2926             gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
2927         }
2928         default:
2929           break;
2930       }
2931     }
2932   } else {
2933     g_return_val_if_reached (GST_FLOW_ERROR);
2934   }
2935
2936 done:
2937   if (step_end) {
2938     /* the step ended, check if we need to activate a new step */
2939     GST_DEBUG_OBJECT (basesink, "step ended");
2940     stop_stepping (basesink, &basesink->segment, &priv->current_step,
2941         priv->current_rstart, priv->current_rstop, basesink->eos);
2942     goto again;
2943   }
2944
2945   gst_base_sink_perform_qos (basesink, late);
2946
2947   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
2948   gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
2949   return ret;
2950
2951   /* ERRORS */
2952 sync_failed:
2953   {
2954     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
2955     goto done;
2956   }
2957 dropped:
2958   {
2959     priv->dropped++;
2960     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
2961
2962     if (g_atomic_int_get (&priv->qos_enabled)) {
2963       GstMessage *qos_msg;
2964       GstClockTime timestamp, duration;
2965
2966       timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (sync_obj));
2967       duration = GST_BUFFER_DURATION (GST_BUFFER_CAST (sync_obj));
2968
2969       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2970           "qos: dropped buffer rt %" GST_TIME_FORMAT ", st %" GST_TIME_FORMAT
2971           ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT,
2972           GST_TIME_ARGS (priv->current_rstart),
2973           GST_TIME_ARGS (priv->current_sstart), GST_TIME_ARGS (timestamp),
2974           GST_TIME_ARGS (duration));
2975       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2976           "qos: rendered %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
2977           priv->rendered, priv->dropped);
2978
2979       qos_msg =
2980           gst_message_new_qos (GST_OBJECT_CAST (basesink), basesink->sync,
2981           priv->current_rstart, priv->current_sstart, timestamp, duration);
2982       gst_message_set_qos_values (qos_msg, priv->current_jitter, priv->avg_rate,
2983           1000000);
2984       gst_message_set_qos_stats (qos_msg, GST_FORMAT_BUFFERS, priv->rendered,
2985           priv->dropped);
2986       gst_element_post_message (GST_ELEMENT_CAST (basesink), qos_msg);
2987     }
2988     goto done;
2989   }
2990 flushing:
2991   {
2992     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
2993     gst_mini_object_unref (obj);
2994     return GST_FLOW_WRONG_STATE;
2995   }
2996 }
2997
2998 /* with STREAM_LOCK, PREROLL_LOCK
2999  *
3000  * Perform preroll on the given object. For buffers this means
3001  * calling the preroll subclass method.
3002  * If that succeeds, the state will be commited.
3003  *
3004  * function does not take ownership of obj.
3005  */
3006 static GstFlowReturn
3007 gst_base_sink_preroll_object (GstBaseSink * basesink, guint8 obj_type,
3008     GstMiniObject * obj)
3009 {
3010   GstFlowReturn ret;
3011
3012   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
3013
3014   /* if it's a buffer, we need to call the preroll method */
3015   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type) && basesink->priv->call_preroll)) {
3016     GstBaseSinkClass *bclass;
3017     GstBuffer *buf;
3018     GstClockTime timestamp;
3019
3020     if (OBJ_IS_BUFFERLIST (obj_type)) {
3021       buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3022       g_assert (NULL != buf);
3023     } else {
3024       buf = GST_BUFFER_CAST (obj);
3025     }
3026
3027     timestamp = GST_BUFFER_TIMESTAMP (buf);
3028
3029     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
3030         GST_TIME_ARGS (timestamp));
3031
3032     /*
3033      * For buffer lists do not set last buffer. Creating buffer
3034      * with meaningful data can be done only with memcpy which will
3035      * significantly affect performance
3036      */
3037     if (!OBJ_IS_BUFFERLIST (obj_type)) {
3038       gst_base_sink_set_last_buffer (basesink, buf);
3039     }
3040
3041     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3042     if (bclass->preroll)
3043       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
3044         goto preroll_failed;
3045
3046     basesink->priv->call_preroll = FALSE;
3047   }
3048
3049   /* commit state */
3050   if (G_LIKELY (basesink->playing_async)) {
3051     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
3052       goto stopping;
3053   }
3054
3055   return GST_FLOW_OK;
3056
3057   /* ERRORS */
3058 preroll_failed:
3059   {
3060     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
3061     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
3062     return ret;
3063   }
3064 stopping:
3065   {
3066     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
3067     return GST_FLOW_WRONG_STATE;
3068   }
3069 }
3070
3071 /* with STREAM_LOCK, PREROLL_LOCK
3072  *
3073  * Queue an object for rendering.
3074  * The first prerollable object queued will complete the preroll. If the
3075  * preroll queue if filled, we render all the objects in the queue.
3076  *
3077  * This function takes ownership of the object.
3078  */
3079 static GstFlowReturn
3080 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
3081     guint8 obj_type, gpointer obj, gboolean prerollable)
3082 {
3083   GstFlowReturn ret = GST_FLOW_OK;
3084   gint length;
3085   GQueue *q;
3086
3087   if (G_UNLIKELY (basesink->need_preroll)) {
3088     if (G_LIKELY (prerollable))
3089       basesink->preroll_queued++;
3090
3091     length = basesink->preroll_queued;
3092
3093     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
3094
3095     /* first prerollable item needs to finish the preroll */
3096     if (length == 1) {
3097       ret = gst_base_sink_preroll_object (basesink, obj_type, obj);
3098       if (G_UNLIKELY (ret != GST_FLOW_OK))
3099         goto preroll_failed;
3100     }
3101     /* need to recheck if we need preroll, commmit state during preroll
3102      * could have made us not need more preroll. */
3103     if (G_UNLIKELY (basesink->need_preroll)) {
3104       /* see if we can render now, if we can't add the object to the preroll
3105        * queue. */
3106       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
3107         goto more_preroll;
3108     }
3109   }
3110   /* we can start rendering (or blocking) the queued object
3111    * if any. */
3112   q = basesink->preroll_queue;
3113   while (G_UNLIKELY (!g_queue_is_empty (q))) {
3114     GstMiniObject *o;
3115
3116     o = g_queue_pop_head (q);
3117     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
3118
3119     /* do something with the return value */
3120     ret =
3121         gst_base_sink_render_object (basesink, pad,
3122         GST_IS_BUFFER (o) ? _PR_IS_BUFFER : _PR_IS_EVENT, o);
3123     if (ret != GST_FLOW_OK)
3124       goto dequeue_failed;
3125   }
3126
3127   /* now render the object */
3128   ret = gst_base_sink_render_object (basesink, pad, obj_type, obj);
3129   basesink->preroll_queued = 0;
3130
3131   return ret;
3132
3133   /* special cases */
3134 preroll_failed:
3135   {
3136     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
3137         gst_flow_get_name (ret));
3138     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3139     return ret;
3140   }
3141 more_preroll:
3142   {
3143     /* add object to the queue and return */
3144     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
3145         length, basesink->preroll_queue_max_len);
3146     g_queue_push_tail (basesink->preroll_queue, obj);
3147     return GST_FLOW_OK;
3148   }
3149 dequeue_failed:
3150   {
3151     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
3152         gst_flow_get_name (ret));
3153     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3154     return ret;
3155   }
3156 }
3157
3158 /* with STREAM_LOCK
3159  *
3160  * This function grabs the PREROLL_LOCK and adds the object to
3161  * the queue.
3162  *
3163  * This function takes ownership of obj.
3164  *
3165  * Note: Only GstEvent seem to be passed to this private method
3166  */
3167 static GstFlowReturn
3168 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
3169     GstMiniObject * obj, gboolean prerollable)
3170 {
3171   GstFlowReturn ret;
3172
3173   GST_PAD_PREROLL_LOCK (pad);
3174   if (G_UNLIKELY (basesink->flushing))
3175     goto flushing;
3176
3177   if (G_UNLIKELY (basesink->priv->received_eos))
3178     goto was_eos;
3179
3180   ret =
3181       gst_base_sink_queue_object_unlocked (basesink, pad, _PR_IS_EVENT, obj,
3182       prerollable);
3183   GST_PAD_PREROLL_UNLOCK (pad);
3184
3185   return ret;
3186
3187   /* ERRORS */
3188 flushing:
3189   {
3190     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3191     GST_PAD_PREROLL_UNLOCK (pad);
3192     gst_mini_object_unref (obj);
3193     return GST_FLOW_WRONG_STATE;
3194   }
3195 was_eos:
3196   {
3197     GST_DEBUG_OBJECT (basesink,
3198         "we are EOS, dropping object, return UNEXPECTED");
3199     GST_PAD_PREROLL_UNLOCK (pad);
3200     gst_mini_object_unref (obj);
3201     return GST_FLOW_UNEXPECTED;
3202   }
3203 }
3204
3205 static void
3206 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
3207 {
3208   /* make sure we are not blocked on the clock also clear any pending
3209    * eos state. */
3210   gst_base_sink_set_flushing (basesink, pad, TRUE);
3211
3212   /* we grab the stream lock but that is not needed since setting the
3213    * sink to flushing would make sure no state commit is being done
3214    * anymore */
3215   GST_PAD_STREAM_LOCK (pad);
3216   gst_base_sink_reset_qos (basesink);
3217   if (basesink->priv->async_enabled) {
3218     /* and we need to commit our state again on the next
3219      * prerolled buffer */
3220     basesink->playing_async = TRUE;
3221     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
3222   } else {
3223     basesink->priv->have_latency = TRUE;
3224     basesink->need_preroll = FALSE;
3225   }
3226   gst_base_sink_set_last_buffer (basesink, NULL);
3227   GST_PAD_STREAM_UNLOCK (pad);
3228 }
3229
3230 static void
3231 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
3232 {
3233   /* unset flushing so we can accept new data, this also flushes out any EOS
3234    * event. */
3235   gst_base_sink_set_flushing (basesink, pad, FALSE);
3236
3237   /* for position reporting */
3238   GST_OBJECT_LOCK (basesink);
3239   basesink->priv->current_sstart = GST_CLOCK_TIME_NONE;
3240   basesink->priv->current_sstop = GST_CLOCK_TIME_NONE;
3241   basesink->priv->eos_rtime = GST_CLOCK_TIME_NONE;
3242   basesink->priv->call_preroll = TRUE;
3243   basesink->priv->current_step.valid = FALSE;
3244   basesink->priv->pending_step.valid = FALSE;
3245   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
3246     /* we need new segment info after the flush. */
3247     basesink->have_newsegment = FALSE;
3248     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3249     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
3250   }
3251   GST_OBJECT_UNLOCK (basesink);
3252 }
3253
3254 static gboolean
3255 gst_base_sink_event (GstPad * pad, GstEvent * event)
3256 {
3257   GstBaseSink *basesink;
3258   gboolean result = TRUE;
3259   GstBaseSinkClass *bclass;
3260
3261   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3262
3263   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3264
3265   GST_DEBUG_OBJECT (basesink, "reveived event %p %" GST_PTR_FORMAT, event,
3266       event);
3267
3268   switch (GST_EVENT_TYPE (event)) {
3269     case GST_EVENT_EOS:
3270     {
3271       GstFlowReturn ret;
3272
3273       GST_PAD_PREROLL_LOCK (pad);
3274       if (G_UNLIKELY (basesink->flushing))
3275         goto flushing;
3276
3277       if (G_UNLIKELY (basesink->priv->received_eos)) {
3278         /* we can't accept anything when we are EOS */
3279         result = FALSE;
3280         gst_event_unref (event);
3281       } else {
3282         /* we set the received EOS flag here so that we can use it when testing if
3283          * we are prerolled and to refuse more buffers. */
3284         basesink->priv->received_eos = TRUE;
3285
3286         /* EOS is a prerollable object, we call the unlocked version because it
3287          * does not check the received_eos flag. */
3288         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
3289             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), TRUE);
3290         if (G_UNLIKELY (ret != GST_FLOW_OK))
3291           result = FALSE;
3292       }
3293       GST_PAD_PREROLL_UNLOCK (pad);
3294       break;
3295     }
3296     case GST_EVENT_NEWSEGMENT:
3297     {
3298       GstFlowReturn ret;
3299       gboolean update;
3300
3301       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
3302
3303       GST_PAD_PREROLL_LOCK (pad);
3304       if (G_UNLIKELY (basesink->flushing))
3305         goto flushing;
3306
3307       gst_event_parse_new_segment_full (event, &update, NULL, NULL, NULL, NULL,
3308           NULL, NULL);
3309
3310       if (G_UNLIKELY (basesink->priv->received_eos && !update)) {
3311         /* we can't accept anything when we are EOS */
3312         result = FALSE;
3313         gst_event_unref (event);
3314       } else {
3315         /* the new segment is a non prerollable item and does not block anything,
3316          * we need to configure the current clipping segment and insert the event
3317          * in the queue to serialize it with the buffers for rendering. */
3318         gst_base_sink_configure_segment (basesink, pad, event,
3319             basesink->abidata.ABI.clip_segment);
3320
3321         ret =
3322             gst_base_sink_queue_object_unlocked (basesink, pad,
3323             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), FALSE);
3324         if (G_UNLIKELY (ret != GST_FLOW_OK))
3325           result = FALSE;
3326         else {
3327           GST_OBJECT_LOCK (basesink);
3328           basesink->have_newsegment = TRUE;
3329           GST_OBJECT_UNLOCK (basesink);
3330         }
3331       }
3332       GST_PAD_PREROLL_UNLOCK (pad);
3333       break;
3334     }
3335     case GST_EVENT_FLUSH_START:
3336       if (bclass->event)
3337         bclass->event (basesink, event);
3338
3339       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3340
3341       gst_base_sink_flush_start (basesink, pad);
3342
3343       gst_event_unref (event);
3344       break;
3345     case GST_EVENT_FLUSH_STOP:
3346       if (bclass->event)
3347         bclass->event (basesink, event);
3348
3349       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3350
3351       gst_base_sink_flush_stop (basesink, pad);
3352
3353       gst_event_unref (event);
3354       break;
3355     default:
3356       /* other events are sent to queue or subclass depending on if they
3357        * are serialized. */
3358       if (GST_EVENT_IS_SERIALIZED (event)) {
3359         gst_base_sink_queue_object (basesink, pad,
3360             GST_MINI_OBJECT_CAST (event), FALSE);
3361       } else {
3362         if (bclass->event)
3363           bclass->event (basesink, event);
3364         gst_event_unref (event);
3365       }
3366       break;
3367   }
3368 done:
3369   gst_object_unref (basesink);
3370
3371   return result;
3372
3373   /* ERRORS */
3374 flushing:
3375   {
3376     GST_DEBUG_OBJECT (basesink, "we are flushing");
3377     GST_PAD_PREROLL_UNLOCK (pad);
3378     result = FALSE;
3379     gst_event_unref (event);
3380     goto done;
3381   }
3382 }
3383
3384 /* default implementation to calculate the start and end
3385  * timestamps on a buffer, subclasses can override
3386  */
3387 static void
3388 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3389     GstClockTime * start, GstClockTime * end)
3390 {
3391   GstClockTime timestamp, duration;
3392
3393   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3394   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3395
3396     /* get duration to calculate end time */
3397     duration = GST_BUFFER_DURATION (buffer);
3398     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3399       *end = timestamp + duration;
3400     }
3401     *start = timestamp;
3402   }
3403 }
3404
3405 /* must be called with PREROLL_LOCK */
3406 static gboolean
3407 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3408 {
3409   gboolean is_prerolled, res;
3410
3411   /* we have 2 cases where the PREROLL_LOCK is released:
3412    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3413    *  2) we are syncing on the clock
3414    */
3415   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3416   res = !is_prerolled;
3417
3418   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3419       basesink->have_preroll, basesink->priv->received_eos, res);
3420
3421   return res;
3422 }
3423
3424 /* with STREAM_LOCK, PREROLL_LOCK
3425  *
3426  * Takes a buffer and compare the timestamps with the last segment.
3427  * If the buffer falls outside of the segment boundaries, drop it.
3428  * Else queue the buffer for preroll and rendering.
3429  *
3430  * This function takes ownership of the buffer.
3431  */
3432 static GstFlowReturn
3433 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3434     guint8 obj_type, gpointer obj)
3435 {
3436   GstBaseSinkClass *bclass;
3437   GstFlowReturn result;
3438   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3439   GstSegment *clip_segment;
3440   GstBuffer *time_buf;
3441
3442   if (G_UNLIKELY (basesink->flushing))
3443     goto flushing;
3444
3445   if (G_UNLIKELY (basesink->priv->received_eos))
3446     goto was_eos;
3447
3448   if (OBJ_IS_BUFFERLIST (obj_type)) {
3449     time_buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3450     g_assert (NULL != time_buf);
3451   } else {
3452     time_buf = GST_BUFFER_CAST (obj);
3453   }
3454
3455   /* for code clarity */
3456   clip_segment = basesink->abidata.ABI.clip_segment;
3457
3458   if (G_UNLIKELY (!basesink->have_newsegment)) {
3459     gboolean sync;
3460
3461     sync = gst_base_sink_get_sync (basesink);
3462     if (sync) {
3463       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3464           (_("Internal data flow problem.")),
3465           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3466     }
3467
3468     /* this means this sink will assume timestamps start from 0 */
3469     GST_OBJECT_LOCK (basesink);
3470     clip_segment->start = 0;
3471     clip_segment->stop = -1;
3472     basesink->segment.start = 0;
3473     basesink->segment.stop = -1;
3474     basesink->have_newsegment = TRUE;
3475     GST_OBJECT_UNLOCK (basesink);
3476   }
3477
3478   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3479
3480   /* check if the buffer needs to be dropped, we first ask the subclass for the
3481    * start and end */
3482   if (bclass->get_times)
3483     bclass->get_times (basesink, time_buf, &start, &end);
3484
3485   if (!GST_CLOCK_TIME_IS_VALID (start)) {
3486     /* if the subclass does not want sync, we use our own values so that we at
3487      * least clip the buffer to the segment */
3488     gst_base_sink_get_times (basesink, time_buf, &start, &end);
3489   }
3490
3491   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3492       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3493
3494   /* a dropped buffer does not participate in anything */
3495   if (GST_CLOCK_TIME_IS_VALID (start) &&
3496       (clip_segment->format == GST_FORMAT_TIME)) {
3497     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3498                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3499       goto out_of_segment;
3500   }
3501
3502   /* now we can process the buffer in the queue, this function takes ownership
3503    * of the buffer */
3504   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3505       obj_type, obj, TRUE);
3506   return result;
3507
3508   /* ERRORS */
3509 flushing:
3510   {
3511     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3512     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3513     return GST_FLOW_WRONG_STATE;
3514   }
3515 was_eos:
3516   {
3517     GST_DEBUG_OBJECT (basesink,
3518         "we are EOS, dropping object, return UNEXPECTED");
3519     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3520     return GST_FLOW_UNEXPECTED;
3521   }
3522 out_of_segment:
3523   {
3524     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3525     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3526     return GST_FLOW_OK;
3527   }
3528 }
3529
3530 /* with STREAM_LOCK
3531  */
3532 static GstFlowReturn
3533 gst_base_sink_chain_main (GstBaseSink * basesink, GstPad * pad,
3534     guint8 obj_type, gpointer obj)
3535 {
3536   GstFlowReturn result;
3537
3538   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3539     goto wrong_mode;
3540
3541   GST_PAD_PREROLL_LOCK (pad);
3542   result = gst_base_sink_chain_unlocked (basesink, pad, obj_type, obj);
3543   GST_PAD_PREROLL_UNLOCK (pad);
3544
3545 done:
3546   return result;
3547
3548   /* ERRORS */
3549 wrong_mode:
3550   {
3551     GST_OBJECT_LOCK (pad);
3552     GST_WARNING_OBJECT (basesink,
3553         "Push on pad %s:%s, but it was not activated in push mode",
3554         GST_DEBUG_PAD_NAME (pad));
3555     GST_OBJECT_UNLOCK (pad);
3556     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3557     /* we don't post an error message this will signal to the peer
3558      * pushing that EOS is reached. */
3559     result = GST_FLOW_UNEXPECTED;
3560     goto done;
3561   }
3562 }
3563
3564 static GstFlowReturn
3565 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3566 {
3567   GstBaseSink *basesink;
3568
3569   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3570
3571   return gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, buf);
3572 }
3573
3574 static GstFlowReturn
3575 gst_base_sink_chain_list (GstPad * pad, GstBufferList * list)
3576 {
3577   GstBaseSink *basesink;
3578   GstBaseSinkClass *bclass;
3579   GstFlowReturn result;
3580
3581   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3582   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3583
3584   if (G_LIKELY (bclass->render_list)) {
3585     result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFERLIST, list);
3586   } else {
3587     GstBufferListIterator *it;
3588     GstBuffer *group;
3589
3590     GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
3591
3592     it = gst_buffer_list_iterate (list);
3593
3594     if (gst_buffer_list_iterator_next_group (it)) {
3595       do {
3596         group = gst_buffer_list_iterator_merge_group (it);
3597         if (group == NULL) {
3598           group = gst_buffer_new ();
3599           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3600         } else {
3601           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining group");
3602         }
3603         result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, group);
3604       } while (result == GST_FLOW_OK
3605           && gst_buffer_list_iterator_next_group (it));
3606     } else {
3607       GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3608       result =
3609           gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER,
3610           gst_buffer_new ());
3611     }
3612     gst_buffer_list_iterator_free (it);
3613     gst_buffer_list_unref (list);
3614   }
3615   return result;
3616 }
3617
3618
3619 static gboolean
3620 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3621 {
3622   gboolean res = TRUE;
3623
3624   /* update our offset if the start/stop position was updated */
3625   if (segment->format == GST_FORMAT_BYTES) {
3626     segment->time = segment->start;
3627   } else if (segment->start == 0) {
3628     /* seek to start, we can implement a default for this. */
3629     segment->time = 0;
3630   } else {
3631     res = FALSE;
3632     GST_INFO_OBJECT (sink, "Can't do a default seek");
3633   }
3634
3635   return res;
3636 }
3637
3638 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3639
3640 static gboolean
3641 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3642     GstEvent * event, GstSegment * segment)
3643 {
3644   /* By default, we try one of 2 things:
3645    *   - For absolute seek positions, convert the requested position to our
3646    *     configured processing format and place it in the output segment \
3647    *   - For relative seek positions, convert our current (input) values to the
3648    *     seek format, adjust by the relative seek offset and then convert back to
3649    *     the processing format
3650    */
3651   GstSeekType cur_type, stop_type;
3652   gint64 cur, stop;
3653   GstSeekFlags flags;
3654   GstFormat seek_format, dest_format;
3655   gdouble rate;
3656   gboolean update;
3657   gboolean res = TRUE;
3658
3659   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3660       &cur_type, &cur, &stop_type, &stop);
3661   dest_format = segment->format;
3662
3663   if (seek_format == dest_format) {
3664     gst_segment_set_seek (segment, rate, seek_format, flags,
3665         cur_type, cur, stop_type, stop, &update);
3666     return TRUE;
3667   }
3668
3669   if (cur_type != GST_SEEK_TYPE_NONE) {
3670     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3671     res =
3672         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3673         &cur);
3674     cur_type = GST_SEEK_TYPE_SET;
3675   }
3676
3677   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3678     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3679     res =
3680         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3681         &stop);
3682     stop_type = GST_SEEK_TYPE_SET;
3683   }
3684
3685   /* And finally, configure our output segment in the desired format */
3686   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3687       stop_type, stop, &update);
3688
3689   if (!res)
3690     goto no_format;
3691
3692   return res;
3693
3694 no_format:
3695   {
3696     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3697     return FALSE;
3698   }
3699 }
3700
3701 /* perform a seek, only executed in pull mode */
3702 static gboolean
3703 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3704 {
3705   gboolean flush;
3706   gdouble rate;
3707   GstFormat seek_format, dest_format;
3708   GstSeekFlags flags;
3709   GstSeekType cur_type, stop_type;
3710   gboolean seekseg_configured = FALSE;
3711   gint64 cur, stop;
3712   gboolean update, res = TRUE;
3713   GstSegment seeksegment;
3714
3715   dest_format = sink->segment.format;
3716
3717   if (event) {
3718     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3719     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3720         &cur_type, &cur, &stop_type, &stop);
3721
3722     flush = flags & GST_SEEK_FLAG_FLUSH;
3723   } else {
3724     GST_DEBUG_OBJECT (sink, "performing seek without event");
3725     flush = FALSE;
3726   }
3727
3728   if (flush) {
3729     GST_DEBUG_OBJECT (sink, "flushing upstream");
3730     gst_pad_push_event (pad, gst_event_new_flush_start ());
3731     gst_base_sink_flush_start (sink, pad);
3732   } else {
3733     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3734   }
3735
3736   GST_PAD_STREAM_LOCK (pad);
3737
3738   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3739    * copy the current segment info into the temp segment that we can actually
3740    * attempt the seek with. We only update the real segment if the seek suceeds. */
3741   if (!seekseg_configured) {
3742     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3743
3744     /* now configure the final seek segment */
3745     if (event) {
3746       if (sink->segment.format != seek_format) {
3747         /* OK, here's where we give the subclass a chance to convert the relative
3748          * seek into an absolute one in the processing format. We set up any
3749          * absolute seek above, before taking the stream lock. */
3750         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3751                 &seeksegment)) {
3752           GST_DEBUG_OBJECT (sink,
3753               "Preparing the seek failed after flushing. " "Aborting seek");
3754           res = FALSE;
3755         }
3756       } else {
3757         /* The seek format matches our processing format, no need to ask the
3758          * the subclass to configure the segment. */
3759         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3760             cur_type, cur, stop_type, stop, &update);
3761       }
3762     }
3763     /* Else, no seek event passed, so we're just (re)starting the
3764        current segment. */
3765   }
3766
3767   if (res) {
3768     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3769         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3770         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3771
3772     /* do the seek, segment.last_stop contains the new position. */
3773     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3774   }
3775
3776
3777   if (flush) {
3778     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3779     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3780     gst_base_sink_flush_stop (sink, pad);
3781   } else if (res && sink->abidata.ABI.running) {
3782     /* we are running the current segment and doing a non-flushing seek,
3783      * close the segment first based on the last_stop. */
3784     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3785         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3786   }
3787
3788   /* The subclass must have converted the segment to the processing format
3789    * by now */
3790   if (res && seeksegment.format != dest_format) {
3791     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3792         "in the correct format. Aborting seek.");
3793     res = FALSE;
3794   }
3795
3796   /* if successfull seek, we update our real segment and push
3797    * out the new segment. */
3798   if (res) {
3799     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3800
3801     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3802       gst_element_post_message (GST_ELEMENT (sink),
3803           gst_message_new_segment_start (GST_OBJECT (sink),
3804               sink->segment.format, sink->segment.last_stop));
3805     }
3806   }
3807
3808   sink->priv->discont = TRUE;
3809   sink->abidata.ABI.running = TRUE;
3810
3811   GST_PAD_STREAM_UNLOCK (pad);
3812
3813   return res;
3814 }
3815
3816 static void
3817 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
3818     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
3819     gboolean flush, gboolean intermediate)
3820 {
3821   GST_OBJECT_LOCK (sink);
3822   pending->seqnum = seqnum;
3823   pending->format = format;
3824   pending->amount = amount;
3825   pending->position = 0;
3826   pending->rate = rate;
3827   pending->flush = flush;
3828   pending->intermediate = intermediate;
3829   pending->valid = TRUE;
3830   /* flush invalidates the current stepping segment */
3831   if (flush)
3832     current->valid = FALSE;
3833   GST_OBJECT_UNLOCK (sink);
3834 }
3835
3836 static gboolean
3837 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3838 {
3839   GstBaseSinkPrivate *priv;
3840   GstBaseSinkClass *bclass;
3841   gboolean flush, intermediate;
3842   gdouble rate;
3843   GstFormat format;
3844   guint64 amount;
3845   guint seqnum;
3846   GstStepInfo *pending, *current;
3847   GstMessage *message;
3848
3849   bclass = GST_BASE_SINK_GET_CLASS (sink);
3850   priv = sink->priv;
3851
3852   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
3853
3854   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
3855   seqnum = gst_event_get_seqnum (event);
3856
3857   pending = &priv->pending_step;
3858   current = &priv->current_step;
3859
3860   /* post message first */
3861   message = gst_message_new_step_start (GST_OBJECT (sink), FALSE, format,
3862       amount, rate, flush, intermediate);
3863   gst_message_set_seqnum (message, seqnum);
3864   gst_element_post_message (GST_ELEMENT (sink), message);
3865
3866   if (flush) {
3867     /* we need to call ::unlock before locking PREROLL_LOCK
3868      * since we lock it before going into ::render */
3869     if (bclass->unlock)
3870       bclass->unlock (sink);
3871
3872     GST_PAD_PREROLL_LOCK (sink->sinkpad);
3873     /* now that we have the PREROLL lock, clear our unlock request */
3874     if (bclass->unlock_stop)
3875       bclass->unlock_stop (sink);
3876
3877     /* update the stepinfo and make it valid */
3878     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
3879         intermediate);
3880
3881     if (sink->priv->async_enabled) {
3882       /* and we need to commit our state again on the next
3883        * prerolled buffer */
3884       sink->playing_async = TRUE;
3885       priv->pending_step.need_preroll = TRUE;
3886       sink->need_preroll = FALSE;
3887       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
3888     } else {
3889       sink->priv->have_latency = TRUE;
3890       sink->need_preroll = FALSE;
3891     }
3892     priv->current_sstart = GST_CLOCK_TIME_NONE;
3893     priv->current_sstop = GST_CLOCK_TIME_NONE;
3894     priv->eos_rtime = GST_CLOCK_TIME_NONE;
3895     priv->call_preroll = TRUE;
3896     gst_base_sink_set_last_buffer (sink, NULL);
3897     gst_base_sink_reset_qos (sink);
3898
3899     if (sink->clock_id) {
3900       gst_clock_id_unschedule (sink->clock_id);
3901     }
3902
3903     if (sink->have_preroll) {
3904       GST_DEBUG_OBJECT (sink, "signal waiter");
3905       priv->step_unlock = TRUE;
3906       GST_PAD_PREROLL_SIGNAL (sink->sinkpad);
3907     }
3908     GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
3909   } else {
3910     /* update the stepinfo and make it valid */
3911     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
3912         intermediate);
3913   }
3914
3915   return TRUE;
3916 }
3917
3918 /* with STREAM_LOCK
3919  */
3920 static void
3921 gst_base_sink_loop (GstPad * pad)
3922 {
3923   GstBaseSink *basesink;
3924   GstBuffer *buf = NULL;
3925   GstFlowReturn result;
3926   guint blocksize;
3927   guint64 offset;
3928
3929   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3930
3931   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
3932
3933   if ((blocksize = basesink->priv->blocksize) == 0)
3934     blocksize = -1;
3935
3936   offset = basesink->segment.last_stop;
3937
3938   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
3939       offset, blocksize);
3940
3941   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
3942   if (G_UNLIKELY (result != GST_FLOW_OK))
3943     goto paused;
3944
3945   if (G_UNLIKELY (buf == NULL))
3946     goto no_buffer;
3947
3948   offset += GST_BUFFER_SIZE (buf);
3949
3950   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
3951
3952   GST_PAD_PREROLL_LOCK (pad);
3953   result = gst_base_sink_chain_unlocked (basesink, pad, _PR_IS_BUFFER, buf);
3954   GST_PAD_PREROLL_UNLOCK (pad);
3955   if (G_UNLIKELY (result != GST_FLOW_OK))
3956     goto paused;
3957
3958   return;
3959
3960   /* ERRORS */
3961 paused:
3962   {
3963     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
3964         gst_flow_get_name (result));
3965     gst_pad_pause_task (pad);
3966     if (result == GST_FLOW_UNEXPECTED) {
3967       /* perform EOS logic */
3968       if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3969         gst_element_post_message (GST_ELEMENT_CAST (basesink),
3970             gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
3971                 basesink->segment.format, basesink->segment.last_stop));
3972       } else {
3973         gst_base_sink_event (pad, gst_event_new_eos ());
3974       }
3975     } else if (result == GST_FLOW_NOT_LINKED || result <= GST_FLOW_UNEXPECTED) {
3976       /* for fatal errors we post an error message, post the error
3977        * first so the app knows about the error first. 
3978        * wrong-state is not a fatal error because it happens due to
3979        * flushing and posting an error message in that case is the
3980        * wrong thing to do, e.g. when basesrc is doing a flushing
3981        * seek. */
3982       GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3983           (_("Internal data stream error.")),
3984           ("stream stopped, reason %s", gst_flow_get_name (result)));
3985       gst_base_sink_event (pad, gst_event_new_eos ());
3986     }
3987     return;
3988   }
3989 no_buffer:
3990   {
3991     GST_LOG_OBJECT (basesink, "no buffer, pausing");
3992     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3993         (_("Internal data flow error.")), ("element returned NULL buffer"));
3994     result = GST_FLOW_ERROR;
3995     goto paused;
3996   }
3997 }
3998
3999 static gboolean
4000 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
4001     gboolean flushing)
4002 {
4003   GstBaseSinkClass *bclass;
4004
4005   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4006
4007   if (flushing) {
4008     /* unlock any subclasses, we need to do this before grabbing the
4009      * PREROLL_LOCK since we hold this lock before going into ::render. */
4010     if (bclass->unlock)
4011       bclass->unlock (basesink);
4012   }
4013
4014   GST_PAD_PREROLL_LOCK (pad);
4015   basesink->flushing = flushing;
4016   if (flushing) {
4017     /* step 1, now that we have the PREROLL lock, clear our unlock request */
4018     if (bclass->unlock_stop)
4019       bclass->unlock_stop (basesink);
4020
4021     /* set need_preroll before we unblock the clock. If the clock is unblocked
4022      * before timing out, we can reuse the buffer for preroll. */
4023     basesink->need_preroll = TRUE;
4024
4025     /* step 2, unblock clock sync (if any) or any other blocking thing */
4026     if (basesink->clock_id) {
4027       gst_clock_id_unschedule (basesink->clock_id);
4028     }
4029
4030     /* flush out the data thread if it's locked in finish_preroll, this will
4031      * also flush out the EOS state */
4032     GST_DEBUG_OBJECT (basesink,
4033         "flushing out data thread, need preroll to TRUE");
4034     gst_base_sink_preroll_queue_flush (basesink, pad);
4035   }
4036   GST_PAD_PREROLL_UNLOCK (pad);
4037
4038   return TRUE;
4039 }
4040
4041 static gboolean
4042 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
4043 {
4044   gboolean result;
4045
4046   if (active) {
4047     /* start task */
4048     result = gst_pad_start_task (basesink->sinkpad,
4049         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
4050   } else {
4051     /* step 2, make sure streaming finishes */
4052     result = gst_pad_stop_task (basesink->sinkpad);
4053   }
4054
4055   return result;
4056 }
4057
4058 static gboolean
4059 gst_base_sink_pad_activate (GstPad * pad)
4060 {
4061   gboolean result = FALSE;
4062   GstBaseSink *basesink;
4063
4064   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4065
4066   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
4067
4068   gst_base_sink_set_flushing (basesink, pad, FALSE);
4069
4070   /* we need to have the pull mode enabled */
4071   if (!basesink->can_activate_pull) {
4072     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
4073     goto fallback;
4074   }
4075
4076   /* check if downstreams supports pull mode at all */
4077   if (!gst_pad_check_pull_range (pad)) {
4078     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
4079     goto fallback;
4080   }
4081
4082   /* set the pad mode before starting the task so that it's in the
4083    * correct state for the new thread. also the sink set_caps and get_caps
4084    * function checks this */
4085   basesink->pad_mode = GST_ACTIVATE_PULL;
4086
4087   /* we first try to negotiate a format so that when we try to activate
4088    * downstream, it knows about our format */
4089   if (!gst_base_sink_negotiate_pull (basesink)) {
4090     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
4091     goto fallback;
4092   }
4093
4094   /* ok activate now */
4095   if (!gst_pad_activate_pull (pad, TRUE)) {
4096     /* clear any pending caps */
4097     GST_OBJECT_LOCK (basesink);
4098     gst_caps_replace (&basesink->priv->pull_caps, NULL);
4099     GST_OBJECT_UNLOCK (basesink);
4100     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
4101     goto fallback;
4102   }
4103
4104   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
4105   result = TRUE;
4106   goto done;
4107
4108   /* push mode fallback */
4109 fallback:
4110   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
4111   if ((result = gst_pad_activate_push (pad, TRUE))) {
4112     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
4113   }
4114
4115 done:
4116   if (!result) {
4117     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
4118     gst_base_sink_set_flushing (basesink, pad, TRUE);
4119   }
4120
4121   gst_object_unref (basesink);
4122
4123   return result;
4124 }
4125
4126 static gboolean
4127 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
4128 {
4129   gboolean result;
4130   GstBaseSink *basesink;
4131
4132   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4133
4134   if (active) {
4135     if (!basesink->can_activate_push) {
4136       result = FALSE;
4137       basesink->pad_mode = GST_ACTIVATE_NONE;
4138     } else {
4139       result = TRUE;
4140       basesink->pad_mode = GST_ACTIVATE_PUSH;
4141     }
4142   } else {
4143     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
4144       g_warning ("Internal GStreamer activation error!!!");
4145       result = FALSE;
4146     } else {
4147       gst_base_sink_set_flushing (basesink, pad, TRUE);
4148       result = TRUE;
4149       basesink->pad_mode = GST_ACTIVATE_NONE;
4150     }
4151   }
4152
4153   gst_object_unref (basesink);
4154
4155   return result;
4156 }
4157
4158 static gboolean
4159 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
4160 {
4161   GstCaps *caps;
4162   gboolean result;
4163
4164   result = FALSE;
4165
4166   /* this returns the intersection between our caps and the peer caps. If there
4167    * is no peer, it returns NULL and we can't operate in pull mode so we can
4168    * fail the negotiation. */
4169   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
4170   if (caps == NULL || gst_caps_is_empty (caps))
4171     goto no_caps_possible;
4172
4173   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
4174
4175   caps = gst_caps_make_writable (caps);
4176   /* get the first (prefered) format */
4177   gst_caps_truncate (caps);
4178   /* try to fixate */
4179   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
4180
4181   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
4182
4183   if (gst_caps_is_any (caps)) {
4184     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
4185         "allowing pull()");
4186     /* neither side has template caps in this case, so they are prepared for
4187        pull() without setcaps() */
4188     result = TRUE;
4189   } else if (gst_caps_is_fixed (caps)) {
4190     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
4191       goto could_not_set_caps;
4192
4193     GST_OBJECT_LOCK (basesink);
4194     gst_caps_replace (&basesink->priv->pull_caps, caps);
4195     GST_OBJECT_UNLOCK (basesink);
4196
4197     result = TRUE;
4198   }
4199
4200   gst_caps_unref (caps);
4201
4202   return result;
4203
4204 no_caps_possible:
4205   {
4206     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
4207     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
4208     if (caps)
4209       gst_caps_unref (caps);
4210     return FALSE;
4211   }
4212 could_not_set_caps:
4213   {
4214     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
4215     gst_caps_unref (caps);
4216     return FALSE;
4217   }
4218 }
4219
4220 /* this won't get called until we implement an activate function */
4221 static gboolean
4222 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
4223 {
4224   gboolean result = FALSE;
4225   GstBaseSink *basesink;
4226   GstBaseSinkClass *bclass;
4227
4228   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4229   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4230
4231   if (active) {
4232     GstFormat format;
4233     gint64 duration;
4234
4235     /* we mark we have a newsegment here because pull based
4236      * mode works just fine without having a newsegment before the
4237      * first buffer */
4238     format = GST_FORMAT_BYTES;
4239
4240     gst_segment_init (&basesink->segment, format);
4241     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
4242     GST_OBJECT_LOCK (basesink);
4243     basesink->have_newsegment = TRUE;
4244     GST_OBJECT_UNLOCK (basesink);
4245
4246     /* get the peer duration in bytes */
4247     result = gst_pad_query_peer_duration (pad, &format, &duration);
4248     if (result) {
4249       GST_DEBUG_OBJECT (basesink,
4250           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
4251       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
4252           duration);
4253       gst_segment_set_duration (&basesink->segment, format, duration);
4254     } else {
4255       GST_DEBUG_OBJECT (basesink, "unknown duration");
4256     }
4257
4258     if (bclass->activate_pull)
4259       result = bclass->activate_pull (basesink, TRUE);
4260     else
4261       result = FALSE;
4262
4263     if (!result)
4264       goto activate_failed;
4265
4266   } else {
4267     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
4268       g_warning ("Internal GStreamer activation error!!!");
4269       result = FALSE;
4270     } else {
4271       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
4272       if (bclass->activate_pull)
4273         result &= bclass->activate_pull (basesink, FALSE);
4274       basesink->pad_mode = GST_ACTIVATE_NONE;
4275       /* clear any pending caps */
4276       GST_OBJECT_LOCK (basesink);
4277       gst_caps_replace (&basesink->priv->pull_caps, NULL);
4278       GST_OBJECT_UNLOCK (basesink);
4279     }
4280   }
4281   gst_object_unref (basesink);
4282
4283   return result;
4284
4285   /* ERRORS */
4286 activate_failed:
4287   {
4288     /* reset, as starting the thread failed */
4289     basesink->pad_mode = GST_ACTIVATE_NONE;
4290
4291     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
4292     return FALSE;
4293   }
4294 }
4295
4296 /* send an event to our sinkpad peer. */
4297 static gboolean
4298 gst_base_sink_send_event (GstElement * element, GstEvent * event)
4299 {
4300   GstPad *pad;
4301   GstBaseSink *basesink = GST_BASE_SINK (element);
4302   gboolean forward, result = TRUE;
4303   GstActivateMode mode;
4304
4305   GST_OBJECT_LOCK (element);
4306   /* get the pad and the scheduling mode */
4307   pad = gst_object_ref (basesink->sinkpad);
4308   mode = basesink->pad_mode;
4309   GST_OBJECT_UNLOCK (element);
4310
4311   /* only push UPSTREAM events upstream */
4312   forward = GST_EVENT_IS_UPSTREAM (event);
4313
4314   GST_DEBUG_OBJECT (basesink, "handling event %p %" GST_PTR_FORMAT, event,
4315       event);
4316
4317   switch (GST_EVENT_TYPE (event)) {
4318     case GST_EVENT_LATENCY:
4319     {
4320       GstClockTime latency;
4321
4322       gst_event_parse_latency (event, &latency);
4323
4324       /* store the latency. We use this to adjust the running_time before syncing
4325        * it to the clock. */
4326       GST_OBJECT_LOCK (element);
4327       basesink->priv->latency = latency;
4328       if (!basesink->priv->have_latency)
4329         forward = FALSE;
4330       GST_OBJECT_UNLOCK (element);
4331       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
4332           GST_TIME_ARGS (latency));
4333
4334       /* We forward this event so that all elements know about the global pipeline
4335        * latency. This is interesting for an element when it wants to figure out
4336        * when a particular piece of data will be rendered. */
4337       break;
4338     }
4339     case GST_EVENT_SEEK:
4340       /* in pull mode we will execute the seek */
4341       if (mode == GST_ACTIVATE_PULL)
4342         result = gst_base_sink_perform_seek (basesink, pad, event);
4343       break;
4344     case GST_EVENT_STEP:
4345       result = gst_base_sink_perform_step (basesink, pad, event);
4346       forward = FALSE;
4347       break;
4348     default:
4349       break;
4350   }
4351
4352   if (forward) {
4353     result = gst_pad_push_event (pad, event);
4354   } else {
4355     /* not forwarded, unref the event */
4356     gst_event_unref (event);
4357   }
4358
4359   gst_object_unref (pad);
4360   return result;
4361 }
4362
4363 /* get the end position of the last seen object, this is used
4364  * for EOS and for making sure that we don't report a position we
4365  * have not reached yet. With LOCK. */
4366 static gboolean
4367 gst_base_sink_get_position_last (GstBaseSink * basesink, GstFormat format,
4368     gint64 * cur, gboolean * upstream)
4369 {
4370   GstFormat oformat;
4371   GstSegment *segment;
4372   gboolean ret = TRUE;
4373
4374   segment = &basesink->segment;
4375   oformat = segment->format;
4376
4377   if (oformat == GST_FORMAT_TIME) {
4378     /* return last observed stream time, we keep the stream time around in the
4379      * time format. */
4380     *cur = basesink->priv->current_sstop;
4381   } else {
4382     /* convert last stop to stream time */
4383     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4384   }
4385
4386   if (*cur != -1 && oformat != format) {
4387     GST_OBJECT_UNLOCK (basesink);
4388     /* convert to the target format if we need to, release lock first */
4389     ret =
4390         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
4391     if (!ret) {
4392       *cur = -1;
4393       *upstream = TRUE;
4394     }
4395     GST_OBJECT_LOCK (basesink);
4396   }
4397
4398   GST_DEBUG_OBJECT (basesink, "POSITION: %" GST_TIME_FORMAT,
4399       GST_TIME_ARGS (*cur));
4400
4401   return ret;
4402 }
4403
4404 /* get the position when we are PAUSED, this is the stream time of the buffer
4405  * that prerolled. If no buffer is prerolled (we are still flushing), this
4406  * value will be -1. With LOCK. */
4407 static gboolean
4408 gst_base_sink_get_position_paused (GstBaseSink * basesink, GstFormat format,
4409     gint64 * cur, gboolean * upstream)
4410 {
4411   gboolean res;
4412   gint64 time;
4413   GstSegment *segment;
4414   GstFormat oformat;
4415
4416   /* we don't use the clip segment in pull mode, when seeking we update the
4417    * main segment directly with the new segment values without it having to be
4418    * activated by the rendering after preroll */
4419   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4420     segment = basesink->abidata.ABI.clip_segment;
4421   else
4422     segment = &basesink->segment;
4423   oformat = segment->format;
4424
4425   if (oformat == GST_FORMAT_TIME) {
4426     *cur = basesink->priv->current_sstart;
4427     if (segment->rate < 0.0 &&
4428         GST_CLOCK_TIME_IS_VALID (basesink->priv->current_sstop)) {
4429       /* for reverse playback we prefer the stream time stop position if we have
4430        * one */
4431       *cur = basesink->priv->current_sstop;
4432     }
4433   } else {
4434     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4435   }
4436
4437   time = segment->time;
4438
4439   if (*cur != -1) {
4440     *cur = MAX (*cur, time);
4441     GST_DEBUG_OBJECT (basesink, "POSITION as max: %" GST_TIME_FORMAT
4442         ", time %" GST_TIME_FORMAT, GST_TIME_ARGS (*cur), GST_TIME_ARGS (time));
4443   } else {
4444     /* we have no buffer, use the segment times. */
4445     if (segment->rate >= 0.0) {
4446       /* forward, next position is always the time of the segment */
4447       *cur = time;
4448       GST_DEBUG_OBJECT (basesink, "POSITION as time: %" GST_TIME_FORMAT,
4449           GST_TIME_ARGS (*cur));
4450     } else {
4451       /* reverse, next expected timestamp is segment->stop. We use the function
4452        * to get things right for negative applied_rates. */
4453       *cur = gst_segment_to_stream_time (segment, oformat, segment->stop);
4454       GST_DEBUG_OBJECT (basesink, "reverse POSITION: %" GST_TIME_FORMAT,
4455           GST_TIME_ARGS (*cur));
4456     }
4457   }
4458
4459   res = (*cur != -1);
4460   if (res && oformat != format) {
4461     GST_OBJECT_UNLOCK (basesink);
4462     res =
4463         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
4464     if (!res) {
4465       *cur = -1;
4466       *upstream = TRUE;
4467     }
4468     GST_OBJECT_LOCK (basesink);
4469   }
4470
4471   return res;
4472 }
4473
4474 static gboolean
4475 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4476     gint64 * cur, gboolean * upstream)
4477 {
4478   GstClock *clock;
4479   gboolean res = FALSE;
4480   GstFormat oformat, tformat;
4481   GstClockTime now, latency;
4482   GstClockTimeDiff base;
4483   gint64 time, accum, duration;
4484   gdouble rate;
4485   gint64 last;
4486
4487   GST_OBJECT_LOCK (basesink);
4488   /* our intermediate time format */
4489   tformat = GST_FORMAT_TIME;
4490   /* get the format in the segment */
4491   oformat = basesink->segment.format;
4492
4493   /* can only give answer based on the clock if not EOS */
4494   if (G_UNLIKELY (basesink->eos))
4495     goto in_eos;
4496
4497   /* we can only get the segment when we are not NULL or READY */
4498   if (!basesink->have_newsegment)
4499     goto wrong_state;
4500
4501   /* when not in PLAYING or when we're busy with a state change, we
4502    * cannot read from the clock so we report time based on the
4503    * last seen timestamp. */
4504   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4505       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING)
4506     goto in_pause;
4507
4508   /* we need to sync on the clock. */
4509   if (basesink->sync == FALSE)
4510     goto no_sync;
4511
4512   /* and we need a clock */
4513   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4514     goto no_sync;
4515
4516   /* collect all data we need holding the lock */
4517   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time))
4518     time = basesink->segment.time;
4519   else
4520     time = 0;
4521
4522   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.stop))
4523     duration = basesink->segment.stop - basesink->segment.start;
4524   else
4525     duration = 0;
4526
4527   base = GST_ELEMENT_CAST (basesink)->base_time;
4528   accum = basesink->segment.accum;
4529   rate = basesink->segment.rate * basesink->segment.applied_rate;
4530   latency = basesink->priv->latency;
4531
4532   gst_object_ref (clock);
4533
4534   /* this function might release the LOCK */
4535   gst_base_sink_get_position_last (basesink, format, &last, upstream);
4536
4537   /* need to release the object lock before we can get the time,
4538    * a clock might take the LOCK of the provider, which could be
4539    * a basesink subclass. */
4540   GST_OBJECT_UNLOCK (basesink);
4541
4542   now = gst_clock_get_time (clock);
4543
4544   if (oformat != tformat) {
4545     /* convert accum, time and duration to time */
4546     if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4547             &accum))
4548       goto convert_failed;
4549     if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration, &tformat,
4550             &duration))
4551       goto convert_failed;
4552     if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4553             &time))
4554       goto convert_failed;
4555   }
4556
4557   /* subtract base time and accumulated time from the clock time.
4558    * Make sure we don't go negative. This is the current time in
4559    * the segment which we need to scale with the combined
4560    * rate and applied rate. */
4561   base += accum;
4562   base += latency;
4563   if (GST_CLOCK_DIFF (base, now) < 0)
4564     base = now;
4565
4566   /* for negative rates we need to count back from the segment
4567    * duration. */
4568   if (rate < 0.0)
4569     time += duration;
4570
4571   *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4572
4573   /* never report more than last seen position */
4574   if (last != -1)
4575     *cur = MIN (last, *cur);
4576
4577   gst_object_unref (clock);
4578
4579   GST_DEBUG_OBJECT (basesink,
4580       "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4581       GST_TIME_FORMAT " + time %" GST_TIME_FORMAT,
4582       GST_TIME_ARGS (now), GST_TIME_ARGS (base),
4583       GST_TIME_ARGS (accum), GST_TIME_ARGS (time));
4584
4585   if (oformat != format) {
4586     /* convert time to final format */
4587     if (!gst_pad_query_convert (basesink->sinkpad, tformat, *cur, &format, cur))
4588       goto convert_failed;
4589   }
4590
4591   res = TRUE;
4592
4593 done:
4594   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4595       res, GST_TIME_ARGS (*cur));
4596   return res;
4597
4598   /* special cases */
4599 in_eos:
4600   {
4601     GST_DEBUG_OBJECT (basesink, "position in EOS");
4602     res = gst_base_sink_get_position_last (basesink, format, cur, upstream);
4603     GST_OBJECT_UNLOCK (basesink);
4604     goto done;
4605   }
4606 in_pause:
4607   {
4608     GST_DEBUG_OBJECT (basesink, "position in PAUSED");
4609     res = gst_base_sink_get_position_paused (basesink, format, cur, upstream);
4610     GST_OBJECT_UNLOCK (basesink);
4611     goto done;
4612   }
4613 wrong_state:
4614   {
4615     /* in NULL or READY we always return FALSE and -1 */
4616     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4617     res = FALSE;
4618     *cur = -1;
4619     GST_OBJECT_UNLOCK (basesink);
4620     goto done;
4621   }
4622 no_sync:
4623   {
4624     /* report last seen timestamp if any, else ask upstream to answer */
4625     if ((*cur = basesink->priv->current_sstart) != -1)
4626       res = TRUE;
4627     else
4628       *upstream = TRUE;
4629
4630     GST_DEBUG_OBJECT (basesink, "no sync, res %d, POSITION %" GST_TIME_FORMAT,
4631         res, GST_TIME_ARGS (*cur));
4632     GST_OBJECT_UNLOCK (basesink);
4633     return res;
4634   }
4635 convert_failed:
4636   {
4637     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4638     *upstream = TRUE;
4639     return FALSE;
4640   }
4641 }
4642
4643 static gboolean
4644 gst_base_sink_get_duration (GstBaseSink * basesink, GstFormat format,
4645     gint64 * dur, gboolean * upstream)
4646 {
4647   gboolean res = FALSE;
4648
4649   if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4650     GstFormat uformat = GST_FORMAT_BYTES;
4651     gint64 uduration;
4652
4653     /* get the duration in bytes, in pull mode that's all we are sure to
4654      * know. We have to explicitly get this value from upstream instead of
4655      * using our cached value because it might change. Duration caching
4656      * should be done at a higher level. */
4657     res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat, &uduration);
4658     if (res) {
4659       gst_segment_set_duration (&basesink->segment, uformat, uduration);
4660       if (format != uformat) {
4661         /* convert to the requested format */
4662         res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4663             &format, dur);
4664       } else {
4665         *dur = uduration;
4666       }
4667     }
4668     *upstream = FALSE;
4669   } else {
4670     *upstream = TRUE;
4671   }
4672
4673   return res;
4674 }
4675
4676 static const GstQueryType *
4677 gst_base_sink_get_query_types (GstElement * element)
4678 {
4679   static const GstQueryType query_types[] = {
4680     GST_QUERY_DURATION,
4681     GST_QUERY_POSITION,
4682     GST_QUERY_SEGMENT,
4683     GST_QUERY_LATENCY,
4684     0
4685   };
4686
4687   return query_types;
4688 }
4689
4690 static gboolean
4691 gst_base_sink_query (GstElement * element, GstQuery * query)
4692 {
4693   gboolean res = FALSE;
4694
4695   GstBaseSink *basesink = GST_BASE_SINK (element);
4696
4697   switch (GST_QUERY_TYPE (query)) {
4698     case GST_QUERY_POSITION:
4699     {
4700       gint64 cur = 0;
4701       GstFormat format;
4702       gboolean upstream = FALSE;
4703
4704       gst_query_parse_position (query, &format, NULL);
4705
4706       GST_DEBUG_OBJECT (basesink, "position query in format %s",
4707           gst_format_get_name (format));
4708
4709       /* first try to get the position based on the clock */
4710       if ((res =
4711               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4712         gst_query_set_position (query, format, cur);
4713       } else if (upstream) {
4714         /* fallback to peer query */
4715         res = gst_pad_peer_query (basesink->sinkpad, query);
4716       }
4717       if (!res) {
4718         /* we can handle a few things if upstream failed */
4719         if (format == GST_FORMAT_PERCENT) {
4720           gint64 dur = 0;
4721           GstFormat uformat = GST_FORMAT_TIME;
4722
4723           res = gst_base_sink_get_position (basesink, GST_FORMAT_TIME, &cur,
4724               &upstream);
4725           if (!res && upstream) {
4726             res = gst_pad_query_peer_position (basesink->sinkpad, &uformat,
4727                 &cur);
4728           }
4729           if (res) {
4730             res = gst_base_sink_get_duration (basesink, GST_FORMAT_TIME, &dur,
4731                 &upstream);
4732             if (!res && upstream) {
4733               res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4734                   &dur);
4735             }
4736           }
4737           if (res) {
4738             gint64 pos;
4739
4740             pos = gst_util_uint64_scale (100 * GST_FORMAT_PERCENT_SCALE, cur,
4741                 dur);
4742             gst_query_set_position (query, GST_FORMAT_PERCENT, pos);
4743           }
4744         }
4745       }
4746       break;
4747     }
4748     case GST_QUERY_DURATION:
4749     {
4750       gint64 dur = 0;
4751       GstFormat format;
4752       gboolean upstream = FALSE;
4753
4754       gst_query_parse_duration (query, &format, NULL);
4755
4756       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4757           gst_format_get_name (format));
4758
4759       if ((res =
4760               gst_base_sink_get_duration (basesink, format, &dur, &upstream))) {
4761         gst_query_set_duration (query, format, dur);
4762       } else if (upstream) {
4763         /* fallback to peer query */
4764         res = gst_pad_peer_query (basesink->sinkpad, query);
4765       }
4766       if (!res) {
4767         /* we can handle a few things if upstream failed */
4768         if (format == GST_FORMAT_PERCENT) {
4769           gst_query_set_duration (query, GST_FORMAT_PERCENT,
4770               GST_FORMAT_PERCENT_MAX);
4771           res = TRUE;
4772         }
4773       }
4774       break;
4775     }
4776     case GST_QUERY_LATENCY:
4777     {
4778       gboolean live, us_live;
4779       GstClockTime min, max;
4780
4781       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4782                   &max))) {
4783         gst_query_set_latency (query, live, min, max);
4784       }
4785       break;
4786     }
4787     case GST_QUERY_JITTER:
4788       break;
4789     case GST_QUERY_RATE:
4790       /* gst_query_set_rate (query, basesink->segment_rate); */
4791       res = TRUE;
4792       break;
4793     case GST_QUERY_SEGMENT:
4794     {
4795       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4796         gst_query_set_segment (query, basesink->segment.rate,
4797             GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4798         res = TRUE;
4799       } else {
4800         res = gst_pad_peer_query (basesink->sinkpad, query);
4801       }
4802       break;
4803     }
4804     case GST_QUERY_SEEKING:
4805     case GST_QUERY_CONVERT:
4806     case GST_QUERY_FORMATS:
4807     default:
4808       res = gst_pad_peer_query (basesink->sinkpad, query);
4809       break;
4810   }
4811   GST_DEBUG_OBJECT (basesink, "query %s returns %d",
4812       GST_QUERY_TYPE_NAME (query), res);
4813   return res;
4814 }
4815
4816 static GstStateChangeReturn
4817 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4818 {
4819   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4820   GstBaseSink *basesink = GST_BASE_SINK (element);
4821   GstBaseSinkClass *bclass;
4822   GstBaseSinkPrivate *priv;
4823
4824   priv = basesink->priv;
4825
4826   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4827
4828   switch (transition) {
4829     case GST_STATE_CHANGE_NULL_TO_READY:
4830       if (bclass->start)
4831         if (!bclass->start (basesink))
4832           goto start_failed;
4833       break;
4834     case GST_STATE_CHANGE_READY_TO_PAUSED:
4835       /* need to complete preroll before this state change completes, there
4836        * is no data flow in READY so we can safely assume we need to preroll. */
4837       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4838       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4839       basesink->have_newsegment = FALSE;
4840       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4841       gst_segment_init (basesink->abidata.ABI.clip_segment,
4842           GST_FORMAT_UNDEFINED);
4843       basesink->offset = 0;
4844       basesink->have_preroll = FALSE;
4845       priv->step_unlock = FALSE;
4846       basesink->need_preroll = TRUE;
4847       basesink->playing_async = TRUE;
4848       priv->current_sstart = GST_CLOCK_TIME_NONE;
4849       priv->current_sstop = GST_CLOCK_TIME_NONE;
4850       priv->eos_rtime = GST_CLOCK_TIME_NONE;
4851       priv->latency = 0;
4852       basesink->eos = FALSE;
4853       priv->received_eos = FALSE;
4854       gst_base_sink_reset_qos (basesink);
4855       priv->commited = FALSE;
4856       priv->call_preroll = TRUE;
4857       priv->current_step.valid = FALSE;
4858       priv->pending_step.valid = FALSE;
4859       if (priv->async_enabled) {
4860         GST_DEBUG_OBJECT (basesink, "doing async state change");
4861         /* when async enabled, post async-start message and return ASYNC from
4862          * the state change function */
4863         ret = GST_STATE_CHANGE_ASYNC;
4864         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4865             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4866       } else {
4867         priv->have_latency = TRUE;
4868       }
4869       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4870       break;
4871     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4872       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4873       if (!gst_base_sink_needs_preroll (basesink)) {
4874         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4875         /* no preroll needed anymore now. */
4876         basesink->playing_async = FALSE;
4877         basesink->need_preroll = FALSE;
4878         if (basesink->eos) {
4879           GstMessage *message;
4880
4881           /* need to post EOS message here */
4882           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4883           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
4884           gst_message_set_seqnum (message, basesink->priv->seqnum);
4885           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
4886         } else {
4887           GST_DEBUG_OBJECT (basesink, "signal preroll");
4888           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
4889         }
4890       } else {
4891         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
4892         basesink->need_preroll = TRUE;
4893         basesink->playing_async = TRUE;
4894         priv->call_preroll = TRUE;
4895         priv->commited = FALSE;
4896         if (priv->async_enabled) {
4897           GST_DEBUG_OBJECT (basesink, "doing async state change");
4898           ret = GST_STATE_CHANGE_ASYNC;
4899           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4900               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4901         }
4902       }
4903       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4904       break;
4905     default:
4906       break;
4907   }
4908
4909   {
4910     GstStateChangeReturn bret;
4911
4912     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4913     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
4914       goto activate_failed;
4915   }
4916
4917   switch (transition) {
4918     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4919       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
4920       /* FIXME, make sure we cannot enter _render first */
4921
4922       /* we need to call ::unlock before locking PREROLL_LOCK
4923        * since we lock it before going into ::render */
4924       if (bclass->unlock)
4925         bclass->unlock (basesink);
4926
4927       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4928       GST_DEBUG_OBJECT (basesink, "got preroll lock");
4929       /* now that we have the PREROLL lock, clear our unlock request */
4930       if (bclass->unlock_stop)
4931         bclass->unlock_stop (basesink);
4932
4933       /* we need preroll again and we set the flag before unlocking the clockid
4934        * because if the clockid is unlocked before a current buffer expired, we
4935        * can use that buffer to preroll with */
4936       basesink->need_preroll = TRUE;
4937
4938       if (basesink->clock_id) {
4939         GST_DEBUG_OBJECT (basesink, "unschedule clock");
4940         gst_clock_id_unschedule (basesink->clock_id);
4941       }
4942
4943       /* if we don't have a preroll buffer we need to wait for a preroll and
4944        * return ASYNC. */
4945       if (!gst_base_sink_needs_preroll (basesink)) {
4946         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
4947         basesink->playing_async = FALSE;
4948       } else {
4949         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
4950           GST_DEBUG_OBJECT (basesink, "element is <= READY");
4951           ret = GST_STATE_CHANGE_SUCCESS;
4952         } else {
4953           GST_DEBUG_OBJECT (basesink,
4954               "PLAYING to PAUSED, we are not prerolled");
4955           basesink->playing_async = TRUE;
4956           priv->commited = FALSE;
4957           priv->call_preroll = TRUE;
4958           if (priv->async_enabled) {
4959             GST_DEBUG_OBJECT (basesink, "doing async state change");
4960             ret = GST_STATE_CHANGE_ASYNC;
4961             gst_element_post_message (GST_ELEMENT_CAST (basesink),
4962                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
4963                     FALSE));
4964           }
4965         }
4966       }
4967       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
4968           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
4969
4970       gst_base_sink_reset_qos (basesink);
4971       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4972       break;
4973     case GST_STATE_CHANGE_PAUSED_TO_READY:
4974       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4975       /* start by reseting our position state with the object lock so that the
4976        * position query gets the right idea. We do this before we post the
4977        * messages so that the message handlers pick this up. */
4978       GST_OBJECT_LOCK (basesink);
4979       basesink->have_newsegment = FALSE;
4980       priv->current_sstart = GST_CLOCK_TIME_NONE;
4981       priv->current_sstop = GST_CLOCK_TIME_NONE;
4982       priv->have_latency = FALSE;
4983       GST_OBJECT_UNLOCK (basesink);
4984
4985       gst_base_sink_set_last_buffer (basesink, NULL);
4986       priv->call_preroll = FALSE;
4987
4988       if (!priv->commited) {
4989         if (priv->async_enabled) {
4990           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
4991
4992           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4993               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
4994                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
4995
4996           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4997               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
4998         }
4999         priv->commited = TRUE;
5000       } else {
5001         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
5002       }
5003       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5004       break;
5005     case GST_STATE_CHANGE_READY_TO_NULL:
5006       if (bclass->stop) {
5007         if (!bclass->stop (basesink)) {
5008           GST_WARNING_OBJECT (basesink, "failed to stop");
5009         }
5010       }
5011       gst_base_sink_set_last_buffer (basesink, NULL);
5012       priv->call_preroll = FALSE;
5013       break;
5014     default:
5015       break;
5016   }
5017
5018   return ret;
5019
5020   /* ERRORS */
5021 start_failed:
5022   {
5023     GST_DEBUG_OBJECT (basesink, "failed to start");
5024     return GST_STATE_CHANGE_FAILURE;
5025   }
5026 activate_failed:
5027   {
5028     GST_DEBUG_OBJECT (basesink,
5029         "element failed to change states -- activation problem?");
5030     return GST_STATE_CHANGE_FAILURE;
5031   }
5032 }