base: Fix pad callbacks so they handle when parent goes away
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSrc
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * |[
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * ]|
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSinkClass.preroll() vmethod with this preroll buffer and will then
59  * commit the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from #GstBaseSinkClass.get_times(). If this
63  * function returns #GST_CLOCK_TIME_NONE for the start time, no synchronisation
64  * will be done. Synchronisation can be disabled entirely by setting the object
65  * #GstBaseSink:sync property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSinkClass.render() will be
68  * called. Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the
71  * #GstBaseSinkClass.render() method are supported as well. These classes
72  * typically receive a buffer in the render method and can then potentially
73  * block on the clock while rendering. A typical example is an audiosink.
74  * Since 0.10.11 these subclasses can use gst_base_sink_wait_preroll() to
75  * perform the blocking wait.
76  *
77  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
78  * for the clock to reach the time indicated by the stop time of the last
79  * #GstBaseSinkClass.get_times() call before posting an EOS message. When the
80  * element receives EOS in PAUSED, preroll completes, the event is queued and an
81  * EOS message is posted when going to PLAYING.
82  *
83  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
84  * synchronisation and clipping of buffers. Buffers that fall completely outside
85  * of the current segment are dropped. Buffers that fall partially in the
86  * segment are rendered (and prerolled). Subclasses should do any subbuffer
87  * clipping themselves when needed.
88  *
89  * #GstBaseSink will by default report the current playback position in
90  * #GST_FORMAT_TIME based on the current clock time and segment information.
91  * If no clock has been set on the element, the query will be forwarded
92  * upstream.
93  *
94  * The #GstBaseSinkClass.set_caps() function will be called when the subclass
95  * should configure itself to process a specific media type.
96  *
97  * The #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() virtual methods
98  * will be called when resources should be allocated. Any 
99  * #GstBaseSinkClass.preroll(), #GstBaseSinkClass.render() and
100  * #GstBaseSinkClass.set_caps() function will be called between the
101  * #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() calls.
102  *
103  * The #GstBaseSinkClass.event() virtual method will be called when an event is
104  * received by #GstBaseSink. Normally this method should only be overriden by
105  * very specific elements (such as file sinks) which need to handle the
106  * newsegment event specially.
107  *
108  * #GstBaseSink provides an overridable #GstBaseSinkClass.buffer_alloc()
109  * function that can be used by sinks that want to do reverse negotiation or to
110  * provide custom buffers (hardware buffers for example) to upstream elements.
111  *
112  * The #GstBaseSinkClass.unlock() method is called when the elements should
113  * unblock any blocking operations they perform in the
114  * #GstBaseSinkClass.render() method. This is mostly useful when the
115  * #GstBaseSinkClass.render() method performs a blocking write on a file
116  * descriptor, for example.
117  *
118  * The #GstBaseSink:max-lateness property affects how the sink deals with
119  * buffers that arrive too late in the sink. A buffer arrives too late in the
120  * sink when the presentation time (as a combination of the last segment, buffer
121  * timestamp and element base_time) plus the duration is before the current
122  * time of the clock.
123  * If the frame is later than max-lateness, the sink will drop the buffer
124  * without calling the render method.
125  * This feature is disabled if sync is disabled, the
126  * #GstBaseSinkClass.get_times() method does not return a valid start time or
127  * max-lateness is set to -1 (the default).
128  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
129  * max-lateness value.
130  *
131  * The #GstBaseSink:qos property will enable the quality-of-service features of
132  * the basesink which gather statistics about the real-time performance of the
133  * clock synchronisation. For each buffer received in the sink, statistics are
134  * gathered and a QOS event is sent upstream with these numbers. This
135  * information can then be used by upstream elements to reduce their processing
136  * rate, for example.
137  *
138  * Since 0.10.15 the #GstBaseSink:async property can be used to instruct the
139  * sink to never perform an ASYNC state change. This feature is mostly usable
140  * when dealing with non-synchronized streams or sparse streams.
141  *
142  * Last reviewed on 2007-08-29 (0.10.15)
143  */
144
145 #ifdef HAVE_CONFIG_H
146 #  include "config.h"
147 #endif
148
149 #include <gst/gst_private.h>
150
151 #include "gstbasesink.h"
152 #include <gst/gstmarshal.h>
153 #include <gst/gst-i18n-lib.h>
154
155 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
156 #define GST_CAT_DEFAULT gst_base_sink_debug
157
158 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
159    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
160
161 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
162
163 typedef struct
164 {
165   gboolean valid;               /* if this info is valid */
166   guint32 seqnum;               /* the seqnum of the STEP event */
167   GstFormat format;             /* the format of the amount */
168   guint64 amount;               /* the total amount of data to skip */
169   guint64 position;             /* the position in the stepped data */
170   guint64 duration;             /* the duration in time of the skipped data */
171   guint64 start;                /* running_time of the start */
172   gdouble rate;                 /* rate of skipping */
173   gdouble start_rate;           /* rate before skipping */
174   guint64 start_start;          /* start position skipping */
175   guint64 start_stop;           /* stop position skipping */
176   gboolean flush;               /* if this was a flushing step */
177   gboolean intermediate;        /* if this is an intermediate step */
178   gboolean need_preroll;        /* if we need preroll after this step */
179 } GstStepInfo;
180
181 /* FIXME, some stuff in ABI.data and other in Private...
182  * Make up your mind please.
183  */
184 struct _GstBaseSinkPrivate
185 {
186   gint qos_enabled;             /* ATOMIC */
187   gboolean async_enabled;
188   GstClockTimeDiff ts_offset;
189   GstClockTime render_delay;
190
191   /* start, stop of current buffer, stream time, used to report position */
192   GstClockTime current_sstart;
193   GstClockTime current_sstop;
194
195   /* start, stop and jitter of current buffer, running time */
196   GstClockTime current_rstart;
197   GstClockTime current_rstop;
198   GstClockTimeDiff current_jitter;
199   /* the running time of the previous buffer */
200   GstClockTime prev_rstart;
201
202   /* EOS sync time in running time */
203   GstClockTime eos_rtime;
204
205   /* last buffer that arrived in time, running time */
206   GstClockTime last_render_time;
207   /* when the last buffer left the sink, running time */
208   GstClockTime last_left;
209
210   /* running averages go here these are done on running time */
211   GstClockTime avg_pt;
212   GstClockTime avg_duration;
213   gdouble avg_rate;
214   GstClockTime avg_in_diff;
215
216   /* these are done on system time. avg_jitter and avg_render are
217    * compared to eachother to see if the rendering time takes a
218    * huge amount of the processing, If so we are flooded with
219    * buffers. */
220   GstClockTime last_left_systime;
221   GstClockTime avg_jitter;
222   GstClockTime start, stop;
223   GstClockTime avg_render;
224
225   /* number of rendered and dropped frames */
226   guint64 rendered;
227   guint64 dropped;
228
229   /* latency stuff */
230   GstClockTime latency;
231
232   /* if we already commited the state */
233   gboolean commited;
234
235   /* when we received EOS */
236   gboolean received_eos;
237
238   /* when we are prerolled and able to report latency */
239   gboolean have_latency;
240
241   /* the last buffer we prerolled or rendered. Useful for making snapshots */
242   gint enable_last_buffer;      /* atomic */
243   GstBuffer *last_buffer;
244
245   /* caps for pull based scheduling */
246   GstCaps *pull_caps;
247
248   /* blocksize for pulling */
249   guint blocksize;
250
251   gboolean discont;
252
253   /* seqnum of the stream */
254   guint32 seqnum;
255
256   gboolean call_preroll;
257   gboolean step_unlock;
258
259   /* we have a pending and a current step operation */
260   GstStepInfo current_step;
261   GstStepInfo pending_step;
262
263   /* Cached GstClockID */
264   GstClockID cached_clock_id;
265
266   /* for throttling and QoS */
267   GstClockTime earliest_in_time;
268   GstClockTime throttle_time;
269 };
270
271 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
272
273 /* generic running average, this has a neutral window size */
274 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
275
276 /* the windows for these running averages are experimentally obtained.
277  * possitive values get averaged more while negative values use a small
278  * window so we can react faster to badness. */
279 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
280 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
281
282 enum
283 {
284   _PR_IS_NOTHING = 1 << 0,
285   _PR_IS_BUFFER = 1 << 1,
286   _PR_IS_BUFFERLIST = 1 << 2,
287   _PR_IS_EVENT = 1 << 3
288 } PrivateObjectType;
289
290 #define OBJ_IS_BUFFER(a) ((a) & _PR_IS_BUFFER)
291 #define OBJ_IS_BUFFERLIST(a) ((a) & _PR_IS_BUFFERLIST)
292 #define OBJ_IS_EVENT(a) ((a) & _PR_IS_EVENT)
293 #define OBJ_IS_BUFFERFULL(a) ((a) & (_PR_IS_BUFFER | _PR_IS_BUFFERLIST))
294
295 /* BaseSink properties */
296
297 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
298 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
299
300 #define DEFAULT_PREROLL_QUEUE_LEN   0
301 #define DEFAULT_SYNC                TRUE
302 #define DEFAULT_MAX_LATENESS        -1
303 #define DEFAULT_QOS                 FALSE
304 #define DEFAULT_ASYNC               TRUE
305 #define DEFAULT_TS_OFFSET           0
306 #define DEFAULT_BLOCKSIZE           4096
307 #define DEFAULT_RENDER_DELAY        0
308 #define DEFAULT_ENABLE_LAST_BUFFER  TRUE
309 #define DEFAULT_THROTTLE_TIME       0
310
311 enum
312 {
313   PROP_0,
314   PROP_PREROLL_QUEUE_LEN,
315   PROP_SYNC,
316   PROP_MAX_LATENESS,
317   PROP_QOS,
318   PROP_ASYNC,
319   PROP_TS_OFFSET,
320   PROP_ENABLE_LAST_BUFFER,
321   PROP_LAST_BUFFER,
322   PROP_BLOCKSIZE,
323   PROP_RENDER_DELAY,
324   PROP_THROTTLE_TIME,
325   PROP_LAST
326 };
327
328 static GstElementClass *parent_class = NULL;
329
330 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
331 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
332 static void gst_base_sink_finalize (GObject * object);
333
334 GType
335 gst_base_sink_get_type (void)
336 {
337   static volatile gsize base_sink_type = 0;
338
339   if (g_once_init_enter (&base_sink_type)) {
340     GType _type;
341     static const GTypeInfo base_sink_info = {
342       sizeof (GstBaseSinkClass),
343       NULL,
344       NULL,
345       (GClassInitFunc) gst_base_sink_class_init,
346       NULL,
347       NULL,
348       sizeof (GstBaseSink),
349       0,
350       (GInstanceInitFunc) gst_base_sink_init,
351     };
352
353     _type = g_type_register_static (GST_TYPE_ELEMENT,
354         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
355     g_once_init_leave (&base_sink_type, _type);
356   }
357   return base_sink_type;
358 }
359
360 static void gst_base_sink_set_property (GObject * object, guint prop_id,
361     const GValue * value, GParamSpec * pspec);
362 static void gst_base_sink_get_property (GObject * object, guint prop_id,
363     GValue * value, GParamSpec * pspec);
364
365 static gboolean gst_base_sink_send_event (GstElement * element,
366     GstEvent * event);
367 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
368 static const GstQueryType *gst_base_sink_get_query_types (GstElement * element);
369
370 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
371 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
372 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
373     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
374 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
375     GstClockTime * start, GstClockTime * end);
376 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
377     GstPad * pad, gboolean flushing);
378 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
379     gboolean active);
380 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
381     GstSegment * segment);
382 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
383     GstEvent * event, GstSegment * segment);
384
385 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
386     GstStateChange transition);
387
388 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
389 static GstFlowReturn gst_base_sink_chain_list (GstPad * pad,
390     GstBufferList * list);
391
392 static void gst_base_sink_loop (GstPad * pad);
393 static gboolean gst_base_sink_pad_activate (GstPad * pad);
394 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
395 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
396 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
397
398 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
399 static GstCaps *gst_base_sink_pad_getcaps (GstPad * pad);
400 static gboolean gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps);
401 static void gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps);
402 static GstFlowReturn gst_base_sink_pad_buffer_alloc (GstPad * pad,
403     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
404
405
406 /* check if an object was too late */
407 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
408     GstMiniObject * obj, GstClockTime rstart, GstClockTime rstop,
409     GstClockReturn status, GstClockTimeDiff jitter);
410 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
411     guint8 obj_type, GstMiniObject * obj);
412
413 static void
414 gst_base_sink_class_init (GstBaseSinkClass * klass)
415 {
416   GObjectClass *gobject_class;
417   GstElementClass *gstelement_class;
418
419   gobject_class = G_OBJECT_CLASS (klass);
420   gstelement_class = GST_ELEMENT_CLASS (klass);
421
422   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
423       "basesink element");
424
425   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
426
427   parent_class = g_type_class_peek_parent (klass);
428
429   gobject_class->finalize = gst_base_sink_finalize;
430   gobject_class->set_property = gst_base_sink_set_property;
431   gobject_class->get_property = gst_base_sink_get_property;
432
433   /* FIXME, this next value should be configured using an event from the
434    * upstream element, ie, the BUFFER_SIZE event. */
435   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
436       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
437           "Number of buffers to queue during preroll", 0, G_MAXUINT,
438           DEFAULT_PREROLL_QUEUE_LEN,
439           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
440
441   g_object_class_install_property (gobject_class, PROP_SYNC,
442       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
443           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
444
445   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
446       g_param_spec_int64 ("max-lateness", "Max Lateness",
447           "Maximum number of nanoseconds that a buffer can be late before it "
448           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
449           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
450
451   g_object_class_install_property (gobject_class, PROP_QOS,
452       g_param_spec_boolean ("qos", "Qos",
453           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
454           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
455   /**
456    * GstBaseSink:async
457    *
458    * If set to #TRUE, the basesink will perform asynchronous state changes.
459    * When set to #FALSE, the sink will not signal the parent when it prerolls.
460    * Use this option when dealing with sparse streams or when synchronisation is
461    * not required.
462    *
463    * Since: 0.10.15
464    */
465   g_object_class_install_property (gobject_class, PROP_ASYNC,
466       g_param_spec_boolean ("async", "Async",
467           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
468           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
469   /**
470    * GstBaseSink:ts-offset
471    *
472    * Controls the final synchronisation, a negative value will render the buffer
473    * earlier while a positive value delays playback. This property can be
474    * used to fix synchronisation in bad files.
475    *
476    * Since: 0.10.15
477    */
478   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
479       g_param_spec_int64 ("ts-offset", "TS Offset",
480           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
481           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
482
483   /**
484    * GstBaseSink:enable-last-buffer
485    *
486    * Enable the last-buffer property. If FALSE, basesink doesn't keep a
487    * reference to the last buffer arrived and the last-buffer property is always
488    * set to NULL. This can be useful if you need buffers to be released as soon
489    * as possible, eg. if you're using a buffer pool.
490    *
491    * Since: 0.10.30
492    */
493   g_object_class_install_property (gobject_class, PROP_ENABLE_LAST_BUFFER,
494       g_param_spec_boolean ("enable-last-buffer", "Enable Last Buffer",
495           "Enable the last-buffer property", DEFAULT_ENABLE_LAST_BUFFER,
496           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
497
498   /**
499    * GstBaseSink:last-buffer
500    *
501    * The last buffer that arrived in the sink and was used for preroll or for
502    * rendering. This property can be used to generate thumbnails. This property
503    * can be NULL when the sink has not yet received a bufer.
504    *
505    * Since: 0.10.15
506    */
507   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
508       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
509           "The last buffer received in the sink", GST_TYPE_BUFFER,
510           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
511   /**
512    * GstBaseSink:blocksize
513    *
514    * The amount of bytes to pull when operating in pull mode.
515    *
516    * Since: 0.10.22
517    */
518   /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
519   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
520       g_param_spec_uint ("blocksize", "Block size",
521           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
522           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
523   /**
524    * GstBaseSink:render-delay
525    *
526    * The additional delay between synchronisation and actual rendering of the
527    * media. This property will add additional latency to the device in order to
528    * make other sinks compensate for the delay.
529    *
530    * Since: 0.10.22
531    */
532   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
533       g_param_spec_uint64 ("render-delay", "Render Delay",
534           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
535           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
536   /**
537    * GstBaseSink:throttle-time
538    *
539    * The time to insert between buffers. This property can be used to control
540    * the maximum amount of buffers per second to render. Setting this property
541    * to a value bigger than 0 will make the sink create THROTTLE QoS events.
542    *
543    * Since: 0.10.33
544    */
545   g_object_class_install_property (gobject_class, PROP_THROTTLE_TIME,
546       g_param_spec_uint64 ("throttle-time", "Throttle time",
547           "The time to keep between rendered buffers (unused)", 0, G_MAXUINT64,
548           DEFAULT_THROTTLE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
549
550   gstelement_class->change_state =
551       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
552   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
553   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
554   gstelement_class->get_query_types =
555       GST_DEBUG_FUNCPTR (gst_base_sink_get_query_types);
556
557   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
558   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
559   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
560   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
561   klass->activate_pull =
562       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
563
564   /* Registering debug symbols for function pointers */
565   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_getcaps);
566   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_setcaps);
567   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_fixate);
568   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_buffer_alloc);
569   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate);
570   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_push);
571   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_pull);
572   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_event);
573   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain);
574   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain_list);
575 }
576
577 static GstCaps *
578 gst_base_sink_pad_getcaps (GstPad * pad)
579 {
580   GstBaseSinkClass *bclass;
581   GstBaseSink *bsink;
582   GstCaps *caps = NULL;
583
584   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
585   bclass = GST_BASE_SINK_GET_CLASS (bsink);
586
587   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
588     /* if we are operating in pull mode we only accept the negotiated caps */
589     GST_OBJECT_LOCK (pad);
590     if ((caps = GST_PAD_CAPS (pad)))
591       gst_caps_ref (caps);
592     GST_OBJECT_UNLOCK (pad);
593   }
594   if (caps == NULL) {
595     if (bclass->get_caps)
596       caps = bclass->get_caps (bsink);
597
598     if (caps == NULL) {
599       GstPadTemplate *pad_template;
600
601       pad_template =
602           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
603           "sink");
604       if (pad_template != NULL) {
605         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
606       }
607     }
608   }
609   gst_object_unref (bsink);
610
611   return caps;
612 }
613
614 static gboolean
615 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
616 {
617   GstBaseSinkClass *bclass;
618   GstBaseSink *bsink;
619   gboolean res = TRUE;
620
621   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
622   bclass = GST_BASE_SINK_GET_CLASS (bsink);
623
624   if (res && bclass->set_caps)
625     res = bclass->set_caps (bsink, caps);
626
627   gst_object_unref (bsink);
628
629   return res;
630 }
631
632 static void
633 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
634 {
635   GstBaseSinkClass *bclass;
636   GstBaseSink *bsink;
637
638   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
639   bclass = GST_BASE_SINK_GET_CLASS (bsink);
640
641   if (bclass->fixate)
642     bclass->fixate (bsink, caps);
643
644   gst_object_unref (bsink);
645 }
646
647 static GstFlowReturn
648 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
649     GstCaps * caps, GstBuffer ** buf)
650 {
651   GstBaseSinkClass *bclass;
652   GstBaseSink *bsink;
653   GstFlowReturn result = GST_FLOW_OK;
654
655   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
656   if (G_UNLIKELY (bsink == NULL))
657     return GST_FLOW_WRONG_STATE;
658   bclass = GST_BASE_SINK_GET_CLASS (bsink);
659
660   if (bclass->buffer_alloc)
661     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
662   else
663     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
664
665   gst_object_unref (bsink);
666
667   return result;
668 }
669
670 static void
671 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
672 {
673   GstPadTemplate *pad_template;
674   GstBaseSinkPrivate *priv;
675
676   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
677
678   pad_template =
679       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
680   g_return_if_fail (pad_template != NULL);
681
682   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
683
684   gst_pad_set_getcaps_function (basesink->sinkpad, gst_base_sink_pad_getcaps);
685   gst_pad_set_setcaps_function (basesink->sinkpad, gst_base_sink_pad_setcaps);
686   gst_pad_set_fixatecaps_function (basesink->sinkpad, gst_base_sink_pad_fixate);
687   gst_pad_set_bufferalloc_function (basesink->sinkpad,
688       gst_base_sink_pad_buffer_alloc);
689   gst_pad_set_activate_function (basesink->sinkpad, gst_base_sink_pad_activate);
690   gst_pad_set_activatepush_function (basesink->sinkpad,
691       gst_base_sink_pad_activate_push);
692   gst_pad_set_activatepull_function (basesink->sinkpad,
693       gst_base_sink_pad_activate_pull);
694   gst_pad_set_event_function (basesink->sinkpad, gst_base_sink_event);
695   gst_pad_set_chain_function (basesink->sinkpad, gst_base_sink_chain);
696   gst_pad_set_chain_list_function (basesink->sinkpad, gst_base_sink_chain_list);
697   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
698
699   basesink->pad_mode = GST_ACTIVATE_NONE;
700   basesink->preroll_queue = g_queue_new ();
701   basesink->abidata.ABI.clip_segment = gst_segment_new ();
702   priv->have_latency = FALSE;
703
704   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
705   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
706
707   basesink->sync = DEFAULT_SYNC;
708   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
709   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
710   priv->async_enabled = DEFAULT_ASYNC;
711   priv->ts_offset = DEFAULT_TS_OFFSET;
712   priv->render_delay = DEFAULT_RENDER_DELAY;
713   priv->blocksize = DEFAULT_BLOCKSIZE;
714   priv->cached_clock_id = NULL;
715   g_atomic_int_set (&priv->enable_last_buffer, DEFAULT_ENABLE_LAST_BUFFER);
716   priv->throttle_time = DEFAULT_THROTTLE_TIME;
717
718   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
719 }
720
721 static void
722 gst_base_sink_finalize (GObject * object)
723 {
724   GstBaseSink *basesink;
725
726   basesink = GST_BASE_SINK (object);
727
728   g_queue_free (basesink->preroll_queue);
729   gst_segment_free (basesink->abidata.ABI.clip_segment);
730
731   G_OBJECT_CLASS (parent_class)->finalize (object);
732 }
733
734 /**
735  * gst_base_sink_set_sync:
736  * @sink: the sink
737  * @sync: the new sync value.
738  *
739  * Configures @sink to synchronize on the clock or not. When
740  * @sync is FALSE, incomming samples will be played as fast as
741  * possible. If @sync is TRUE, the timestamps of the incomming
742  * buffers will be used to schedule the exact render time of its
743  * contents.
744  *
745  * Since: 0.10.4
746  */
747 void
748 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
749 {
750   g_return_if_fail (GST_IS_BASE_SINK (sink));
751
752   GST_OBJECT_LOCK (sink);
753   sink->sync = sync;
754   GST_OBJECT_UNLOCK (sink);
755 }
756
757 /**
758  * gst_base_sink_get_sync:
759  * @sink: the sink
760  *
761  * Checks if @sink is currently configured to synchronize against the
762  * clock.
763  *
764  * Returns: TRUE if the sink is configured to synchronize against the clock.
765  *
766  * Since: 0.10.4
767  */
768 gboolean
769 gst_base_sink_get_sync (GstBaseSink * sink)
770 {
771   gboolean res;
772
773   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
774
775   GST_OBJECT_LOCK (sink);
776   res = sink->sync;
777   GST_OBJECT_UNLOCK (sink);
778
779   return res;
780 }
781
782 /**
783  * gst_base_sink_set_max_lateness:
784  * @sink: the sink
785  * @max_lateness: the new max lateness value.
786  *
787  * Sets the new max lateness value to @max_lateness. This value is
788  * used to decide if a buffer should be dropped or not based on the
789  * buffer timestamp and the current clock time. A value of -1 means
790  * an unlimited time.
791  *
792  * Since: 0.10.4
793  */
794 void
795 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
796 {
797   g_return_if_fail (GST_IS_BASE_SINK (sink));
798
799   GST_OBJECT_LOCK (sink);
800   sink->abidata.ABI.max_lateness = max_lateness;
801   GST_OBJECT_UNLOCK (sink);
802 }
803
804 /**
805  * gst_base_sink_get_max_lateness:
806  * @sink: the sink
807  *
808  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
809  * more details.
810  *
811  * Returns: The maximum time in nanoseconds that a buffer can be late
812  * before it is dropped and not rendered. A value of -1 means an
813  * unlimited time.
814  *
815  * Since: 0.10.4
816  */
817 gint64
818 gst_base_sink_get_max_lateness (GstBaseSink * sink)
819 {
820   gint64 res;
821
822   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
823
824   GST_OBJECT_LOCK (sink);
825   res = sink->abidata.ABI.max_lateness;
826   GST_OBJECT_UNLOCK (sink);
827
828   return res;
829 }
830
831 /**
832  * gst_base_sink_set_qos_enabled:
833  * @sink: the sink
834  * @enabled: the new qos value.
835  *
836  * Configures @sink to send Quality-of-Service events upstream.
837  *
838  * Since: 0.10.5
839  */
840 void
841 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
842 {
843   g_return_if_fail (GST_IS_BASE_SINK (sink));
844
845   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
846 }
847
848 /**
849  * gst_base_sink_is_qos_enabled:
850  * @sink: the sink
851  *
852  * Checks if @sink is currently configured to send Quality-of-Service events
853  * upstream.
854  *
855  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
856  *
857  * Since: 0.10.5
858  */
859 gboolean
860 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
861 {
862   gboolean res;
863
864   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
865
866   res = g_atomic_int_get (&sink->priv->qos_enabled);
867
868   return res;
869 }
870
871 /**
872  * gst_base_sink_set_async_enabled:
873  * @sink: the sink
874  * @enabled: the new async value.
875  *
876  * Configures @sink to perform all state changes asynchronusly. When async is
877  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
878  * preroll buffer. This feature is usefull if the sink does not synchronize
879  * against the clock or when it is dealing with sparse streams.
880  *
881  * Since: 0.10.15
882  */
883 void
884 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
885 {
886   g_return_if_fail (GST_IS_BASE_SINK (sink));
887
888   GST_PAD_PREROLL_LOCK (sink->sinkpad);
889   g_atomic_int_set (&sink->priv->async_enabled, enabled);
890   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
891   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
892 }
893
894 /**
895  * gst_base_sink_is_async_enabled:
896  * @sink: the sink
897  *
898  * Checks if @sink is currently configured to perform asynchronous state
899  * changes to PAUSED.
900  *
901  * Returns: TRUE if the sink is configured to perform asynchronous state
902  * changes.
903  *
904  * Since: 0.10.15
905  */
906 gboolean
907 gst_base_sink_is_async_enabled (GstBaseSink * sink)
908 {
909   gboolean res;
910
911   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
912
913   res = g_atomic_int_get (&sink->priv->async_enabled);
914
915   return res;
916 }
917
918 /**
919  * gst_base_sink_set_ts_offset:
920  * @sink: the sink
921  * @offset: the new offset
922  *
923  * Adjust the synchronisation of @sink with @offset. A negative value will
924  * render buffers earlier than their timestamp. A positive value will delay
925  * rendering. This function can be used to fix playback of badly timestamped
926  * buffers.
927  *
928  * Since: 0.10.15
929  */
930 void
931 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
932 {
933   g_return_if_fail (GST_IS_BASE_SINK (sink));
934
935   GST_OBJECT_LOCK (sink);
936   sink->priv->ts_offset = offset;
937   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
938   GST_OBJECT_UNLOCK (sink);
939 }
940
941 /**
942  * gst_base_sink_get_ts_offset:
943  * @sink: the sink
944  *
945  * Get the synchronisation offset of @sink.
946  *
947  * Returns: The synchronisation offset.
948  *
949  * Since: 0.10.15
950  */
951 GstClockTimeDiff
952 gst_base_sink_get_ts_offset (GstBaseSink * sink)
953 {
954   GstClockTimeDiff res;
955
956   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
957
958   GST_OBJECT_LOCK (sink);
959   res = sink->priv->ts_offset;
960   GST_OBJECT_UNLOCK (sink);
961
962   return res;
963 }
964
965 /**
966  * gst_base_sink_get_last_buffer:
967  * @sink: the sink
968  *
969  * Get the last buffer that arrived in the sink and was used for preroll or for
970  * rendering. This property can be used to generate thumbnails.
971  *
972  * The #GstCaps on the buffer can be used to determine the type of the buffer.
973  *
974  * Free-function: gst_buffer_unref
975  *
976  * Returns: (transfer full): a #GstBuffer. gst_buffer_unref() after usage.
977  *     This function returns NULL when no buffer has arrived in the sink yet
978  *     or when the sink is not in PAUSED or PLAYING.
979  *
980  * Since: 0.10.15
981  */
982 GstBuffer *
983 gst_base_sink_get_last_buffer (GstBaseSink * sink)
984 {
985   GstBuffer *res;
986
987   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
988
989   GST_OBJECT_LOCK (sink);
990   if ((res = sink->priv->last_buffer))
991     gst_buffer_ref (res);
992   GST_OBJECT_UNLOCK (sink);
993
994   return res;
995 }
996
997 /* with OBJECT_LOCK */
998 static void
999 gst_base_sink_set_last_buffer_unlocked (GstBaseSink * sink, GstBuffer * buffer)
1000 {
1001   GstBuffer *old;
1002
1003   old = sink->priv->last_buffer;
1004   if (G_LIKELY (old != buffer)) {
1005     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
1006     if (G_LIKELY (buffer))
1007       gst_buffer_ref (buffer);
1008     sink->priv->last_buffer = buffer;
1009   } else {
1010     old = NULL;
1011   }
1012   /* avoid unreffing with the lock because cleanup code might want to take the
1013    * lock too */
1014   if (G_LIKELY (old)) {
1015     GST_OBJECT_UNLOCK (sink);
1016     gst_buffer_unref (old);
1017     GST_OBJECT_LOCK (sink);
1018   }
1019 }
1020
1021 static void
1022 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
1023 {
1024   if (!g_atomic_int_get (&sink->priv->enable_last_buffer))
1025     return;
1026
1027   GST_OBJECT_LOCK (sink);
1028   gst_base_sink_set_last_buffer_unlocked (sink, buffer);
1029   GST_OBJECT_UNLOCK (sink);
1030 }
1031
1032 /**
1033  * gst_base_sink_set_last_buffer_enabled:
1034  * @sink: the sink
1035  * @enabled: the new enable-last-buffer value.
1036  *
1037  * Configures @sink to store the last received buffer in the last-buffer
1038  * property.
1039  *
1040  * Since: 0.10.30
1041  */
1042 void
1043 gst_base_sink_set_last_buffer_enabled (GstBaseSink * sink, gboolean enabled)
1044 {
1045   g_return_if_fail (GST_IS_BASE_SINK (sink));
1046
1047   /* Only take lock if we change the value */
1048   if (g_atomic_int_compare_and_exchange (&sink->priv->enable_last_buffer,
1049           !enabled, enabled) && !enabled) {
1050     GST_OBJECT_LOCK (sink);
1051     gst_base_sink_set_last_buffer_unlocked (sink, NULL);
1052     GST_OBJECT_UNLOCK (sink);
1053   }
1054 }
1055
1056 /**
1057  * gst_base_sink_is_last_buffer_enabled:
1058  * @sink: the sink
1059  *
1060  * Checks if @sink is currently configured to store the last received buffer in
1061  * the last-buffer property.
1062  *
1063  * Returns: TRUE if the sink is configured to store the last received buffer.
1064  *
1065  * Since: 0.10.30
1066  */
1067 gboolean
1068 gst_base_sink_is_last_buffer_enabled (GstBaseSink * sink)
1069 {
1070   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
1071
1072   return g_atomic_int_get (&sink->priv->enable_last_buffer);
1073 }
1074
1075 /**
1076  * gst_base_sink_get_latency:
1077  * @sink: the sink
1078  *
1079  * Get the currently configured latency.
1080  *
1081  * Returns: The configured latency.
1082  *
1083  * Since: 0.10.12
1084  */
1085 GstClockTime
1086 gst_base_sink_get_latency (GstBaseSink * sink)
1087 {
1088   GstClockTime res;
1089
1090   GST_OBJECT_LOCK (sink);
1091   res = sink->priv->latency;
1092   GST_OBJECT_UNLOCK (sink);
1093
1094   return res;
1095 }
1096
1097 /**
1098  * gst_base_sink_query_latency:
1099  * @sink: the sink
1100  * @live: (out) (allow-none): if the sink is live
1101  * @upstream_live: (out) (allow-none): if an upstream element is live
1102  * @min_latency: (out) (allow-none): the min latency of the upstream elements
1103  * @max_latency: (out) (allow-none): the max latency of the upstream elements
1104  *
1105  * Query the sink for the latency parameters. The latency will be queried from
1106  * the upstream elements. @live will be TRUE if @sink is configured to
1107  * synchronize against the clock. @upstream_live will be TRUE if an upstream
1108  * element is live.
1109  *
1110  * If both @live and @upstream_live are TRUE, the sink will want to compensate
1111  * for the latency introduced by the upstream elements by setting the
1112  * @min_latency to a strictly possitive value.
1113  *
1114  * This function is mostly used by subclasses.
1115  *
1116  * Returns: TRUE if the query succeeded.
1117  *
1118  * Since: 0.10.12
1119  */
1120 gboolean
1121 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
1122     gboolean * upstream_live, GstClockTime * min_latency,
1123     GstClockTime * max_latency)
1124 {
1125   gboolean l, us_live, res, have_latency;
1126   GstClockTime min, max, render_delay;
1127   GstQuery *query;
1128   GstClockTime us_min, us_max;
1129
1130   /* we are live when we sync to the clock */
1131   GST_OBJECT_LOCK (sink);
1132   l = sink->sync;
1133   have_latency = sink->priv->have_latency;
1134   render_delay = sink->priv->render_delay;
1135   GST_OBJECT_UNLOCK (sink);
1136
1137   /* assume no latency */
1138   min = 0;
1139   max = -1;
1140   us_live = FALSE;
1141
1142   if (have_latency) {
1143     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1144     /* we are ready for a latency query this is when we preroll or when we are
1145      * not async. */
1146     query = gst_query_new_latency ();
1147
1148     /* ask the peer for the latency */
1149     if ((res = gst_pad_peer_query (sink->sinkpad, query))) {
1150       /* get upstream min and max latency */
1151       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1152
1153       if (us_live) {
1154         /* upstream live, use its latency, subclasses should use these
1155          * values to create the complete latency. */
1156         min = us_min;
1157         max = us_max;
1158       }
1159       if (l) {
1160         /* we need to add the render delay if we are live */
1161         if (min != -1)
1162           min += render_delay;
1163         if (max != -1)
1164           max += render_delay;
1165       }
1166     }
1167     gst_query_unref (query);
1168   } else {
1169     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1170     res = FALSE;
1171   }
1172
1173   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1174   if (!res) {
1175     if (!l) {
1176       res = TRUE;
1177       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1178     } else {
1179       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1180     }
1181   }
1182
1183   if (res) {
1184     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1185         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1186         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1187
1188     if (live)
1189       *live = l;
1190     if (upstream_live)
1191       *upstream_live = us_live;
1192     if (min_latency)
1193       *min_latency = min;
1194     if (max_latency)
1195       *max_latency = max;
1196   }
1197   return res;
1198 }
1199
1200 /**
1201  * gst_base_sink_set_render_delay:
1202  * @sink: a #GstBaseSink
1203  * @delay: the new delay
1204  *
1205  * Set the render delay in @sink to @delay. The render delay is the time
1206  * between actual rendering of a buffer and its synchronisation time. Some
1207  * devices might delay media rendering which can be compensated for with this
1208  * function.
1209  *
1210  * After calling this function, this sink will report additional latency and
1211  * other sinks will adjust their latency to delay the rendering of their media.
1212  *
1213  * This function is usually called by subclasses.
1214  *
1215  * Since: 0.10.21
1216  */
1217 void
1218 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1219 {
1220   GstClockTime old_render_delay;
1221
1222   g_return_if_fail (GST_IS_BASE_SINK (sink));
1223
1224   GST_OBJECT_LOCK (sink);
1225   old_render_delay = sink->priv->render_delay;
1226   sink->priv->render_delay = delay;
1227   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1228       GST_TIME_ARGS (delay));
1229   GST_OBJECT_UNLOCK (sink);
1230
1231   if (delay != old_render_delay) {
1232     GST_DEBUG_OBJECT (sink, "posting latency changed");
1233     gst_element_post_message (GST_ELEMENT_CAST (sink),
1234         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1235   }
1236 }
1237
1238 /**
1239  * gst_base_sink_get_render_delay:
1240  * @sink: a #GstBaseSink
1241  *
1242  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1243  * information about the render delay.
1244  *
1245  * Returns: the render delay of @sink.
1246  *
1247  * Since: 0.10.21
1248  */
1249 GstClockTime
1250 gst_base_sink_get_render_delay (GstBaseSink * sink)
1251 {
1252   GstClockTimeDiff res;
1253
1254   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1255
1256   GST_OBJECT_LOCK (sink);
1257   res = sink->priv->render_delay;
1258   GST_OBJECT_UNLOCK (sink);
1259
1260   return res;
1261 }
1262
1263 /**
1264  * gst_base_sink_set_blocksize:
1265  * @sink: a #GstBaseSink
1266  * @blocksize: the blocksize in bytes
1267  *
1268  * Set the number of bytes that the sink will pull when it is operating in pull
1269  * mode.
1270  *
1271  * Since: 0.10.22
1272  */
1273 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1274 void
1275 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1276 {
1277   g_return_if_fail (GST_IS_BASE_SINK (sink));
1278
1279   GST_OBJECT_LOCK (sink);
1280   sink->priv->blocksize = blocksize;
1281   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1282   GST_OBJECT_UNLOCK (sink);
1283 }
1284
1285 /**
1286  * gst_base_sink_get_blocksize:
1287  * @sink: a #GstBaseSink
1288  *
1289  * Get the number of bytes that the sink will pull when it is operating in pull
1290  * mode.
1291  *
1292  * Returns: the number of bytes @sink will pull in pull mode.
1293  *
1294  * Since: 0.10.22
1295  */
1296 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1297 guint
1298 gst_base_sink_get_blocksize (GstBaseSink * sink)
1299 {
1300   guint res;
1301
1302   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1303
1304   GST_OBJECT_LOCK (sink);
1305   res = sink->priv->blocksize;
1306   GST_OBJECT_UNLOCK (sink);
1307
1308   return res;
1309 }
1310
1311 /**
1312  * gst_base_sink_set_throttle_time:
1313  * @sink: a #GstBaseSink
1314  * @throttle: the throttle time in nanoseconds
1315  *
1316  * Set the time that will be inserted between rendered buffers. This
1317  * can be used to control the maximum buffers per second that the sink
1318  * will render. 
1319  *
1320  * Since: 0.10.33
1321  */
1322 void
1323 gst_base_sink_set_throttle_time (GstBaseSink * sink, guint64 throttle)
1324 {
1325   g_return_if_fail (GST_IS_BASE_SINK (sink));
1326
1327   GST_OBJECT_LOCK (sink);
1328   sink->priv->throttle_time = throttle;
1329   GST_LOG_OBJECT (sink, "set throttle_time to %" G_GUINT64_FORMAT, throttle);
1330   GST_OBJECT_UNLOCK (sink);
1331 }
1332
1333 /**
1334  * gst_base_sink_get_throttle_time:
1335  * @sink: a #GstBaseSink
1336  *
1337  * Get the time that will be inserted between frames to control the 
1338  * maximum buffers per second.
1339  *
1340  * Returns: the number of nanoseconds @sink will put between frames.
1341  *
1342  * Since: 0.10.33
1343  */
1344 guint64
1345 gst_base_sink_get_throttle_time (GstBaseSink * sink)
1346 {
1347   guint64 res;
1348
1349   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1350
1351   GST_OBJECT_LOCK (sink);
1352   res = sink->priv->throttle_time;
1353   GST_OBJECT_UNLOCK (sink);
1354
1355   return res;
1356 }
1357
1358 static void
1359 gst_base_sink_set_property (GObject * object, guint prop_id,
1360     const GValue * value, GParamSpec * pspec)
1361 {
1362   GstBaseSink *sink = GST_BASE_SINK (object);
1363
1364   switch (prop_id) {
1365     case PROP_PREROLL_QUEUE_LEN:
1366       /* preroll lock necessary to serialize with finish_preroll */
1367       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1368       g_atomic_int_set (&sink->preroll_queue_max_len, g_value_get_uint (value));
1369       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1370       break;
1371     case PROP_SYNC:
1372       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1373       break;
1374     case PROP_MAX_LATENESS:
1375       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1376       break;
1377     case PROP_QOS:
1378       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1379       break;
1380     case PROP_ASYNC:
1381       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1382       break;
1383     case PROP_TS_OFFSET:
1384       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1385       break;
1386     case PROP_BLOCKSIZE:
1387       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1388       break;
1389     case PROP_RENDER_DELAY:
1390       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1391       break;
1392     case PROP_ENABLE_LAST_BUFFER:
1393       gst_base_sink_set_last_buffer_enabled (sink, g_value_get_boolean (value));
1394       break;
1395     case PROP_THROTTLE_TIME:
1396       gst_base_sink_set_throttle_time (sink, g_value_get_uint64 (value));
1397       break;
1398     default:
1399       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1400       break;
1401   }
1402 }
1403
1404 static void
1405 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1406     GParamSpec * pspec)
1407 {
1408   GstBaseSink *sink = GST_BASE_SINK (object);
1409
1410   switch (prop_id) {
1411     case PROP_PREROLL_QUEUE_LEN:
1412       g_value_set_uint (value, g_atomic_int_get (&sink->preroll_queue_max_len));
1413       break;
1414     case PROP_SYNC:
1415       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1416       break;
1417     case PROP_MAX_LATENESS:
1418       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1419       break;
1420     case PROP_QOS:
1421       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1422       break;
1423     case PROP_ASYNC:
1424       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1425       break;
1426     case PROP_TS_OFFSET:
1427       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1428       break;
1429     case PROP_LAST_BUFFER:
1430       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1431       break;
1432     case PROP_ENABLE_LAST_BUFFER:
1433       g_value_set_boolean (value, gst_base_sink_is_last_buffer_enabled (sink));
1434       break;
1435     case PROP_BLOCKSIZE:
1436       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1437       break;
1438     case PROP_RENDER_DELAY:
1439       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1440       break;
1441     case PROP_THROTTLE_TIME:
1442       g_value_set_uint64 (value, gst_base_sink_get_throttle_time (sink));
1443       break;
1444     default:
1445       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1446       break;
1447   }
1448 }
1449
1450
1451 static GstCaps *
1452 gst_base_sink_get_caps (GstBaseSink * sink)
1453 {
1454   return NULL;
1455 }
1456
1457 static gboolean
1458 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1459 {
1460   return TRUE;
1461 }
1462
1463 static GstFlowReturn
1464 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1465     GstCaps * caps, GstBuffer ** buf)
1466 {
1467   *buf = NULL;
1468   return GST_FLOW_OK;
1469 }
1470
1471 /* with PREROLL_LOCK, STREAM_LOCK */
1472 static void
1473 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1474 {
1475   GstMiniObject *obj;
1476
1477   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1478   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1479     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1480     gst_mini_object_unref (obj);
1481   }
1482   /* we can't have EOS anymore now */
1483   basesink->eos = FALSE;
1484   basesink->priv->received_eos = FALSE;
1485   basesink->have_preroll = FALSE;
1486   basesink->priv->step_unlock = FALSE;
1487   basesink->eos_queued = FALSE;
1488   basesink->preroll_queued = 0;
1489   basesink->buffers_queued = 0;
1490   basesink->events_queued = 0;
1491   /* can't report latency anymore until we preroll again */
1492   if (basesink->priv->async_enabled) {
1493     GST_OBJECT_LOCK (basesink);
1494     basesink->priv->have_latency = FALSE;
1495     GST_OBJECT_UNLOCK (basesink);
1496   }
1497   /* and signal any waiters now */
1498   GST_PAD_PREROLL_SIGNAL (pad);
1499 }
1500
1501 /* with STREAM_LOCK, configures given segment with the event information. */
1502 static void
1503 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1504     GstEvent * event, GstSegment * segment)
1505 {
1506   gboolean update;
1507   gdouble rate, arate;
1508   GstFormat format;
1509   gint64 start;
1510   gint64 stop;
1511   gint64 time;
1512
1513   /* the newsegment event is needed to bring the buffer timestamps to the
1514    * stream time and to drop samples outside of the playback segment. */
1515   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1516       &start, &stop, &time);
1517
1518   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1519    * We protect with the OBJECT_LOCK so that we can use the values to
1520    * safely answer a POSITION query. */
1521   GST_OBJECT_LOCK (basesink);
1522   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1523       stop, time);
1524
1525   if (format == GST_FORMAT_TIME) {
1526     GST_DEBUG_OBJECT (basesink,
1527         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1528         "format GST_FORMAT_TIME, "
1529         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1530         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1531         update, rate, arate, GST_TIME_ARGS (segment->start),
1532         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1533         GST_TIME_ARGS (segment->accum));
1534   } else {
1535     GST_DEBUG_OBJECT (basesink,
1536         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1537         "format %d, "
1538         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1539         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1540         segment->format, segment->start, segment->stop, segment->time,
1541         segment->accum);
1542   }
1543   GST_OBJECT_UNLOCK (basesink);
1544 }
1545
1546 /* with PREROLL_LOCK, STREAM_LOCK */
1547 static gboolean
1548 gst_base_sink_commit_state (GstBaseSink * basesink)
1549 {
1550   /* commit state and proceed to next pending state */
1551   GstState current, next, pending, post_pending;
1552   gboolean post_paused = FALSE;
1553   gboolean post_async_done = FALSE;
1554   gboolean post_playing = FALSE;
1555
1556   /* we are certainly not playing async anymore now */
1557   basesink->playing_async = FALSE;
1558
1559   GST_OBJECT_LOCK (basesink);
1560   current = GST_STATE (basesink);
1561   next = GST_STATE_NEXT (basesink);
1562   pending = GST_STATE_PENDING (basesink);
1563   post_pending = pending;
1564
1565   switch (pending) {
1566     case GST_STATE_PLAYING:
1567     {
1568       GstBaseSinkClass *bclass;
1569       GstStateChangeReturn ret;
1570
1571       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1572
1573       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1574
1575       basesink->need_preroll = FALSE;
1576       post_async_done = TRUE;
1577       basesink->priv->commited = TRUE;
1578       post_playing = TRUE;
1579       /* post PAUSED too when we were READY */
1580       if (current == GST_STATE_READY) {
1581         post_paused = TRUE;
1582       }
1583
1584       /* make sure we notify the subclass of async playing */
1585       if (bclass->async_play) {
1586         GST_WARNING_OBJECT (basesink, "deprecated async_play");
1587         ret = bclass->async_play (basesink);
1588         if (ret == GST_STATE_CHANGE_FAILURE)
1589           goto async_failed;
1590       }
1591       break;
1592     }
1593     case GST_STATE_PAUSED:
1594       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1595       post_paused = TRUE;
1596       post_async_done = TRUE;
1597       basesink->priv->commited = TRUE;
1598       post_pending = GST_STATE_VOID_PENDING;
1599       break;
1600     case GST_STATE_READY:
1601     case GST_STATE_NULL:
1602       goto stopping;
1603     case GST_STATE_VOID_PENDING:
1604       goto nothing_pending;
1605     default:
1606       break;
1607   }
1608
1609   /* we can report latency queries now */
1610   basesink->priv->have_latency = TRUE;
1611
1612   GST_STATE (basesink) = pending;
1613   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1614   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1615   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1616   GST_OBJECT_UNLOCK (basesink);
1617
1618   if (post_paused) {
1619     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1620     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1621         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1622             current, next, post_pending));
1623   }
1624   if (post_async_done) {
1625     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1626     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1627         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1628   }
1629   if (post_playing) {
1630     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1631     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1632         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1633             next, pending, GST_STATE_VOID_PENDING));
1634   }
1635
1636   GST_STATE_BROADCAST (basesink);
1637
1638   return TRUE;
1639
1640 nothing_pending:
1641   {
1642     /* Depending on the state, set our vars. We get in this situation when the
1643      * state change function got a change to update the state vars before the
1644      * streaming thread did. This is fine but we need to make sure that we
1645      * update the need_preroll var since it was TRUE when we got here and might
1646      * become FALSE if we got to PLAYING. */
1647     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1648         gst_element_state_get_name (current));
1649     switch (current) {
1650       case GST_STATE_PLAYING:
1651         basesink->need_preroll = FALSE;
1652         break;
1653       case GST_STATE_PAUSED:
1654         basesink->need_preroll = TRUE;
1655         break;
1656       default:
1657         basesink->need_preroll = FALSE;
1658         basesink->flushing = TRUE;
1659         break;
1660     }
1661     /* we can report latency queries now */
1662     basesink->priv->have_latency = TRUE;
1663     GST_OBJECT_UNLOCK (basesink);
1664     return TRUE;
1665   }
1666 stopping:
1667   {
1668     /* app is going to READY */
1669     GST_DEBUG_OBJECT (basesink, "stopping");
1670     basesink->need_preroll = FALSE;
1671     basesink->flushing = TRUE;
1672     GST_OBJECT_UNLOCK (basesink);
1673     return FALSE;
1674   }
1675 async_failed:
1676   {
1677     GST_DEBUG_OBJECT (basesink, "async commit failed");
1678     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1679     GST_OBJECT_UNLOCK (basesink);
1680     return FALSE;
1681   }
1682 }
1683
1684 static void
1685 start_stepping (GstBaseSink * sink, GstSegment * segment,
1686     GstStepInfo * pending, GstStepInfo * current)
1687 {
1688   gint64 end;
1689   GstMessage *message;
1690
1691   GST_DEBUG_OBJECT (sink, "update pending step");
1692
1693   GST_OBJECT_LOCK (sink);
1694   memcpy (current, pending, sizeof (GstStepInfo));
1695   pending->valid = FALSE;
1696   GST_OBJECT_UNLOCK (sink);
1697
1698   /* post message first */
1699   message =
1700       gst_message_new_step_start (GST_OBJECT (sink), TRUE, current->format,
1701       current->amount, current->rate, current->flush, current->intermediate);
1702   gst_message_set_seqnum (message, current->seqnum);
1703   gst_element_post_message (GST_ELEMENT (sink), message);
1704
1705   /* get the running time of where we paused and remember it */
1706   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1707   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1708
1709   /* set the new rate for the remainder of the segment */
1710   current->start_rate = segment->rate;
1711   segment->rate *= current->rate;
1712   segment->abs_rate = ABS (segment->rate);
1713
1714   /* save values */
1715   if (segment->rate > 0.0)
1716     current->start_stop = segment->stop;
1717   else
1718     current->start_start = segment->start;
1719
1720   if (current->format == GST_FORMAT_TIME) {
1721     end = current->start + current->amount;
1722     if (!current->flush) {
1723       /* update the segment clipping regions for non-flushing seeks */
1724       if (segment->rate > 0.0) {
1725         segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1726         segment->last_stop = segment->stop;
1727       } else {
1728         gint64 position;
1729
1730         position = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1731         segment->time = position;
1732         segment->start = position;
1733         segment->last_stop = position;
1734       }
1735     }
1736   }
1737
1738   GST_DEBUG_OBJECT (sink,
1739       "segment now rate %lf, applied rate %lf, "
1740       "format GST_FORMAT_TIME, "
1741       "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1742       ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1743       segment->rate, segment->applied_rate, GST_TIME_ARGS (segment->start),
1744       GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1745       GST_TIME_ARGS (segment->accum));
1746
1747   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1748       GST_TIME_ARGS (current->start));
1749
1750   if (current->amount == -1) {
1751     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1752     current->valid = FALSE;
1753   } else {
1754     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1755         "rate: %f", current->amount, gst_format_get_name (current->format),
1756         current->rate);
1757   }
1758 }
1759
1760 static void
1761 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1762     GstStepInfo * current, gint64 rstart, gint64 rstop, gboolean eos)
1763 {
1764   gint64 stop, position;
1765   GstMessage *message;
1766
1767   GST_DEBUG_OBJECT (sink, "step complete");
1768
1769   if (segment->rate > 0.0)
1770     stop = rstart;
1771   else
1772     stop = rstop;
1773
1774   GST_DEBUG_OBJECT (sink,
1775       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1776
1777   if (stop == -1)
1778     current->duration = current->position;
1779   else
1780     current->duration = stop - current->start;
1781
1782   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1783       GST_TIME_ARGS (current->duration));
1784
1785   position = current->start + current->duration;
1786
1787   /* now move the segment to the new running time */
1788   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1789
1790   if (current->flush) {
1791     /* and remove the accumulated time we flushed, start time did not change */
1792     segment->accum = current->start;
1793   } else {
1794     /* start time is now the stepped position */
1795     gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1796   }
1797
1798   /* restore the previous rate */
1799   segment->rate = current->start_rate;
1800   segment->abs_rate = ABS (segment->rate);
1801
1802   if (segment->rate > 0.0)
1803     segment->stop = current->start_stop;
1804   else
1805     segment->start = current->start_start;
1806
1807   /* the clip segment is used for position report in paused... */
1808   memcpy (sink->abidata.ABI.clip_segment, segment, sizeof (GstSegment));
1809
1810   /* post the step done when we know the stepped duration in TIME */
1811   message =
1812       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1813       current->amount, current->rate, current->flush, current->intermediate,
1814       current->duration, eos);
1815   gst_message_set_seqnum (message, current->seqnum);
1816   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1817
1818   if (!current->intermediate)
1819     sink->need_preroll = current->need_preroll;
1820
1821   /* and the current step info finished and becomes invalid */
1822   current->valid = FALSE;
1823 }
1824
1825 static gboolean
1826 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1827     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1828     gint64 * rstop)
1829 {
1830   gboolean step_end = FALSE;
1831
1832   /* see if we need to skip this buffer because of stepping */
1833   switch (current->format) {
1834     case GST_FORMAT_TIME:
1835     {
1836       guint64 end;
1837       gint64 first, last;
1838
1839       if (segment->rate > 0.0) {
1840         if (segment->stop == *cstop)
1841           *rstop = *rstart + current->amount;
1842
1843         first = *rstart;
1844         last = *rstop;
1845       } else {
1846         if (segment->start == *cstart)
1847           *rstart = *rstop + current->amount;
1848
1849         first = *rstop;
1850         last = *rstart;
1851       }
1852
1853       end = current->start + current->amount;
1854       current->position = first - current->start;
1855
1856       if (G_UNLIKELY (segment->abs_rate != 1.0))
1857         current->position /= segment->abs_rate;
1858
1859       GST_DEBUG_OBJECT (sink,
1860           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1861           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1862       GST_DEBUG_OBJECT (sink,
1863           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1864           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1865           GST_TIME_ARGS (last - current->start),
1866           GST_TIME_ARGS (current->amount));
1867
1868       if ((current->flush && current->position >= current->amount)
1869           || last >= end) {
1870         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1871         step_end = TRUE;
1872         if (segment->rate > 0.0) {
1873           *rstart = end;
1874           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1875         } else {
1876           *rstop = end;
1877           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1878         }
1879       }
1880       GST_DEBUG_OBJECT (sink,
1881           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1882           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1883       GST_DEBUG_OBJECT (sink,
1884           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1885           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1886       break;
1887     }
1888     case GST_FORMAT_BUFFERS:
1889       GST_DEBUG_OBJECT (sink,
1890           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1891           current->position, current->amount);
1892
1893       if (current->position < current->amount) {
1894         current->position++;
1895       } else {
1896         step_end = TRUE;
1897       }
1898       break;
1899     case GST_FORMAT_DEFAULT:
1900     default:
1901       GST_DEBUG_OBJECT (sink,
1902           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1903           current->position, current->amount);
1904       break;
1905   }
1906   return step_end;
1907 }
1908
1909 /* with STREAM_LOCK, PREROLL_LOCK
1910  *
1911  * Returns TRUE if the object needs synchronisation and takes therefore
1912  * part in prerolling.
1913  *
1914  * rsstart/rsstop contain the start/stop in stream time.
1915  * rrstart/rrstop contain the start/stop in running time.
1916  */
1917 static gboolean
1918 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1919     GstClockTime * rsstart, GstClockTime * rsstop,
1920     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1921     gboolean * stepped, GstSegment * segment, GstStepInfo * step,
1922     gboolean * step_end, guint8 obj_type)
1923 {
1924   GstBaseSinkClass *bclass;
1925   GstBuffer *buffer;
1926   GstClockTime start, stop;     /* raw start/stop timestamps */
1927   gint64 cstart, cstop;         /* clipped raw timestamps */
1928   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1929   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1930   GstFormat format;
1931   GstBaseSinkPrivate *priv;
1932   gboolean eos;
1933
1934   priv = basesink->priv;
1935
1936   /* start with nothing */
1937   start = stop = GST_CLOCK_TIME_NONE;
1938
1939   if (G_UNLIKELY (OBJ_IS_EVENT (obj_type))) {
1940     GstEvent *event = GST_EVENT_CAST (obj);
1941
1942     switch (GST_EVENT_TYPE (event)) {
1943         /* EOS event needs syncing */
1944       case GST_EVENT_EOS:
1945       {
1946         if (basesink->segment.rate >= 0.0) {
1947           sstart = sstop = priv->current_sstop;
1948           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1949             /* we have not seen a buffer yet, use the segment values */
1950             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1951                 basesink->segment.format, basesink->segment.stop);
1952           }
1953         } else {
1954           sstart = sstop = priv->current_sstart;
1955           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1956             /* we have not seen a buffer yet, use the segment values */
1957             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1958                 basesink->segment.format, basesink->segment.start);
1959           }
1960         }
1961
1962         rstart = rstop = priv->eos_rtime;
1963         *do_sync = rstart != -1;
1964         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1965             GST_TIME_ARGS (rstart));
1966         /* if we are stepping, we end now */
1967         *step_end = step->valid;
1968         eos = TRUE;
1969         goto eos_done;
1970       }
1971       default:
1972         /* other events do not need syncing */
1973         /* FIXME, maybe NEWSEGMENT might need synchronisation
1974          * since the POSITION query depends on accumulated times and
1975          * we cannot accumulate the current segment before the previous
1976          * one completed.
1977          */
1978         return FALSE;
1979     }
1980   }
1981
1982   eos = FALSE;
1983
1984 again:
1985   /* else do buffer sync code */
1986   buffer = GST_BUFFER_CAST (obj);
1987
1988   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1989
1990   /* just get the times to see if we need syncing, if the start returns -1 we
1991    * don't sync. */
1992   if (bclass->get_times)
1993     bclass->get_times (basesink, buffer, &start, &stop);
1994
1995   if (!GST_CLOCK_TIME_IS_VALID (start)) {
1996     /* we don't need to sync but we still want to get the timestamps for
1997      * tracking the position */
1998     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1999     *do_sync = FALSE;
2000   } else {
2001     *do_sync = TRUE;
2002   }
2003
2004   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
2005       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
2006       GST_TIME_ARGS (stop), *do_sync);
2007
2008   /* collect segment and format for code clarity */
2009   format = segment->format;
2010
2011   /* no timestamp clipping if we did not get a TIME segment format */
2012   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
2013     cstart = start;
2014     cstop = stop;
2015     /* do running and stream time in TIME format */
2016     format = GST_FORMAT_TIME;
2017     GST_LOG_OBJECT (basesink, "not time format, don't clip");
2018     goto do_times;
2019   }
2020
2021   /* clip, only when we know about time */
2022   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
2023               (gint64) start, (gint64) stop, &cstart, &cstop))) {
2024     if (step->valid) {
2025       GST_DEBUG_OBJECT (basesink, "step out of segment");
2026       /* when we are stepping, pretend we're at the end of the segment */
2027       if (segment->rate > 0.0) {
2028         cstart = segment->stop;
2029         cstop = segment->stop;
2030       } else {
2031         cstart = segment->start;
2032         cstop = segment->start;
2033       }
2034       goto do_times;
2035     }
2036     goto out_of_segment;
2037   }
2038
2039   if (G_UNLIKELY (start != cstart || stop != cstop)) {
2040     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
2041         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
2042         GST_TIME_ARGS (cstop));
2043   }
2044
2045   /* set last stop position */
2046   if (G_LIKELY (stop != GST_CLOCK_TIME_NONE && cstop != GST_CLOCK_TIME_NONE))
2047     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
2048   else
2049     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
2050
2051 do_times:
2052   rstart = gst_segment_to_running_time (segment, format, cstart);
2053   rstop = gst_segment_to_running_time (segment, format, cstop);
2054
2055   if (G_UNLIKELY (step->valid)) {
2056     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
2057                 &rstart, &rstop))) {
2058       /* step is still busy, we discard data when we are flushing */
2059       *stepped = step->flush;
2060       GST_DEBUG_OBJECT (basesink, "stepping busy");
2061     }
2062   }
2063   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
2064    * upstream is behaving very badly */
2065   sstart = gst_segment_to_stream_time (segment, format, cstart);
2066   sstop = gst_segment_to_stream_time (segment, format, cstop);
2067
2068 eos_done:
2069   /* eos_done label only called when doing EOS, we also stop stepping then */
2070   if (*step_end && step->flush) {
2071     GST_DEBUG_OBJECT (basesink, "flushing step ended");
2072     stop_stepping (basesink, segment, step, rstart, rstop, eos);
2073     *step_end = FALSE;
2074     /* re-determine running start times for adjusted segment
2075      * (which has a flushed amount of running/accumulated time removed) */
2076     if (!GST_IS_EVENT (obj)) {
2077       GST_DEBUG_OBJECT (basesink, "refresh sync times");
2078       goto again;
2079     }
2080   }
2081
2082   /* save times */
2083   *rsstart = sstart;
2084   *rsstop = sstop;
2085   *rrstart = rstart;
2086   *rrstop = rstop;
2087
2088   /* buffers and EOS always need syncing and preroll */
2089   return TRUE;
2090
2091   /* special cases */
2092 out_of_segment:
2093   {
2094     /* we usually clip in the chain function already but stepping could cause
2095      * the segment to be updated later. we return FALSE so that we don't try
2096      * to sync on it. */
2097     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
2098     return FALSE;
2099   }
2100 }
2101
2102 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
2103  * adjust a timestamp with the latency and timestamp offset. This function does
2104  * not adjust for the render delay. */
2105 static GstClockTime
2106 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
2107 {
2108   GstClockTimeDiff ts_offset;
2109
2110   /* don't do anything funny with invalid timestamps */
2111   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2112     return time;
2113
2114   time += basesink->priv->latency;
2115
2116   /* apply offset, be carefull for underflows */
2117   ts_offset = basesink->priv->ts_offset;
2118   if (ts_offset < 0) {
2119     ts_offset = -ts_offset;
2120     if (ts_offset < time)
2121       time -= ts_offset;
2122     else
2123       time = 0;
2124   } else
2125     time += ts_offset;
2126
2127   /* subtract the render delay again, which was included in the latency */
2128   if (time > basesink->priv->render_delay)
2129     time -= basesink->priv->render_delay;
2130   else
2131     time = 0;
2132
2133   return time;
2134 }
2135
2136 /**
2137  * gst_base_sink_wait_clock:
2138  * @sink: the sink
2139  * @time: the running_time to be reached
2140  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2141  *
2142  * This function will block until @time is reached. It is usually called by
2143  * subclasses that use their own internal synchronisation.
2144  *
2145  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
2146  * returned. Likewise, if synchronisation is disabled in the element or there
2147  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
2148  *
2149  * This function should only be called with the PREROLL_LOCK held, like when
2150  * receiving an EOS event in the #GstBaseSinkClass.event() vmethod or when
2151  * receiving a buffer in
2152  * the #GstBaseSinkClass.render() vmethod.
2153  *
2154  * The @time argument should be the running_time of when this method should
2155  * return and is not adjusted with any latency or offset configured in the
2156  * sink.
2157  *
2158  * Since: 0.10.20
2159  *
2160  * Returns: #GstClockReturn
2161  */
2162 GstClockReturn
2163 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
2164     GstClockTimeDiff * jitter)
2165 {
2166   GstClockReturn ret;
2167   GstClock *clock;
2168   GstClockTime base_time;
2169
2170   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2171     goto invalid_time;
2172
2173   GST_OBJECT_LOCK (sink);
2174   if (G_UNLIKELY (!sink->sync))
2175     goto no_sync;
2176
2177   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
2178     goto no_clock;
2179
2180   base_time = GST_ELEMENT_CAST (sink)->base_time;
2181   GST_LOG_OBJECT (sink,
2182       "time %" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT,
2183       GST_TIME_ARGS (time), GST_TIME_ARGS (base_time));
2184
2185   /* add base_time to running_time to get the time against the clock */
2186   time += base_time;
2187
2188   /* Re-use existing clockid if available */
2189   if (G_LIKELY (sink->priv->cached_clock_id != NULL)) {
2190     if (!gst_clock_single_shot_id_reinit (clock, sink->priv->cached_clock_id,
2191             time)) {
2192       gst_clock_id_unref (sink->priv->cached_clock_id);
2193       sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2194     }
2195   } else
2196     sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2197   GST_OBJECT_UNLOCK (sink);
2198
2199   /* A blocking wait is performed on the clock. We save the ClockID
2200    * so we can unlock the entry at any time. While we are blocking, we
2201    * release the PREROLL_LOCK so that other threads can interrupt the
2202    * entry. */
2203   sink->clock_id = sink->priv->cached_clock_id;
2204   /* release the preroll lock while waiting */
2205   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
2206
2207   ret = gst_clock_id_wait (sink->priv->cached_clock_id, jitter);
2208
2209   GST_PAD_PREROLL_LOCK (sink->sinkpad);
2210   sink->clock_id = NULL;
2211
2212   return ret;
2213
2214   /* no syncing needed */
2215 invalid_time:
2216   {
2217     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
2218     return GST_CLOCK_BADTIME;
2219   }
2220 no_sync:
2221   {
2222     GST_DEBUG_OBJECT (sink, "sync disabled");
2223     GST_OBJECT_UNLOCK (sink);
2224     return GST_CLOCK_BADTIME;
2225   }
2226 no_clock:
2227   {
2228     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
2229     GST_OBJECT_UNLOCK (sink);
2230     return GST_CLOCK_BADTIME;
2231   }
2232 }
2233
2234 /**
2235  * gst_base_sink_wait_preroll:
2236  * @sink: the sink
2237  *
2238  * If the #GstBaseSinkClass.render() method performs its own synchronisation
2239  * against the clock it must unblock when going from PLAYING to the PAUSED state
2240  * and call this method before continuing to render the remaining data.
2241  *
2242  * This function will block until a state change to PLAYING happens (in which
2243  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
2244  * to a state change to READY or a FLUSH event (in which case this function
2245  * returns #GST_FLOW_WRONG_STATE).
2246  *
2247  * This function should only be called with the PREROLL_LOCK held, like in the
2248  * render function.
2249  *
2250  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2251  * continue. Any other return value should be returned from the render vmethod.
2252  *
2253  * Since: 0.10.11
2254  */
2255 GstFlowReturn
2256 gst_base_sink_wait_preroll (GstBaseSink * sink)
2257 {
2258   sink->have_preroll = TRUE;
2259   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
2260   /* block until the state changes, or we get a flush, or something */
2261   GST_PAD_PREROLL_WAIT (sink->sinkpad);
2262   sink->have_preroll = FALSE;
2263   if (G_UNLIKELY (sink->flushing))
2264     goto stopping;
2265   if (G_UNLIKELY (sink->priv->step_unlock))
2266     goto step_unlocked;
2267   GST_DEBUG_OBJECT (sink, "continue after preroll");
2268
2269   return GST_FLOW_OK;
2270
2271   /* ERRORS */
2272 stopping:
2273   {
2274     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
2275     return GST_FLOW_WRONG_STATE;
2276   }
2277 step_unlocked:
2278   {
2279     sink->priv->step_unlock = FALSE;
2280     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
2281     return GST_FLOW_STEP;
2282   }
2283 }
2284
2285 static inline guint8
2286 get_object_type (GstMiniObject * obj)
2287 {
2288   guint8 obj_type;
2289
2290   if (G_LIKELY (GST_IS_BUFFER (obj)))
2291     obj_type = _PR_IS_BUFFER;
2292   else if (GST_IS_EVENT (obj))
2293     obj_type = _PR_IS_EVENT;
2294   else if (GST_IS_BUFFER_LIST (obj))
2295     obj_type = _PR_IS_BUFFERLIST;
2296   else
2297     obj_type = _PR_IS_NOTHING;
2298
2299   return obj_type;
2300 }
2301
2302 /**
2303  * gst_base_sink_do_preroll:
2304  * @sink: the sink
2305  * @obj: (transfer none): the mini object that caused the preroll
2306  *
2307  * If the @sink spawns its own thread for pulling buffers from upstream it
2308  * should call this method after it has pulled a buffer. If the element needed
2309  * to preroll, this function will perform the preroll and will then block
2310  * until the element state is changed.
2311  *
2312  * This function should be called with the PREROLL_LOCK held.
2313  *
2314  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2315  * continue. Any other return value should be returned from the render vmethod.
2316  *
2317  * Since: 0.10.22
2318  */
2319 GstFlowReturn
2320 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2321 {
2322   GstFlowReturn ret;
2323
2324   while (G_UNLIKELY (sink->need_preroll)) {
2325     guint8 obj_type;
2326     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2327
2328     obj_type = get_object_type (obj);
2329
2330     ret = gst_base_sink_preroll_object (sink, obj_type, obj);
2331     if (ret != GST_FLOW_OK)
2332       goto preroll_failed;
2333
2334     /* need to recheck here because the commit state could have
2335      * made us not need the preroll anymore */
2336     if (G_LIKELY (sink->need_preroll)) {
2337       /* block until the state changes, or we get a flush, or something */
2338       ret = gst_base_sink_wait_preroll (sink);
2339       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2340         goto preroll_failed;
2341     }
2342   }
2343   return GST_FLOW_OK;
2344
2345   /* ERRORS */
2346 preroll_failed:
2347   {
2348     GST_DEBUG_OBJECT (sink, "preroll failed: %s", gst_flow_get_name (ret));
2349     return ret;
2350   }
2351 }
2352
2353 /**
2354  * gst_base_sink_wait_eos:
2355  * @sink: the sink
2356  * @time: the running_time to be reached
2357  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2358  *
2359  * This function will block until @time is reached. It is usually called by
2360  * subclasses that use their own internal synchronisation but want to let the
2361  * EOS be handled by the base class.
2362  *
2363  * This function should only be called with the PREROLL_LOCK held, like when
2364  * receiving an EOS event in the ::event vmethod.
2365  *
2366  * The @time argument should be the running_time of when the EOS should happen
2367  * and will be adjusted with any latency and offset configured in the sink.
2368  *
2369  * Returns: #GstFlowReturn
2370  *
2371  * Since: 0.10.15
2372  */
2373 GstFlowReturn
2374 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2375     GstClockTimeDiff * jitter)
2376 {
2377   GstClockReturn status;
2378   GstFlowReturn ret;
2379
2380   do {
2381     GstClockTime stime;
2382
2383     GST_DEBUG_OBJECT (sink, "checking preroll");
2384
2385     /* first wait for the playing state before we can continue */
2386     while (G_UNLIKELY (sink->need_preroll)) {
2387       ret = gst_base_sink_wait_preroll (sink);
2388       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2389         goto flushing;
2390     }
2391
2392     /* preroll done, we can sync since we are in PLAYING now. */
2393     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2394         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2395
2396     /* compensate for latency and ts_offset. We don't adjust for render delay
2397      * because we don't interact with the device on EOS normally. */
2398     stime = gst_base_sink_adjust_time (sink, time);
2399
2400     /* wait for the clock, this can be interrupted because we got shut down or
2401      * we PAUSED. */
2402     status = gst_base_sink_wait_clock (sink, stime, jitter);
2403
2404     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2405
2406     /* invalid time, no clock or sync disabled, just continue then */
2407     if (status == GST_CLOCK_BADTIME)
2408       break;
2409
2410     /* waiting could have been interrupted and we can be flushing now */
2411     if (G_UNLIKELY (sink->flushing))
2412       goto flushing;
2413
2414     /* retry if we got unscheduled, which means we did not reach the timeout
2415      * yet. if some other error occures, we continue. */
2416   } while (status == GST_CLOCK_UNSCHEDULED);
2417
2418   GST_DEBUG_OBJECT (sink, "end of stream");
2419
2420   return GST_FLOW_OK;
2421
2422   /* ERRORS */
2423 flushing:
2424   {
2425     GST_DEBUG_OBJECT (sink, "we are flushing");
2426     return GST_FLOW_WRONG_STATE;
2427   }
2428 }
2429
2430 /* with STREAM_LOCK, PREROLL_LOCK
2431  *
2432  * Make sure we are in PLAYING and synchronize an object to the clock.
2433  *
2434  * If we need preroll, we are not in PLAYING. We try to commit the state
2435  * if needed and then block if we still are not PLAYING.
2436  *
2437  * We start waiting on the clock in PLAYING. If we got interrupted, we
2438  * immediatly try to re-preroll.
2439  *
2440  * Some objects do not need synchronisation (most events) and so this function
2441  * immediatly returns GST_FLOW_OK.
2442  *
2443  * for objects that arrive later than max-lateness to be synchronized to the
2444  * clock have the @late boolean set to TRUE.
2445  *
2446  * This function keeps a running average of the jitter (the diff between the
2447  * clock time and the requested sync time). The jitter is negative for
2448  * objects that arrive in time and positive for late buffers.
2449  *
2450  * does not take ownership of obj.
2451  */
2452 static GstFlowReturn
2453 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2454     GstMiniObject * obj, gboolean * late, gboolean * step_end, guint8 obj_type)
2455 {
2456   GstClockTimeDiff jitter = 0;
2457   gboolean syncable;
2458   GstClockReturn status = GST_CLOCK_OK;
2459   GstClockTime rstart, rstop, sstart, sstop, stime;
2460   gboolean do_sync;
2461   GstBaseSinkPrivate *priv;
2462   GstFlowReturn ret;
2463   GstStepInfo *current, *pending;
2464   gboolean stepped;
2465
2466   priv = basesink->priv;
2467
2468 do_step:
2469   sstart = sstop = rstart = rstop = GST_CLOCK_TIME_NONE;
2470   do_sync = TRUE;
2471   stepped = FALSE;
2472
2473   priv->current_rstart = GST_CLOCK_TIME_NONE;
2474
2475   /* get stepping info */
2476   current = &priv->current_step;
2477   pending = &priv->pending_step;
2478
2479   /* get timing information for this object against the render segment */
2480   syncable = gst_base_sink_get_sync_times (basesink, obj,
2481       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2482       current, step_end, obj_type);
2483
2484   if (G_UNLIKELY (stepped))
2485     goto step_skipped;
2486
2487   /* a syncable object needs to participate in preroll and
2488    * clocking. All buffers and EOS are syncable. */
2489   if (G_UNLIKELY (!syncable))
2490     goto not_syncable;
2491
2492   /* store timing info for current object */
2493   priv->current_rstart = rstart;
2494   priv->current_rstop = (GST_CLOCK_TIME_IS_VALID (rstop) ? rstop : rstart);
2495
2496   /* save sync time for eos when the previous object needed sync */
2497   priv->eos_rtime = (do_sync ? priv->current_rstop : GST_CLOCK_TIME_NONE);
2498
2499   /* calculate inter frame spacing */
2500   if (G_UNLIKELY (priv->prev_rstart != -1 && priv->prev_rstart < rstart)) {
2501     GstClockTime in_diff;
2502
2503     in_diff = rstart - priv->prev_rstart;
2504
2505     if (priv->avg_in_diff == -1)
2506       priv->avg_in_diff = in_diff;
2507     else
2508       priv->avg_in_diff = UPDATE_RUNNING_AVG (priv->avg_in_diff, in_diff);
2509
2510     GST_LOG_OBJECT (basesink, "avg frame diff %" GST_TIME_FORMAT,
2511         GST_TIME_ARGS (priv->avg_in_diff));
2512
2513   }
2514   priv->prev_rstart = rstart;
2515
2516   if (G_UNLIKELY (priv->earliest_in_time != -1
2517           && rstart < priv->earliest_in_time))
2518     goto qos_dropped;
2519
2520 again:
2521   /* first do preroll, this makes sure we commit our state
2522    * to PAUSED and can continue to PLAYING. We cannot perform
2523    * any clock sync in PAUSED because there is no clock. */
2524   ret = gst_base_sink_do_preroll (basesink, obj);
2525   if (G_UNLIKELY (ret != GST_FLOW_OK))
2526     goto preroll_failed;
2527
2528   /* update the segment with a pending step if the current one is invalid and we
2529    * have a new pending one. We only accept new step updates after a preroll */
2530   if (G_UNLIKELY (pending->valid && !current->valid)) {
2531     start_stepping (basesink, &basesink->segment, pending, current);
2532     goto do_step;
2533   }
2534
2535   /* After rendering we store the position of the last buffer so that we can use
2536    * it to report the position. We need to take the lock here. */
2537   GST_OBJECT_LOCK (basesink);
2538   priv->current_sstart = sstart;
2539   priv->current_sstop = (GST_CLOCK_TIME_IS_VALID (sstop) ? sstop : sstart);
2540   GST_OBJECT_UNLOCK (basesink);
2541
2542   if (!do_sync)
2543     goto done;
2544
2545   /* adjust for latency */
2546   stime = gst_base_sink_adjust_time (basesink, rstart);
2547
2548   /* adjust for render-delay, avoid underflows */
2549   if (GST_CLOCK_TIME_IS_VALID (stime)) {
2550     if (stime > priv->render_delay)
2551       stime -= priv->render_delay;
2552     else
2553       stime = 0;
2554   }
2555
2556   /* preroll done, we can sync since we are in PLAYING now. */
2557   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2558       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2559       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2560
2561   /* This function will return immediatly if start == -1, no clock
2562    * or sync is disabled with GST_CLOCK_BADTIME. */
2563   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2564
2565   GST_DEBUG_OBJECT (basesink, "clock returned %d, jitter %c%" GST_TIME_FORMAT,
2566       status, (jitter < 0 ? '-' : ' '), GST_TIME_ARGS (ABS (jitter)));
2567
2568   /* invalid time, no clock or sync disabled, just render */
2569   if (status == GST_CLOCK_BADTIME)
2570     goto done;
2571
2572   /* waiting could have been interrupted and we can be flushing now */
2573   if (G_UNLIKELY (basesink->flushing))
2574     goto flushing;
2575
2576   /* check for unlocked by a state change, we are not flushing so
2577    * we can try to preroll on the current buffer. */
2578   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2579     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2580     priv->call_preroll = TRUE;
2581     goto again;
2582   }
2583
2584   /* successful syncing done, record observation */
2585   priv->current_jitter = jitter;
2586
2587   /* check if the object should be dropped */
2588   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2589       status, jitter);
2590
2591 done:
2592   return GST_FLOW_OK;
2593
2594   /* ERRORS */
2595 step_skipped:
2596   {
2597     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2598     *late = TRUE;
2599     return GST_FLOW_OK;
2600   }
2601 not_syncable:
2602   {
2603     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2604     return GST_FLOW_OK;
2605   }
2606 qos_dropped:
2607   {
2608     GST_DEBUG_OBJECT (basesink, "dropped because of QoS %p", obj);
2609     *late = TRUE;
2610     return GST_FLOW_OK;
2611   }
2612 flushing:
2613   {
2614     GST_DEBUG_OBJECT (basesink, "we are flushing");
2615     return GST_FLOW_WRONG_STATE;
2616   }
2617 preroll_failed:
2618   {
2619     GST_DEBUG_OBJECT (basesink, "preroll failed");
2620     *step_end = FALSE;
2621     return ret;
2622   }
2623 }
2624
2625 static gboolean
2626 gst_base_sink_send_qos (GstBaseSink * basesink, GstQOSType type,
2627     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2628 {
2629   GstEvent *event;
2630   gboolean res;
2631
2632   /* generate Quality-of-Service event */
2633   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2634       "qos: type %d, proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2635       GST_TIME_FORMAT, type, proportion, diff, GST_TIME_ARGS (time));
2636
2637   event = gst_event_new_qos_full (type, proportion, diff, time);
2638
2639   /* send upstream */
2640   res = gst_pad_push_event (basesink->sinkpad, event);
2641
2642   return res;
2643 }
2644
2645 static void
2646 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2647 {
2648   GstBaseSinkPrivate *priv;
2649   GstClockTime start, stop;
2650   GstClockTimeDiff jitter;
2651   GstClockTime pt, entered, left;
2652   GstClockTime duration;
2653   gdouble rate;
2654
2655   priv = sink->priv;
2656
2657   start = priv->current_rstart;
2658
2659   if (priv->current_step.valid)
2660     return;
2661
2662   /* if Quality-of-Service disabled, do nothing */
2663   if (!g_atomic_int_get (&priv->qos_enabled) ||
2664       !GST_CLOCK_TIME_IS_VALID (start))
2665     return;
2666
2667   stop = priv->current_rstop;
2668   jitter = priv->current_jitter;
2669
2670   if (jitter < 0) {
2671     /* this is the time the buffer entered the sink */
2672     if (start < -jitter)
2673       entered = 0;
2674     else
2675       entered = start + jitter;
2676     left = start;
2677   } else {
2678     /* this is the time the buffer entered the sink */
2679     entered = start + jitter;
2680     /* this is the time the buffer left the sink */
2681     left = start + jitter;
2682   }
2683
2684   /* calculate duration of the buffer */
2685   if (GST_CLOCK_TIME_IS_VALID (stop) && stop != start)
2686     duration = stop - start;
2687   else
2688     duration = priv->avg_in_diff;
2689
2690   /* if we have the time when the last buffer left us, calculate
2691    * processing time */
2692   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2693     if (entered > priv->last_left) {
2694       pt = entered - priv->last_left;
2695     } else {
2696       pt = 0;
2697     }
2698   } else {
2699     pt = priv->avg_pt;
2700   }
2701
2702   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2703       ", stop %" GST_TIME_FORMAT ", entered %" GST_TIME_FORMAT ", left %"
2704       GST_TIME_FORMAT ", pt: %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
2705       ",jitter %" G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
2706       GST_TIME_ARGS (entered), GST_TIME_ARGS (left), GST_TIME_ARGS (pt),
2707       GST_TIME_ARGS (duration), jitter);
2708
2709   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2710       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2711       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2712       priv->avg_rate);
2713
2714   /* collect running averages. for first observations, we copy the
2715    * values */
2716   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_duration))
2717     priv->avg_duration = duration;
2718   else
2719     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2720
2721   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_pt))
2722     priv->avg_pt = pt;
2723   else
2724     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2725
2726   if (priv->avg_duration != 0)
2727     rate =
2728         gst_guint64_to_gdouble (priv->avg_pt) /
2729         gst_guint64_to_gdouble (priv->avg_duration);
2730   else
2731     rate = 1.0;
2732
2733   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2734     if (dropped || priv->avg_rate < 0.0) {
2735       priv->avg_rate = rate;
2736     } else {
2737       if (rate > 1.0)
2738         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2739       else
2740         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2741     }
2742   }
2743
2744   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2745       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2746       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2747       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2748
2749
2750   if (priv->avg_rate >= 0.0) {
2751     GstQOSType type;
2752     GstClockTimeDiff diff;
2753
2754     /* if we have a valid rate, start sending QoS messages */
2755     if (priv->current_jitter < 0) {
2756       /* make sure we never go below 0 when adding the jitter to the
2757        * timestamp. */
2758       if (priv->current_rstart < -priv->current_jitter)
2759         priv->current_jitter = -priv->current_rstart;
2760     }
2761
2762     if (priv->throttle_time > 0) {
2763       diff = priv->throttle_time;
2764       type = GST_QOS_TYPE_THROTTLE;
2765     } else {
2766       diff = priv->current_jitter;
2767       if (diff <= 0)
2768         type = GST_QOS_TYPE_OVERFLOW;
2769       else
2770         type = GST_QOS_TYPE_UNDERFLOW;
2771     }
2772
2773     gst_base_sink_send_qos (sink, type, priv->avg_rate, priv->current_rstart,
2774         diff);
2775   }
2776
2777   /* record when this buffer will leave us */
2778   priv->last_left = left;
2779 }
2780
2781 /* reset all qos measuring */
2782 static void
2783 gst_base_sink_reset_qos (GstBaseSink * sink)
2784 {
2785   GstBaseSinkPrivate *priv;
2786
2787   priv = sink->priv;
2788
2789   priv->last_render_time = GST_CLOCK_TIME_NONE;
2790   priv->prev_rstart = GST_CLOCK_TIME_NONE;
2791   priv->earliest_in_time = GST_CLOCK_TIME_NONE;
2792   priv->last_left = GST_CLOCK_TIME_NONE;
2793   priv->avg_duration = GST_CLOCK_TIME_NONE;
2794   priv->avg_pt = GST_CLOCK_TIME_NONE;
2795   priv->avg_rate = -1.0;
2796   priv->avg_render = GST_CLOCK_TIME_NONE;
2797   priv->avg_in_diff = GST_CLOCK_TIME_NONE;
2798   priv->rendered = 0;
2799   priv->dropped = 0;
2800
2801 }
2802
2803 /* Checks if the object was scheduled too late.
2804  *
2805  * rstart/rstop contain the running_time start and stop values
2806  * of the object.
2807  *
2808  * status and jitter contain the return values from the clock wait.
2809  *
2810  * returns TRUE if the buffer was too late.
2811  */
2812 static gboolean
2813 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2814     GstClockTime rstart, GstClockTime rstop,
2815     GstClockReturn status, GstClockTimeDiff jitter)
2816 {
2817   gboolean late;
2818   gint64 max_lateness;
2819   GstBaseSinkPrivate *priv;
2820
2821   priv = basesink->priv;
2822
2823   late = FALSE;
2824
2825   /* only for objects that were too late */
2826   if (G_LIKELY (status != GST_CLOCK_EARLY))
2827     goto in_time;
2828
2829   max_lateness = basesink->abidata.ABI.max_lateness;
2830
2831   /* check if frame dropping is enabled */
2832   if (max_lateness == -1)
2833     goto no_drop;
2834
2835   /* only check for buffers */
2836   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2837     goto not_buffer;
2838
2839   /* can't do check if we don't have a timestamp */
2840   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (rstart)))
2841     goto no_timestamp;
2842
2843   /* we can add a valid stop time */
2844   if (GST_CLOCK_TIME_IS_VALID (rstop))
2845     max_lateness += rstop;
2846   else {
2847     max_lateness += rstart;
2848     /* no stop time, use avg frame diff */
2849     if (priv->avg_in_diff != -1)
2850       max_lateness += priv->avg_in_diff;
2851   }
2852
2853   /* if the jitter bigger than duration and lateness we are too late */
2854   if ((late = rstart + jitter > max_lateness)) {
2855     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2856         "buffer is too late %" GST_TIME_FORMAT
2857         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (rstart + jitter),
2858         GST_TIME_ARGS (max_lateness));
2859     /* !!emergency!!, if we did not receive anything valid for more than a
2860      * second, render it anyway so the user sees something */
2861     if (GST_CLOCK_TIME_IS_VALID (priv->last_render_time) &&
2862         rstart - priv->last_render_time > GST_SECOND) {
2863       late = FALSE;
2864       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2865           (_("A lot of buffers are being dropped.")),
2866           ("There may be a timestamping problem, or this computer is too slow."));
2867       GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2868           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2869           GST_TIME_ARGS (priv->last_render_time));
2870     }
2871   }
2872
2873 done:
2874   if (!late || !GST_CLOCK_TIME_IS_VALID (priv->last_render_time)) {
2875     priv->last_render_time = rstart;
2876     /* the next allowed input timestamp */
2877     if (priv->throttle_time > 0)
2878       priv->earliest_in_time = rstart + priv->throttle_time;
2879   }
2880   return late;
2881
2882   /* all is fine */
2883 in_time:
2884   {
2885     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2886     goto done;
2887   }
2888 no_drop:
2889   {
2890     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2891     goto done;
2892   }
2893 not_buffer:
2894   {
2895     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2896     return FALSE;
2897   }
2898 no_timestamp:
2899   {
2900     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2901     return FALSE;
2902   }
2903 }
2904
2905 /* called before and after calling the render vmethod. It keeps track of how
2906  * much time was spent in the render method and is used to check if we are
2907  * flooded */
2908 static void
2909 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2910 {
2911   GstBaseSinkPrivate *priv;
2912
2913   priv = basesink->priv;
2914
2915   if (start) {
2916     priv->start = gst_util_get_timestamp ();
2917   } else {
2918     GstClockTime elapsed;
2919
2920     priv->stop = gst_util_get_timestamp ();
2921
2922     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2923
2924     if (!GST_CLOCK_TIME_IS_VALID (priv->avg_render))
2925       priv->avg_render = elapsed;
2926     else
2927       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2928
2929     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2930         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2931   }
2932 }
2933
2934 /* with STREAM_LOCK, PREROLL_LOCK,
2935  *
2936  * Synchronize the object on the clock and then render it.
2937  *
2938  * takes ownership of obj.
2939  */
2940 static GstFlowReturn
2941 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2942     guint8 obj_type, gpointer obj)
2943 {
2944   GstFlowReturn ret;
2945   GstBaseSinkClass *bclass;
2946   gboolean late, step_end;
2947   gpointer sync_obj;
2948   GstBaseSinkPrivate *priv;
2949
2950   priv = basesink->priv;
2951
2952   if (OBJ_IS_BUFFERLIST (obj_type)) {
2953     /*
2954      * If buffer list, use the first group buffer within the list
2955      * for syncing
2956      */
2957     sync_obj = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
2958     g_assert (NULL != sync_obj);
2959   } else {
2960     sync_obj = obj;
2961   }
2962
2963 again:
2964   late = FALSE;
2965   step_end = FALSE;
2966
2967   /* synchronize this object, non syncable objects return OK
2968    * immediatly. */
2969   ret =
2970       gst_base_sink_do_sync (basesink, pad, sync_obj, &late, &step_end,
2971       obj_type);
2972   if (G_UNLIKELY (ret != GST_FLOW_OK))
2973     goto sync_failed;
2974
2975   /* and now render, event or buffer/buffer list. */
2976   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type))) {
2977     /* drop late buffers unconditionally, let's hope it's unlikely */
2978     if (G_UNLIKELY (late))
2979       goto dropped;
2980
2981     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2982
2983     if (G_LIKELY ((OBJ_IS_BUFFERLIST (obj_type) && bclass->render_list) ||
2984             (!OBJ_IS_BUFFERLIST (obj_type) && bclass->render))) {
2985       gint do_qos;
2986
2987       /* read once, to get same value before and after */
2988       do_qos = g_atomic_int_get (&priv->qos_enabled);
2989
2990       GST_DEBUG_OBJECT (basesink, "rendering object %p", obj);
2991
2992       /* record rendering time for QoS and stats */
2993       if (do_qos)
2994         gst_base_sink_do_render_stats (basesink, TRUE);
2995
2996       if (!OBJ_IS_BUFFERLIST (obj_type)) {
2997         GstBuffer *buf;
2998
2999         /* For buffer lists do not set last buffer. Creating buffer
3000          * with meaningful data can be done only with memcpy which will
3001          * significantly affect performance */
3002         buf = GST_BUFFER_CAST (obj);
3003         gst_base_sink_set_last_buffer (basesink, buf);
3004
3005         ret = bclass->render (basesink, buf);
3006       } else {
3007         GstBufferList *buflist;
3008
3009         buflist = GST_BUFFER_LIST_CAST (obj);
3010
3011         ret = bclass->render_list (basesink, buflist);
3012       }
3013
3014       if (do_qos)
3015         gst_base_sink_do_render_stats (basesink, FALSE);
3016
3017       if (ret == GST_FLOW_STEP)
3018         goto again;
3019
3020       if (G_UNLIKELY (basesink->flushing))
3021         goto flushing;
3022
3023       priv->rendered++;
3024     }
3025   } else if (G_LIKELY (OBJ_IS_EVENT (obj_type))) {
3026     GstEvent *event = GST_EVENT_CAST (obj);
3027     gboolean event_res = TRUE;
3028     GstEventType type;
3029
3030     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3031
3032     type = GST_EVENT_TYPE (event);
3033
3034     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
3035         gst_event_type_get_name (type));
3036
3037     if (bclass->event)
3038       event_res = bclass->event (basesink, event);
3039
3040     /* when we get here we could be flushing again when the event handler calls
3041      * _wait_eos(). We have to ignore this object in that case. */
3042     if (G_UNLIKELY (basesink->flushing))
3043       goto flushing;
3044
3045     if (G_LIKELY (event_res)) {
3046       guint32 seqnum;
3047
3048       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
3049       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
3050
3051       switch (type) {
3052         case GST_EVENT_EOS:
3053         {
3054           GstMessage *message;
3055
3056           /* the EOS event is completely handled so we mark
3057            * ourselves as being in the EOS state. eos is also
3058            * protected by the object lock so we can read it when
3059            * answering the POSITION query. */
3060           GST_OBJECT_LOCK (basesink);
3061           basesink->eos = TRUE;
3062           GST_OBJECT_UNLOCK (basesink);
3063
3064           /* ok, now we can post the message */
3065           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
3066
3067           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
3068           gst_message_set_seqnum (message, seqnum);
3069           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
3070           break;
3071         }
3072         case GST_EVENT_NEWSEGMENT:
3073           /* configure the segment */
3074           gst_base_sink_configure_segment (basesink, pad, event,
3075               &basesink->segment);
3076           break;
3077         case GST_EVENT_SINK_MESSAGE:{
3078           GstMessage *msg = NULL;
3079
3080           gst_event_parse_sink_message (event, &msg);
3081
3082           if (msg)
3083             gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
3084         }
3085         default:
3086           break;
3087       }
3088     }
3089   } else {
3090     g_return_val_if_reached (GST_FLOW_ERROR);
3091   }
3092
3093 done:
3094   if (step_end) {
3095     /* the step ended, check if we need to activate a new step */
3096     GST_DEBUG_OBJECT (basesink, "step ended");
3097     stop_stepping (basesink, &basesink->segment, &priv->current_step,
3098         priv->current_rstart, priv->current_rstop, basesink->eos);
3099     goto again;
3100   }
3101
3102   gst_base_sink_perform_qos (basesink, late);
3103
3104   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
3105   gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3106   return ret;
3107
3108   /* ERRORS */
3109 sync_failed:
3110   {
3111     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
3112     goto done;
3113   }
3114 dropped:
3115   {
3116     priv->dropped++;
3117     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
3118
3119     if (g_atomic_int_get (&priv->qos_enabled)) {
3120       GstMessage *qos_msg;
3121       GstClockTime timestamp, duration;
3122
3123       timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (sync_obj));
3124       duration = GST_BUFFER_DURATION (GST_BUFFER_CAST (sync_obj));
3125
3126       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3127           "qos: dropped buffer rt %" GST_TIME_FORMAT ", st %" GST_TIME_FORMAT
3128           ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT,
3129           GST_TIME_ARGS (priv->current_rstart),
3130           GST_TIME_ARGS (priv->current_sstart), GST_TIME_ARGS (timestamp),
3131           GST_TIME_ARGS (duration));
3132       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3133           "qos: rendered %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
3134           priv->rendered, priv->dropped);
3135
3136       qos_msg =
3137           gst_message_new_qos (GST_OBJECT_CAST (basesink), basesink->sync,
3138           priv->current_rstart, priv->current_sstart, timestamp, duration);
3139       gst_message_set_qos_values (qos_msg, priv->current_jitter, priv->avg_rate,
3140           1000000);
3141       gst_message_set_qos_stats (qos_msg, GST_FORMAT_BUFFERS, priv->rendered,
3142           priv->dropped);
3143       gst_element_post_message (GST_ELEMENT_CAST (basesink), qos_msg);
3144     }
3145     goto done;
3146   }
3147 flushing:
3148   {
3149     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
3150     gst_mini_object_unref (obj);
3151     return GST_FLOW_WRONG_STATE;
3152   }
3153 }
3154
3155 /* with STREAM_LOCK, PREROLL_LOCK
3156  *
3157  * Perform preroll on the given object. For buffers this means
3158  * calling the preroll subclass method.
3159  * If that succeeds, the state will be commited.
3160  *
3161  * function does not take ownership of obj.
3162  */
3163 static GstFlowReturn
3164 gst_base_sink_preroll_object (GstBaseSink * basesink, guint8 obj_type,
3165     GstMiniObject * obj)
3166 {
3167   GstFlowReturn ret;
3168
3169   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
3170
3171   /* if it's a buffer, we need to call the preroll method */
3172   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type) && basesink->priv->call_preroll)) {
3173     GstBaseSinkClass *bclass;
3174     GstBuffer *buf;
3175     GstClockTime timestamp;
3176
3177     if (OBJ_IS_BUFFERLIST (obj_type)) {
3178       buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3179       g_assert (NULL != buf);
3180     } else {
3181       buf = GST_BUFFER_CAST (obj);
3182     }
3183
3184     timestamp = GST_BUFFER_TIMESTAMP (buf);
3185
3186     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
3187         GST_TIME_ARGS (timestamp));
3188
3189     /*
3190      * For buffer lists do not set last buffer. Creating buffer
3191      * with meaningful data can be done only with memcpy which will
3192      * significantly affect performance
3193      */
3194     if (!OBJ_IS_BUFFERLIST (obj_type)) {
3195       gst_base_sink_set_last_buffer (basesink, buf);
3196     }
3197
3198     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3199     if (bclass->preroll)
3200       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
3201         goto preroll_failed;
3202
3203     basesink->priv->call_preroll = FALSE;
3204   }
3205
3206   /* commit state */
3207   if (G_LIKELY (basesink->playing_async)) {
3208     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
3209       goto stopping;
3210   }
3211
3212   return GST_FLOW_OK;
3213
3214   /* ERRORS */
3215 preroll_failed:
3216   {
3217     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
3218     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
3219     return ret;
3220   }
3221 stopping:
3222   {
3223     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
3224     return GST_FLOW_WRONG_STATE;
3225   }
3226 }
3227
3228 /* with STREAM_LOCK, PREROLL_LOCK
3229  *
3230  * Queue an object for rendering.
3231  * The first prerollable object queued will complete the preroll. If the
3232  * preroll queue if filled, we render all the objects in the queue.
3233  *
3234  * This function takes ownership of the object.
3235  */
3236 static GstFlowReturn
3237 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
3238     guint8 obj_type, gpointer obj, gboolean prerollable)
3239 {
3240   GstFlowReturn ret = GST_FLOW_OK;
3241   gint length;
3242   GQueue *q;
3243
3244   if (G_UNLIKELY (basesink->need_preroll)) {
3245     if (G_LIKELY (prerollable))
3246       basesink->preroll_queued++;
3247
3248     length = basesink->preroll_queued;
3249
3250     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
3251
3252     /* first prerollable item needs to finish the preroll */
3253     if (length == 1) {
3254       ret = gst_base_sink_preroll_object (basesink, obj_type, obj);
3255       if (G_UNLIKELY (ret != GST_FLOW_OK))
3256         goto preroll_failed;
3257     }
3258     /* need to recheck if we need preroll, commmit state during preroll
3259      * could have made us not need more preroll. */
3260     if (G_UNLIKELY (basesink->need_preroll)) {
3261       /* see if we can render now, if we can't add the object to the preroll
3262        * queue. */
3263       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
3264         goto more_preroll;
3265     }
3266   }
3267   /* we can start rendering (or blocking) the queued object
3268    * if any. */
3269   q = basesink->preroll_queue;
3270   while (G_UNLIKELY (!g_queue_is_empty (q))) {
3271     GstMiniObject *o;
3272     guint8 ot;
3273
3274     o = g_queue_pop_head (q);
3275     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
3276
3277     ot = get_object_type (o);
3278
3279     /* do something with the return value */
3280     ret = gst_base_sink_render_object (basesink, pad, ot, o);
3281     if (ret != GST_FLOW_OK)
3282       goto dequeue_failed;
3283   }
3284
3285   /* now render the object */
3286   ret = gst_base_sink_render_object (basesink, pad, obj_type, obj);
3287   basesink->preroll_queued = 0;
3288
3289   return ret;
3290
3291   /* special cases */
3292 preroll_failed:
3293   {
3294     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
3295         gst_flow_get_name (ret));
3296     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3297     return ret;
3298   }
3299 more_preroll:
3300   {
3301     /* add object to the queue and return */
3302     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
3303         length, basesink->preroll_queue_max_len);
3304     g_queue_push_tail (basesink->preroll_queue, obj);
3305     return GST_FLOW_OK;
3306   }
3307 dequeue_failed:
3308   {
3309     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
3310         gst_flow_get_name (ret));
3311     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3312     return ret;
3313   }
3314 }
3315
3316 /* with STREAM_LOCK
3317  *
3318  * This function grabs the PREROLL_LOCK and adds the object to
3319  * the queue.
3320  *
3321  * This function takes ownership of obj.
3322  *
3323  * Note: Only GstEvent seem to be passed to this private method
3324  */
3325 static GstFlowReturn
3326 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
3327     GstMiniObject * obj, gboolean prerollable)
3328 {
3329   GstFlowReturn ret;
3330
3331   GST_PAD_PREROLL_LOCK (pad);
3332   if (G_UNLIKELY (basesink->flushing))
3333     goto flushing;
3334
3335   if (G_UNLIKELY (basesink->priv->received_eos))
3336     goto was_eos;
3337
3338   ret =
3339       gst_base_sink_queue_object_unlocked (basesink, pad, _PR_IS_EVENT, obj,
3340       prerollable);
3341   GST_PAD_PREROLL_UNLOCK (pad);
3342
3343   return ret;
3344
3345   /* ERRORS */
3346 flushing:
3347   {
3348     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3349     GST_PAD_PREROLL_UNLOCK (pad);
3350     gst_mini_object_unref (obj);
3351     return GST_FLOW_WRONG_STATE;
3352   }
3353 was_eos:
3354   {
3355     GST_DEBUG_OBJECT (basesink,
3356         "we are EOS, dropping object, return UNEXPECTED");
3357     GST_PAD_PREROLL_UNLOCK (pad);
3358     gst_mini_object_unref (obj);
3359     return GST_FLOW_UNEXPECTED;
3360   }
3361 }
3362
3363 static void
3364 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
3365 {
3366   /* make sure we are not blocked on the clock also clear any pending
3367    * eos state. */
3368   gst_base_sink_set_flushing (basesink, pad, TRUE);
3369
3370   /* we grab the stream lock but that is not needed since setting the
3371    * sink to flushing would make sure no state commit is being done
3372    * anymore */
3373   GST_PAD_STREAM_LOCK (pad);
3374   gst_base_sink_reset_qos (basesink);
3375   /* and we need to commit our state again on the next
3376    * prerolled buffer */
3377   basesink->playing_async = TRUE;
3378   if (basesink->priv->async_enabled) {
3379     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
3380   } else {
3381     basesink->priv->have_latency = TRUE;
3382   }
3383   gst_base_sink_set_last_buffer (basesink, NULL);
3384   GST_PAD_STREAM_UNLOCK (pad);
3385 }
3386
3387 static void
3388 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
3389 {
3390   /* unset flushing so we can accept new data, this also flushes out any EOS
3391    * event. */
3392   gst_base_sink_set_flushing (basesink, pad, FALSE);
3393
3394   /* for position reporting */
3395   GST_OBJECT_LOCK (basesink);
3396   basesink->priv->current_sstart = GST_CLOCK_TIME_NONE;
3397   basesink->priv->current_sstop = GST_CLOCK_TIME_NONE;
3398   basesink->priv->eos_rtime = GST_CLOCK_TIME_NONE;
3399   basesink->priv->call_preroll = TRUE;
3400   basesink->priv->current_step.valid = FALSE;
3401   basesink->priv->pending_step.valid = FALSE;
3402   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
3403     /* we need new segment info after the flush. */
3404     basesink->have_newsegment = FALSE;
3405     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3406     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
3407   }
3408   GST_OBJECT_UNLOCK (basesink);
3409 }
3410
3411 static gboolean
3412 gst_base_sink_event (GstPad * pad, GstEvent * event)
3413 {
3414   GstBaseSink *basesink;
3415   gboolean result = TRUE;
3416   GstBaseSinkClass *bclass;
3417
3418   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3419   if (G_UNLIKELY (basesink == NULL)) {
3420     gst_event_unref (event);
3421     return FALSE;
3422   }
3423
3424   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3425
3426   GST_DEBUG_OBJECT (basesink, "received event %p %" GST_PTR_FORMAT, event,
3427       event);
3428
3429   switch (GST_EVENT_TYPE (event)) {
3430     case GST_EVENT_EOS:
3431     {
3432       GstFlowReturn ret;
3433
3434       GST_PAD_PREROLL_LOCK (pad);
3435       if (G_UNLIKELY (basesink->flushing))
3436         goto flushing;
3437
3438       if (G_UNLIKELY (basesink->priv->received_eos)) {
3439         /* we can't accept anything when we are EOS */
3440         result = FALSE;
3441         gst_event_unref (event);
3442       } else {
3443         /* we set the received EOS flag here so that we can use it when testing if
3444          * we are prerolled and to refuse more buffers. */
3445         basesink->priv->received_eos = TRUE;
3446
3447         /* EOS is a prerollable object, we call the unlocked version because it
3448          * does not check the received_eos flag. */
3449         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
3450             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), TRUE);
3451         if (G_UNLIKELY (ret != GST_FLOW_OK))
3452           result = FALSE;
3453       }
3454       GST_PAD_PREROLL_UNLOCK (pad);
3455       break;
3456     }
3457     case GST_EVENT_NEWSEGMENT:
3458     {
3459       GstFlowReturn ret;
3460       gboolean update;
3461
3462       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
3463
3464       GST_PAD_PREROLL_LOCK (pad);
3465       if (G_UNLIKELY (basesink->flushing))
3466         goto flushing;
3467
3468       gst_event_parse_new_segment_full (event, &update, NULL, NULL, NULL, NULL,
3469           NULL, NULL);
3470
3471       if (G_UNLIKELY (basesink->priv->received_eos && !update)) {
3472         /* we can't accept anything when we are EOS */
3473         result = FALSE;
3474         gst_event_unref (event);
3475       } else {
3476         /* the new segment is a non prerollable item and does not block anything,
3477          * we need to configure the current clipping segment and insert the event
3478          * in the queue to serialize it with the buffers for rendering. */
3479         gst_base_sink_configure_segment (basesink, pad, event,
3480             basesink->abidata.ABI.clip_segment);
3481
3482         ret =
3483             gst_base_sink_queue_object_unlocked (basesink, pad,
3484             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), FALSE);
3485         if (G_UNLIKELY (ret != GST_FLOW_OK))
3486           result = FALSE;
3487         else {
3488           GST_OBJECT_LOCK (basesink);
3489           basesink->have_newsegment = TRUE;
3490           GST_OBJECT_UNLOCK (basesink);
3491         }
3492       }
3493       GST_PAD_PREROLL_UNLOCK (pad);
3494       break;
3495     }
3496     case GST_EVENT_FLUSH_START:
3497       if (bclass->event)
3498         bclass->event (basesink, event);
3499
3500       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3501
3502       gst_base_sink_flush_start (basesink, pad);
3503
3504       gst_event_unref (event);
3505       break;
3506     case GST_EVENT_FLUSH_STOP:
3507       if (bclass->event)
3508         bclass->event (basesink, event);
3509
3510       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3511
3512       gst_base_sink_flush_stop (basesink, pad);
3513
3514       gst_event_unref (event);
3515       break;
3516     default:
3517       /* other events are sent to queue or subclass depending on if they
3518        * are serialized. */
3519       if (GST_EVENT_IS_SERIALIZED (event)) {
3520         gst_base_sink_queue_object (basesink, pad,
3521             GST_MINI_OBJECT_CAST (event), FALSE);
3522       } else {
3523         if (bclass->event)
3524           bclass->event (basesink, event);
3525         gst_event_unref (event);
3526       }
3527       break;
3528   }
3529 done:
3530   gst_object_unref (basesink);
3531
3532   return result;
3533
3534   /* ERRORS */
3535 flushing:
3536   {
3537     GST_DEBUG_OBJECT (basesink, "we are flushing");
3538     GST_PAD_PREROLL_UNLOCK (pad);
3539     result = FALSE;
3540     gst_event_unref (event);
3541     goto done;
3542   }
3543 }
3544
3545 /* default implementation to calculate the start and end
3546  * timestamps on a buffer, subclasses can override
3547  */
3548 static void
3549 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3550     GstClockTime * start, GstClockTime * end)
3551 {
3552   GstClockTime timestamp, duration;
3553
3554   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3555   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3556
3557     /* get duration to calculate end time */
3558     duration = GST_BUFFER_DURATION (buffer);
3559     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3560       *end = timestamp + duration;
3561     }
3562     *start = timestamp;
3563   }
3564 }
3565
3566 /* must be called with PREROLL_LOCK */
3567 static gboolean
3568 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3569 {
3570   gboolean is_prerolled, res;
3571
3572   /* we have 2 cases where the PREROLL_LOCK is released:
3573    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3574    *  2) we are syncing on the clock
3575    */
3576   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3577   res = !is_prerolled;
3578
3579   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3580       basesink->have_preroll, basesink->priv->received_eos, res);
3581
3582   return res;
3583 }
3584
3585 /* with STREAM_LOCK, PREROLL_LOCK
3586  *
3587  * Takes a buffer and compare the timestamps with the last segment.
3588  * If the buffer falls outside of the segment boundaries, drop it.
3589  * Else queue the buffer for preroll and rendering.
3590  *
3591  * This function takes ownership of the buffer.
3592  */
3593 static GstFlowReturn
3594 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3595     guint8 obj_type, gpointer obj)
3596 {
3597   GstBaseSinkClass *bclass;
3598   GstFlowReturn result;
3599   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3600   GstSegment *clip_segment;
3601   GstBuffer *time_buf;
3602
3603   if (G_UNLIKELY (basesink->flushing))
3604     goto flushing;
3605
3606   if (G_UNLIKELY (basesink->priv->received_eos))
3607     goto was_eos;
3608
3609   if (OBJ_IS_BUFFERLIST (obj_type)) {
3610     time_buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3611     g_assert (NULL != time_buf);
3612   } else {
3613     time_buf = GST_BUFFER_CAST (obj);
3614   }
3615
3616   /* for code clarity */
3617   clip_segment = basesink->abidata.ABI.clip_segment;
3618
3619   if (G_UNLIKELY (!basesink->have_newsegment)) {
3620     gboolean sync;
3621
3622     sync = gst_base_sink_get_sync (basesink);
3623     if (sync) {
3624       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3625           (_("Internal data flow problem.")),
3626           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3627     }
3628
3629     /* this means this sink will assume timestamps start from 0 */
3630     GST_OBJECT_LOCK (basesink);
3631     clip_segment->start = 0;
3632     clip_segment->stop = -1;
3633     basesink->segment.start = 0;
3634     basesink->segment.stop = -1;
3635     basesink->have_newsegment = TRUE;
3636     GST_OBJECT_UNLOCK (basesink);
3637   }
3638
3639   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3640
3641   /* check if the buffer needs to be dropped, we first ask the subclass for the
3642    * start and end */
3643   if (bclass->get_times)
3644     bclass->get_times (basesink, time_buf, &start, &end);
3645
3646   if (!GST_CLOCK_TIME_IS_VALID (start)) {
3647     /* if the subclass does not want sync, we use our own values so that we at
3648      * least clip the buffer to the segment */
3649     gst_base_sink_get_times (basesink, time_buf, &start, &end);
3650   }
3651
3652   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3653       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3654
3655   /* a dropped buffer does not participate in anything */
3656   if (GST_CLOCK_TIME_IS_VALID (start) &&
3657       (clip_segment->format == GST_FORMAT_TIME)) {
3658     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3659                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3660       goto out_of_segment;
3661   }
3662
3663   /* now we can process the buffer in the queue, this function takes ownership
3664    * of the buffer */
3665   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3666       obj_type, obj, TRUE);
3667   return result;
3668
3669   /* ERRORS */
3670 flushing:
3671   {
3672     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3673     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3674     return GST_FLOW_WRONG_STATE;
3675   }
3676 was_eos:
3677   {
3678     GST_DEBUG_OBJECT (basesink,
3679         "we are EOS, dropping object, return UNEXPECTED");
3680     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3681     return GST_FLOW_UNEXPECTED;
3682   }
3683 out_of_segment:
3684   {
3685     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3686     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3687     return GST_FLOW_OK;
3688   }
3689 }
3690
3691 /* with STREAM_LOCK
3692  */
3693 static GstFlowReturn
3694 gst_base_sink_chain_main (GstBaseSink * basesink, GstPad * pad,
3695     guint8 obj_type, gpointer obj)
3696 {
3697   GstFlowReturn result;
3698
3699   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3700     goto wrong_mode;
3701
3702   GST_PAD_PREROLL_LOCK (pad);
3703   result = gst_base_sink_chain_unlocked (basesink, pad, obj_type, obj);
3704   GST_PAD_PREROLL_UNLOCK (pad);
3705
3706 done:
3707   return result;
3708
3709   /* ERRORS */
3710 wrong_mode:
3711   {
3712     GST_OBJECT_LOCK (pad);
3713     GST_WARNING_OBJECT (basesink,
3714         "Push on pad %s:%s, but it was not activated in push mode",
3715         GST_DEBUG_PAD_NAME (pad));
3716     GST_OBJECT_UNLOCK (pad);
3717     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3718     /* we don't post an error message this will signal to the peer
3719      * pushing that EOS is reached. */
3720     result = GST_FLOW_UNEXPECTED;
3721     goto done;
3722   }
3723 }
3724
3725 static GstFlowReturn
3726 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3727 {
3728   GstBaseSink *basesink;
3729
3730   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3731
3732   return gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, buf);
3733 }
3734
3735 static GstFlowReturn
3736 gst_base_sink_chain_list (GstPad * pad, GstBufferList * list)
3737 {
3738   GstBaseSink *basesink;
3739   GstBaseSinkClass *bclass;
3740   GstFlowReturn result;
3741
3742   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3743   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3744
3745   if (G_LIKELY (bclass->render_list)) {
3746     result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFERLIST, list);
3747   } else {
3748     GstBufferListIterator *it;
3749     GstBuffer *group;
3750
3751     GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
3752
3753     it = gst_buffer_list_iterate (list);
3754
3755     if (gst_buffer_list_iterator_next_group (it)) {
3756       do {
3757         group = gst_buffer_list_iterator_merge_group (it);
3758         if (group == NULL) {
3759           group = gst_buffer_new ();
3760           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3761         } else {
3762           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining group");
3763         }
3764         result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, group);
3765       } while (result == GST_FLOW_OK
3766           && gst_buffer_list_iterator_next_group (it));
3767     } else {
3768       GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3769       result =
3770           gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER,
3771           gst_buffer_new ());
3772     }
3773     gst_buffer_list_iterator_free (it);
3774     gst_buffer_list_unref (list);
3775   }
3776   return result;
3777 }
3778
3779
3780 static gboolean
3781 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3782 {
3783   gboolean res = TRUE;
3784
3785   /* update our offset if the start/stop position was updated */
3786   if (segment->format == GST_FORMAT_BYTES) {
3787     segment->time = segment->start;
3788   } else if (segment->start == 0) {
3789     /* seek to start, we can implement a default for this. */
3790     segment->time = 0;
3791   } else {
3792     res = FALSE;
3793     GST_INFO_OBJECT (sink, "Can't do a default seek");
3794   }
3795
3796   return res;
3797 }
3798
3799 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3800
3801 static gboolean
3802 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3803     GstEvent * event, GstSegment * segment)
3804 {
3805   /* By default, we try one of 2 things:
3806    *   - For absolute seek positions, convert the requested position to our
3807    *     configured processing format and place it in the output segment \
3808    *   - For relative seek positions, convert our current (input) values to the
3809    *     seek format, adjust by the relative seek offset and then convert back to
3810    *     the processing format
3811    */
3812   GstSeekType cur_type, stop_type;
3813   gint64 cur, stop;
3814   GstSeekFlags flags;
3815   GstFormat seek_format, dest_format;
3816   gdouble rate;
3817   gboolean update;
3818   gboolean res = TRUE;
3819
3820   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3821       &cur_type, &cur, &stop_type, &stop);
3822   dest_format = segment->format;
3823
3824   if (seek_format == dest_format) {
3825     gst_segment_set_seek (segment, rate, seek_format, flags,
3826         cur_type, cur, stop_type, stop, &update);
3827     return TRUE;
3828   }
3829
3830   if (cur_type != GST_SEEK_TYPE_NONE) {
3831     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3832     res =
3833         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3834         &cur);
3835     cur_type = GST_SEEK_TYPE_SET;
3836   }
3837
3838   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3839     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3840     res =
3841         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3842         &stop);
3843     stop_type = GST_SEEK_TYPE_SET;
3844   }
3845
3846   /* And finally, configure our output segment in the desired format */
3847   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3848       stop_type, stop, &update);
3849
3850   if (!res)
3851     goto no_format;
3852
3853   return res;
3854
3855 no_format:
3856   {
3857     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3858     return FALSE;
3859   }
3860 }
3861
3862 /* perform a seek, only executed in pull mode */
3863 static gboolean
3864 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3865 {
3866   gboolean flush;
3867   gdouble rate;
3868   GstFormat seek_format, dest_format;
3869   GstSeekFlags flags;
3870   GstSeekType cur_type, stop_type;
3871   gboolean seekseg_configured = FALSE;
3872   gint64 cur, stop;
3873   gboolean update, res = TRUE;
3874   GstSegment seeksegment;
3875
3876   dest_format = sink->segment.format;
3877
3878   if (event) {
3879     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3880     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3881         &cur_type, &cur, &stop_type, &stop);
3882
3883     flush = flags & GST_SEEK_FLAG_FLUSH;
3884   } else {
3885     GST_DEBUG_OBJECT (sink, "performing seek without event");
3886     flush = FALSE;
3887   }
3888
3889   if (flush) {
3890     GST_DEBUG_OBJECT (sink, "flushing upstream");
3891     gst_pad_push_event (pad, gst_event_new_flush_start ());
3892     gst_base_sink_flush_start (sink, pad);
3893   } else {
3894     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3895   }
3896
3897   GST_PAD_STREAM_LOCK (pad);
3898
3899   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3900    * copy the current segment info into the temp segment that we can actually
3901    * attempt the seek with. We only update the real segment if the seek suceeds. */
3902   if (!seekseg_configured) {
3903     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3904
3905     /* now configure the final seek segment */
3906     if (event) {
3907       if (sink->segment.format != seek_format) {
3908         /* OK, here's where we give the subclass a chance to convert the relative
3909          * seek into an absolute one in the processing format. We set up any
3910          * absolute seek above, before taking the stream lock. */
3911         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3912                 &seeksegment)) {
3913           GST_DEBUG_OBJECT (sink,
3914               "Preparing the seek failed after flushing. " "Aborting seek");
3915           res = FALSE;
3916         }
3917       } else {
3918         /* The seek format matches our processing format, no need to ask the
3919          * the subclass to configure the segment. */
3920         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3921             cur_type, cur, stop_type, stop, &update);
3922       }
3923     }
3924     /* Else, no seek event passed, so we're just (re)starting the
3925        current segment. */
3926   }
3927
3928   if (res) {
3929     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3930         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3931         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3932
3933     /* do the seek, segment.last_stop contains the new position. */
3934     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3935   }
3936
3937
3938   if (flush) {
3939     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3940     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3941     gst_base_sink_flush_stop (sink, pad);
3942   } else if (res && sink->abidata.ABI.running) {
3943     /* we are running the current segment and doing a non-flushing seek,
3944      * close the segment first based on the last_stop. */
3945     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3946         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3947   }
3948
3949   /* The subclass must have converted the segment to the processing format
3950    * by now */
3951   if (res && seeksegment.format != dest_format) {
3952     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3953         "in the correct format. Aborting seek.");
3954     res = FALSE;
3955   }
3956
3957   /* if successfull seek, we update our real segment and push
3958    * out the new segment. */
3959   if (res) {
3960     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3961
3962     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3963       gst_element_post_message (GST_ELEMENT (sink),
3964           gst_message_new_segment_start (GST_OBJECT (sink),
3965               sink->segment.format, sink->segment.last_stop));
3966     }
3967   }
3968
3969   sink->priv->discont = TRUE;
3970   sink->abidata.ABI.running = TRUE;
3971
3972   GST_PAD_STREAM_UNLOCK (pad);
3973
3974   return res;
3975 }
3976
3977 static void
3978 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
3979     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
3980     gboolean flush, gboolean intermediate)
3981 {
3982   GST_OBJECT_LOCK (sink);
3983   pending->seqnum = seqnum;
3984   pending->format = format;
3985   pending->amount = amount;
3986   pending->position = 0;
3987   pending->rate = rate;
3988   pending->flush = flush;
3989   pending->intermediate = intermediate;
3990   pending->valid = TRUE;
3991   /* flush invalidates the current stepping segment */
3992   if (flush)
3993     current->valid = FALSE;
3994   GST_OBJECT_UNLOCK (sink);
3995 }
3996
3997 static gboolean
3998 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3999 {
4000   GstBaseSinkPrivate *priv;
4001   GstBaseSinkClass *bclass;
4002   gboolean flush, intermediate;
4003   gdouble rate;
4004   GstFormat format;
4005   guint64 amount;
4006   guint seqnum;
4007   GstStepInfo *pending, *current;
4008   GstMessage *message;
4009
4010   bclass = GST_BASE_SINK_GET_CLASS (sink);
4011   priv = sink->priv;
4012
4013   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
4014
4015   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
4016   seqnum = gst_event_get_seqnum (event);
4017
4018   pending = &priv->pending_step;
4019   current = &priv->current_step;
4020
4021   /* post message first */
4022   message = gst_message_new_step_start (GST_OBJECT (sink), FALSE, format,
4023       amount, rate, flush, intermediate);
4024   gst_message_set_seqnum (message, seqnum);
4025   gst_element_post_message (GST_ELEMENT (sink), message);
4026
4027   if (flush) {
4028     /* we need to call ::unlock before locking PREROLL_LOCK
4029      * since we lock it before going into ::render */
4030     if (bclass->unlock)
4031       bclass->unlock (sink);
4032
4033     GST_PAD_PREROLL_LOCK (sink->sinkpad);
4034     /* now that we have the PREROLL lock, clear our unlock request */
4035     if (bclass->unlock_stop)
4036       bclass->unlock_stop (sink);
4037
4038     /* update the stepinfo and make it valid */
4039     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4040         intermediate);
4041
4042     if (sink->priv->async_enabled) {
4043       /* and we need to commit our state again on the next
4044        * prerolled buffer */
4045       sink->playing_async = TRUE;
4046       priv->pending_step.need_preroll = TRUE;
4047       sink->need_preroll = FALSE;
4048       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
4049     } else {
4050       sink->priv->have_latency = TRUE;
4051       sink->need_preroll = FALSE;
4052     }
4053     priv->current_sstart = GST_CLOCK_TIME_NONE;
4054     priv->current_sstop = GST_CLOCK_TIME_NONE;
4055     priv->eos_rtime = GST_CLOCK_TIME_NONE;
4056     priv->call_preroll = TRUE;
4057     gst_base_sink_set_last_buffer (sink, NULL);
4058     gst_base_sink_reset_qos (sink);
4059
4060     if (sink->clock_id) {
4061       gst_clock_id_unschedule (sink->clock_id);
4062     }
4063
4064     if (sink->have_preroll) {
4065       GST_DEBUG_OBJECT (sink, "signal waiter");
4066       priv->step_unlock = TRUE;
4067       GST_PAD_PREROLL_SIGNAL (sink->sinkpad);
4068     }
4069     GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
4070   } else {
4071     /* update the stepinfo and make it valid */
4072     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4073         intermediate);
4074   }
4075
4076   return TRUE;
4077 }
4078
4079 /* with STREAM_LOCK
4080  */
4081 static void
4082 gst_base_sink_loop (GstPad * pad)
4083 {
4084   GstBaseSink *basesink;
4085   GstBuffer *buf = NULL;
4086   GstFlowReturn result;
4087   guint blocksize;
4088   guint64 offset;
4089
4090   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
4091
4092   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
4093
4094   if ((blocksize = basesink->priv->blocksize) == 0)
4095     blocksize = -1;
4096
4097   offset = basesink->segment.last_stop;
4098
4099   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
4100       offset, blocksize);
4101
4102   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
4103   if (G_UNLIKELY (result != GST_FLOW_OK))
4104     goto paused;
4105
4106   if (G_UNLIKELY (buf == NULL))
4107     goto no_buffer;
4108
4109   offset += GST_BUFFER_SIZE (buf);
4110
4111   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
4112
4113   GST_PAD_PREROLL_LOCK (pad);
4114   result = gst_base_sink_chain_unlocked (basesink, pad, _PR_IS_BUFFER, buf);
4115   GST_PAD_PREROLL_UNLOCK (pad);
4116   if (G_UNLIKELY (result != GST_FLOW_OK))
4117     goto paused;
4118
4119   return;
4120
4121   /* ERRORS */
4122 paused:
4123   {
4124     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
4125         gst_flow_get_name (result));
4126     gst_pad_pause_task (pad);
4127     if (result == GST_FLOW_UNEXPECTED) {
4128       /* perform EOS logic */
4129       if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
4130         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4131             gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
4132                 basesink->segment.format, basesink->segment.last_stop));
4133       } else {
4134         gst_base_sink_event (pad, gst_event_new_eos ());
4135       }
4136     } else if (result == GST_FLOW_NOT_LINKED || result <= GST_FLOW_UNEXPECTED) {
4137       /* for fatal errors we post an error message, post the error
4138        * first so the app knows about the error first. 
4139        * wrong-state is not a fatal error because it happens due to
4140        * flushing and posting an error message in that case is the
4141        * wrong thing to do, e.g. when basesrc is doing a flushing
4142        * seek. */
4143       GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4144           (_("Internal data stream error.")),
4145           ("stream stopped, reason %s", gst_flow_get_name (result)));
4146       gst_base_sink_event (pad, gst_event_new_eos ());
4147     }
4148     return;
4149   }
4150 no_buffer:
4151   {
4152     GST_LOG_OBJECT (basesink, "no buffer, pausing");
4153     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4154         (_("Internal data flow error.")), ("element returned NULL buffer"));
4155     result = GST_FLOW_ERROR;
4156     goto paused;
4157   }
4158 }
4159
4160 static gboolean
4161 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
4162     gboolean flushing)
4163 {
4164   GstBaseSinkClass *bclass;
4165
4166   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4167
4168   if (flushing) {
4169     /* unlock any subclasses, we need to do this before grabbing the
4170      * PREROLL_LOCK since we hold this lock before going into ::render. */
4171     if (bclass->unlock)
4172       bclass->unlock (basesink);
4173   }
4174
4175   GST_PAD_PREROLL_LOCK (pad);
4176   basesink->flushing = flushing;
4177   if (flushing) {
4178     /* step 1, now that we have the PREROLL lock, clear our unlock request */
4179     if (bclass->unlock_stop)
4180       bclass->unlock_stop (basesink);
4181
4182     /* set need_preroll before we unblock the clock. If the clock is unblocked
4183      * before timing out, we can reuse the buffer for preroll. */
4184     basesink->need_preroll = TRUE;
4185
4186     /* step 2, unblock clock sync (if any) or any other blocking thing */
4187     if (basesink->clock_id) {
4188       gst_clock_id_unschedule (basesink->clock_id);
4189     }
4190
4191     /* flush out the data thread if it's locked in finish_preroll, this will
4192      * also flush out the EOS state */
4193     GST_DEBUG_OBJECT (basesink,
4194         "flushing out data thread, need preroll to TRUE");
4195     gst_base_sink_preroll_queue_flush (basesink, pad);
4196   }
4197   GST_PAD_PREROLL_UNLOCK (pad);
4198
4199   return TRUE;
4200 }
4201
4202 static gboolean
4203 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
4204 {
4205   gboolean result;
4206
4207   if (active) {
4208     /* start task */
4209     result = gst_pad_start_task (basesink->sinkpad,
4210         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
4211   } else {
4212     /* step 2, make sure streaming finishes */
4213     result = gst_pad_stop_task (basesink->sinkpad);
4214   }
4215
4216   return result;
4217 }
4218
4219 static gboolean
4220 gst_base_sink_pad_activate (GstPad * pad)
4221 {
4222   gboolean result = FALSE;
4223   GstBaseSink *basesink;
4224
4225   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4226
4227   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
4228
4229   gst_base_sink_set_flushing (basesink, pad, FALSE);
4230
4231   /* we need to have the pull mode enabled */
4232   if (!basesink->can_activate_pull) {
4233     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
4234     goto fallback;
4235   }
4236
4237   /* check if downstreams supports pull mode at all */
4238   if (!gst_pad_check_pull_range (pad)) {
4239     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
4240     goto fallback;
4241   }
4242
4243   /* set the pad mode before starting the task so that it's in the
4244    * correct state for the new thread. also the sink set_caps and get_caps
4245    * function checks this */
4246   basesink->pad_mode = GST_ACTIVATE_PULL;
4247
4248   /* we first try to negotiate a format so that when we try to activate
4249    * downstream, it knows about our format */
4250   if (!gst_base_sink_negotiate_pull (basesink)) {
4251     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
4252     goto fallback;
4253   }
4254
4255   /* ok activate now */
4256   if (!gst_pad_activate_pull (pad, TRUE)) {
4257     /* clear any pending caps */
4258     GST_OBJECT_LOCK (basesink);
4259     gst_caps_replace (&basesink->priv->pull_caps, NULL);
4260     GST_OBJECT_UNLOCK (basesink);
4261     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
4262     goto fallback;
4263   }
4264
4265   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
4266   result = TRUE;
4267   goto done;
4268
4269   /* push mode fallback */
4270 fallback:
4271   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
4272   if ((result = gst_pad_activate_push (pad, TRUE))) {
4273     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
4274   }
4275
4276 done:
4277   if (!result) {
4278     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
4279     gst_base_sink_set_flushing (basesink, pad, TRUE);
4280   }
4281
4282   gst_object_unref (basesink);
4283
4284   return result;
4285 }
4286
4287 static gboolean
4288 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
4289 {
4290   gboolean result;
4291   GstBaseSink *basesink;
4292
4293   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4294
4295   if (active) {
4296     if (!basesink->can_activate_push) {
4297       result = FALSE;
4298       basesink->pad_mode = GST_ACTIVATE_NONE;
4299     } else {
4300       result = TRUE;
4301       basesink->pad_mode = GST_ACTIVATE_PUSH;
4302     }
4303   } else {
4304     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
4305       g_warning ("Internal GStreamer activation error!!!");
4306       result = FALSE;
4307     } else {
4308       gst_base_sink_set_flushing (basesink, pad, TRUE);
4309       result = TRUE;
4310       basesink->pad_mode = GST_ACTIVATE_NONE;
4311     }
4312   }
4313
4314   gst_object_unref (basesink);
4315
4316   return result;
4317 }
4318
4319 static gboolean
4320 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
4321 {
4322   GstCaps *caps;
4323   gboolean result;
4324
4325   result = FALSE;
4326
4327   /* this returns the intersection between our caps and the peer caps. If there
4328    * is no peer, it returns NULL and we can't operate in pull mode so we can
4329    * fail the negotiation. */
4330   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
4331   if (caps == NULL || gst_caps_is_empty (caps))
4332     goto no_caps_possible;
4333
4334   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
4335
4336   caps = gst_caps_make_writable (caps);
4337   /* get the first (prefered) format */
4338   gst_caps_truncate (caps);
4339   /* try to fixate */
4340   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
4341
4342   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
4343
4344   if (gst_caps_is_any (caps)) {
4345     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
4346         "allowing pull()");
4347     /* neither side has template caps in this case, so they are prepared for
4348        pull() without setcaps() */
4349     result = TRUE;
4350   } else if (gst_caps_is_fixed (caps)) {
4351     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
4352       goto could_not_set_caps;
4353
4354     GST_OBJECT_LOCK (basesink);
4355     gst_caps_replace (&basesink->priv->pull_caps, caps);
4356     GST_OBJECT_UNLOCK (basesink);
4357
4358     result = TRUE;
4359   }
4360
4361   gst_caps_unref (caps);
4362
4363   return result;
4364
4365 no_caps_possible:
4366   {
4367     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
4368     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
4369     if (caps)
4370       gst_caps_unref (caps);
4371     return FALSE;
4372   }
4373 could_not_set_caps:
4374   {
4375     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
4376     gst_caps_unref (caps);
4377     return FALSE;
4378   }
4379 }
4380
4381 /* this won't get called until we implement an activate function */
4382 static gboolean
4383 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
4384 {
4385   gboolean result = FALSE;
4386   GstBaseSink *basesink;
4387   GstBaseSinkClass *bclass;
4388
4389   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4390   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4391
4392   if (active) {
4393     GstFormat format;
4394     gint64 duration;
4395
4396     /* we mark we have a newsegment here because pull based
4397      * mode works just fine without having a newsegment before the
4398      * first buffer */
4399     format = GST_FORMAT_BYTES;
4400
4401     gst_segment_init (&basesink->segment, format);
4402     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
4403     GST_OBJECT_LOCK (basesink);
4404     basesink->have_newsegment = TRUE;
4405     GST_OBJECT_UNLOCK (basesink);
4406
4407     /* get the peer duration in bytes */
4408     result = gst_pad_query_peer_duration (pad, &format, &duration);
4409     if (result) {
4410       GST_DEBUG_OBJECT (basesink,
4411           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
4412       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
4413           duration);
4414       gst_segment_set_duration (&basesink->segment, format, duration);
4415     } else {
4416       GST_DEBUG_OBJECT (basesink, "unknown duration");
4417     }
4418
4419     if (bclass->activate_pull)
4420       result = bclass->activate_pull (basesink, TRUE);
4421     else
4422       result = FALSE;
4423
4424     if (!result)
4425       goto activate_failed;
4426
4427   } else {
4428     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
4429       g_warning ("Internal GStreamer activation error!!!");
4430       result = FALSE;
4431     } else {
4432       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
4433       if (bclass->activate_pull)
4434         result &= bclass->activate_pull (basesink, FALSE);
4435       basesink->pad_mode = GST_ACTIVATE_NONE;
4436       /* clear any pending caps */
4437       GST_OBJECT_LOCK (basesink);
4438       gst_caps_replace (&basesink->priv->pull_caps, NULL);
4439       GST_OBJECT_UNLOCK (basesink);
4440     }
4441   }
4442   gst_object_unref (basesink);
4443
4444   return result;
4445
4446   /* ERRORS */
4447 activate_failed:
4448   {
4449     /* reset, as starting the thread failed */
4450     basesink->pad_mode = GST_ACTIVATE_NONE;
4451
4452     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
4453     return FALSE;
4454   }
4455 }
4456
4457 /* send an event to our sinkpad peer. */
4458 static gboolean
4459 gst_base_sink_send_event (GstElement * element, GstEvent * event)
4460 {
4461   GstPad *pad;
4462   GstBaseSink *basesink = GST_BASE_SINK (element);
4463   gboolean forward, result = TRUE;
4464   GstActivateMode mode;
4465
4466   GST_OBJECT_LOCK (element);
4467   /* get the pad and the scheduling mode */
4468   pad = gst_object_ref (basesink->sinkpad);
4469   mode = basesink->pad_mode;
4470   GST_OBJECT_UNLOCK (element);
4471
4472   /* only push UPSTREAM events upstream */
4473   forward = GST_EVENT_IS_UPSTREAM (event);
4474
4475   GST_DEBUG_OBJECT (basesink, "handling event %p %" GST_PTR_FORMAT, event,
4476       event);
4477
4478   switch (GST_EVENT_TYPE (event)) {
4479     case GST_EVENT_LATENCY:
4480     {
4481       GstClockTime latency;
4482
4483       gst_event_parse_latency (event, &latency);
4484
4485       /* store the latency. We use this to adjust the running_time before syncing
4486        * it to the clock. */
4487       GST_OBJECT_LOCK (element);
4488       basesink->priv->latency = latency;
4489       if (!basesink->priv->have_latency)
4490         forward = FALSE;
4491       GST_OBJECT_UNLOCK (element);
4492       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
4493           GST_TIME_ARGS (latency));
4494
4495       /* We forward this event so that all elements know about the global pipeline
4496        * latency. This is interesting for an element when it wants to figure out
4497        * when a particular piece of data will be rendered. */
4498       break;
4499     }
4500     case GST_EVENT_SEEK:
4501       /* in pull mode we will execute the seek */
4502       if (mode == GST_ACTIVATE_PULL)
4503         result = gst_base_sink_perform_seek (basesink, pad, event);
4504       break;
4505     case GST_EVENT_STEP:
4506       result = gst_base_sink_perform_step (basesink, pad, event);
4507       forward = FALSE;
4508       break;
4509     default:
4510       break;
4511   }
4512
4513   if (forward) {
4514     result = gst_pad_push_event (pad, event);
4515   } else {
4516     /* not forwarded, unref the event */
4517     gst_event_unref (event);
4518   }
4519
4520   gst_object_unref (pad);
4521   return result;
4522 }
4523
4524 static gboolean
4525 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4526     gint64 * cur, gboolean * upstream)
4527 {
4528   GstClock *clock = NULL;
4529   gboolean res = FALSE;
4530   GstFormat oformat, tformat;
4531   GstSegment *segment;
4532   GstClockTime now, latency;
4533   GstClockTimeDiff base;
4534   gint64 time, accum, duration;
4535   gdouble rate;
4536   gint64 last;
4537   gboolean last_seen, with_clock, in_paused;
4538
4539   GST_OBJECT_LOCK (basesink);
4540   /* we can only get the segment when we are not NULL or READY */
4541   if (!basesink->have_newsegment)
4542     goto wrong_state;
4543
4544   in_paused = FALSE;
4545   /* when not in PLAYING or when we're busy with a state change, we
4546    * cannot read from the clock so we report time based on the
4547    * last seen timestamp. */
4548   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4549       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING) {
4550     in_paused = TRUE;
4551   }
4552
4553   /* we don't use the clip segment in pull mode, when seeking we update the
4554    * main segment directly with the new segment values without it having to be
4555    * activated by the rendering after preroll */
4556   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4557     segment = basesink->abidata.ABI.clip_segment;
4558   else
4559     segment = &basesink->segment;
4560
4561   /* our intermediate time format */
4562   tformat = GST_FORMAT_TIME;
4563   /* get the format in the segment */
4564   oformat = segment->format;
4565
4566   /* report with last seen position when EOS */
4567   last_seen = basesink->eos;
4568
4569   /* assume we will use the clock for getting the current position */
4570   with_clock = TRUE;
4571   if (basesink->sync == FALSE)
4572     with_clock = FALSE;
4573
4574   /* and we need a clock */
4575   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4576     with_clock = FALSE;
4577   else
4578     gst_object_ref (clock);
4579
4580   /* collect all data we need holding the lock */
4581   if (GST_CLOCK_TIME_IS_VALID (segment->time))
4582     time = segment->time;
4583   else
4584     time = 0;
4585
4586   if (GST_CLOCK_TIME_IS_VALID (segment->stop))
4587     duration = segment->stop - segment->start;
4588   else
4589     duration = 0;
4590
4591   accum = segment->accum;
4592   rate = segment->rate * segment->applied_rate;
4593   latency = basesink->priv->latency;
4594
4595   if (oformat == GST_FORMAT_TIME) {
4596     gint64 start, stop;
4597
4598     start = basesink->priv->current_sstart;
4599     stop = basesink->priv->current_sstop;
4600
4601     if (in_paused) {
4602       /* in paused we use the last position as a lower bound */
4603       if (stop == -1 || segment->rate > 0.0)
4604         last = start;
4605       else
4606         last = stop;
4607     } else {
4608       /* in playing, use last stop time as upper bound */
4609       if (start == -1 || segment->rate > 0.0)
4610         last = stop;
4611       else
4612         last = start;
4613     }
4614   } else {
4615     /* convert last stop to stream time */
4616     last = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4617   }
4618
4619   if (in_paused) {
4620     /* in paused, use start_time */
4621     base = GST_ELEMENT_START_TIME (basesink);
4622     GST_DEBUG_OBJECT (basesink, "in paused, using start time %" GST_TIME_FORMAT,
4623         GST_TIME_ARGS (base));
4624   } else if (with_clock) {
4625     /* else use clock when needed */
4626     base = GST_ELEMENT_CAST (basesink)->base_time;
4627     GST_DEBUG_OBJECT (basesink, "using clock and base time %" GST_TIME_FORMAT,
4628         GST_TIME_ARGS (base));
4629   } else {
4630     /* else, no sync or clock -> no base time */
4631     GST_DEBUG_OBJECT (basesink, "no sync or no clock");
4632     base = -1;
4633   }
4634
4635   /* no base, we can't calculate running_time, use last seem timestamp to report
4636    * time */
4637   if (base == -1)
4638     last_seen = TRUE;
4639
4640   /* need to release the object lock before we can get the time,
4641    * a clock might take the LOCK of the provider, which could be
4642    * a basesink subclass. */
4643   GST_OBJECT_UNLOCK (basesink);
4644
4645   if (last_seen) {
4646     /* in EOS or when no valid stream_time, report the value of last seen
4647      * timestamp */
4648     if (last == -1) {
4649       /* no timestamp, we need to ask upstream */
4650       GST_DEBUG_OBJECT (basesink, "no last seen timestamp, asking upstream");
4651       res = FALSE;
4652       *upstream = TRUE;
4653       goto done;
4654     }
4655     GST_DEBUG_OBJECT (basesink, "using last seen timestamp %" GST_TIME_FORMAT,
4656         GST_TIME_ARGS (last));
4657     *cur = last;
4658   } else {
4659     if (oformat != tformat) {
4660       /* convert accum, time and duration to time */
4661       if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4662               &accum))
4663         goto convert_failed;
4664       if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration,
4665               &tformat, &duration))
4666         goto convert_failed;
4667       if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4668               &time))
4669         goto convert_failed;
4670       if (!gst_pad_query_convert (basesink->sinkpad, oformat, last, &tformat,
4671               &last))
4672         goto convert_failed;
4673
4674       /* assume time format from now on */
4675       oformat = tformat;
4676     }
4677
4678     if (!in_paused && with_clock) {
4679       now = gst_clock_get_time (clock);
4680     } else {
4681       now = base;
4682       base = 0;
4683     }
4684
4685     /* subtract base time and accumulated time from the clock time.
4686      * Make sure we don't go negative. This is the current time in
4687      * the segment which we need to scale with the combined
4688      * rate and applied rate. */
4689     base += accum;
4690     base += latency;
4691     if (GST_CLOCK_DIFF (base, now) < 0)
4692       base = now;
4693
4694     /* for negative rates we need to count back from the segment
4695      * duration. */
4696     if (rate < 0.0)
4697       time += duration;
4698
4699     *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4700
4701     if (in_paused) {
4702       /* never report less than segment values in paused */
4703       if (last != -1)
4704         *cur = MAX (last, *cur);
4705     } else {
4706       /* never report more than last seen position in playing */
4707       if (last != -1)
4708         *cur = MIN (last, *cur);
4709     }
4710
4711     GST_DEBUG_OBJECT (basesink,
4712         "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4713         GST_TIME_FORMAT " + time %" GST_TIME_FORMAT "  last %" GST_TIME_FORMAT,
4714         GST_TIME_ARGS (now), GST_TIME_ARGS (base), GST_TIME_ARGS (accum),
4715         GST_TIME_ARGS (time), GST_TIME_ARGS (last));
4716   }
4717
4718   if (oformat != format) {
4719     /* convert to final format */
4720     if (!gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur))
4721       goto convert_failed;
4722   }
4723
4724   res = TRUE;
4725
4726 done:
4727   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4728       res, GST_TIME_ARGS (*cur));
4729
4730   if (clock)
4731     gst_object_unref (clock);
4732
4733   return res;
4734
4735   /* special cases */
4736 wrong_state:
4737   {
4738     /* in NULL or READY we always return FALSE and -1 */
4739     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4740     res = FALSE;
4741     *cur = -1;
4742     GST_OBJECT_UNLOCK (basesink);
4743     goto done;
4744   }
4745 convert_failed:
4746   {
4747     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4748     *upstream = TRUE;
4749     res = FALSE;
4750     goto done;
4751   }
4752 }
4753
4754 static gboolean
4755 gst_base_sink_get_duration (GstBaseSink * basesink, GstFormat format,
4756     gint64 * dur, gboolean * upstream)
4757 {
4758   gboolean res = FALSE;
4759
4760   if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4761     GstFormat uformat = GST_FORMAT_BYTES;
4762     gint64 uduration;
4763
4764     /* get the duration in bytes, in pull mode that's all we are sure to
4765      * know. We have to explicitly get this value from upstream instead of
4766      * using our cached value because it might change. Duration caching
4767      * should be done at a higher level. */
4768     res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat, &uduration);
4769     if (res) {
4770       gst_segment_set_duration (&basesink->segment, uformat, uduration);
4771       if (format != uformat) {
4772         /* convert to the requested format */
4773         res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4774             &format, dur);
4775       } else {
4776         *dur = uduration;
4777       }
4778     }
4779     *upstream = FALSE;
4780   } else {
4781     *upstream = TRUE;
4782   }
4783
4784   return res;
4785 }
4786
4787 static const GstQueryType *
4788 gst_base_sink_get_query_types (GstElement * element)
4789 {
4790   static const GstQueryType query_types[] = {
4791     GST_QUERY_DURATION,
4792     GST_QUERY_POSITION,
4793     GST_QUERY_SEGMENT,
4794     GST_QUERY_LATENCY,
4795     0
4796   };
4797
4798   return query_types;
4799 }
4800
4801 static gboolean
4802 gst_base_sink_query (GstElement * element, GstQuery * query)
4803 {
4804   gboolean res = FALSE;
4805
4806   GstBaseSink *basesink = GST_BASE_SINK (element);
4807
4808   switch (GST_QUERY_TYPE (query)) {
4809     case GST_QUERY_POSITION:
4810     {
4811       gint64 cur = 0;
4812       GstFormat format;
4813       gboolean upstream = FALSE;
4814
4815       gst_query_parse_position (query, &format, NULL);
4816
4817       GST_DEBUG_OBJECT (basesink, "position query in format %s",
4818           gst_format_get_name (format));
4819
4820       /* first try to get the position based on the clock */
4821       if ((res =
4822               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4823         gst_query_set_position (query, format, cur);
4824       } else if (upstream) {
4825         /* fallback to peer query */
4826         res = gst_pad_peer_query (basesink->sinkpad, query);
4827       }
4828       if (!res) {
4829         /* we can handle a few things if upstream failed */
4830         if (format == GST_FORMAT_PERCENT) {
4831           gint64 dur = 0;
4832           GstFormat uformat = GST_FORMAT_TIME;
4833
4834           res = gst_base_sink_get_position (basesink, GST_FORMAT_TIME, &cur,
4835               &upstream);
4836           if (!res && upstream) {
4837             res = gst_pad_query_peer_position (basesink->sinkpad, &uformat,
4838                 &cur);
4839           }
4840           if (res) {
4841             res = gst_base_sink_get_duration (basesink, GST_FORMAT_TIME, &dur,
4842                 &upstream);
4843             if (!res && upstream) {
4844               res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4845                   &dur);
4846             }
4847           }
4848           if (res) {
4849             gint64 pos;
4850
4851             pos = gst_util_uint64_scale (100 * GST_FORMAT_PERCENT_SCALE, cur,
4852                 dur);
4853             gst_query_set_position (query, GST_FORMAT_PERCENT, pos);
4854           }
4855         }
4856       }
4857       break;
4858     }
4859     case GST_QUERY_DURATION:
4860     {
4861       gint64 dur = 0;
4862       GstFormat format;
4863       gboolean upstream = FALSE;
4864
4865       gst_query_parse_duration (query, &format, NULL);
4866
4867       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4868           gst_format_get_name (format));
4869
4870       if ((res =
4871               gst_base_sink_get_duration (basesink, format, &dur, &upstream))) {
4872         gst_query_set_duration (query, format, dur);
4873       } else if (upstream) {
4874         /* fallback to peer query */
4875         res = gst_pad_peer_query (basesink->sinkpad, query);
4876       }
4877       if (!res) {
4878         /* we can handle a few things if upstream failed */
4879         if (format == GST_FORMAT_PERCENT) {
4880           gst_query_set_duration (query, GST_FORMAT_PERCENT,
4881               GST_FORMAT_PERCENT_MAX);
4882           res = TRUE;
4883         }
4884       }
4885       break;
4886     }
4887     case GST_QUERY_LATENCY:
4888     {
4889       gboolean live, us_live;
4890       GstClockTime min, max;
4891
4892       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4893                   &max))) {
4894         gst_query_set_latency (query, live, min, max);
4895       }
4896       break;
4897     }
4898     case GST_QUERY_JITTER:
4899       break;
4900     case GST_QUERY_RATE:
4901       /* gst_query_set_rate (query, basesink->segment_rate); */
4902       res = TRUE;
4903       break;
4904     case GST_QUERY_SEGMENT:
4905     {
4906       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4907         gst_query_set_segment (query, basesink->segment.rate,
4908             GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4909         res = TRUE;
4910       } else {
4911         res = gst_pad_peer_query (basesink->sinkpad, query);
4912       }
4913       break;
4914     }
4915     case GST_QUERY_SEEKING:
4916     case GST_QUERY_CONVERT:
4917     case GST_QUERY_FORMATS:
4918     default:
4919       res = gst_pad_peer_query (basesink->sinkpad, query);
4920       break;
4921   }
4922   GST_DEBUG_OBJECT (basesink, "query %s returns %d",
4923       GST_QUERY_TYPE_NAME (query), res);
4924   return res;
4925 }
4926
4927 static GstStateChangeReturn
4928 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4929 {
4930   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4931   GstBaseSink *basesink = GST_BASE_SINK (element);
4932   GstBaseSinkClass *bclass;
4933   GstBaseSinkPrivate *priv;
4934
4935   priv = basesink->priv;
4936
4937   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4938
4939   switch (transition) {
4940     case GST_STATE_CHANGE_NULL_TO_READY:
4941       if (bclass->start)
4942         if (!bclass->start (basesink))
4943           goto start_failed;
4944       break;
4945     case GST_STATE_CHANGE_READY_TO_PAUSED:
4946       /* need to complete preroll before this state change completes, there
4947        * is no data flow in READY so we can safely assume we need to preroll. */
4948       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4949       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4950       basesink->have_newsegment = FALSE;
4951       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4952       gst_segment_init (basesink->abidata.ABI.clip_segment,
4953           GST_FORMAT_UNDEFINED);
4954       basesink->offset = 0;
4955       basesink->have_preroll = FALSE;
4956       priv->step_unlock = FALSE;
4957       basesink->need_preroll = TRUE;
4958       basesink->playing_async = TRUE;
4959       priv->current_sstart = GST_CLOCK_TIME_NONE;
4960       priv->current_sstop = GST_CLOCK_TIME_NONE;
4961       priv->eos_rtime = GST_CLOCK_TIME_NONE;
4962       priv->latency = 0;
4963       basesink->eos = FALSE;
4964       priv->received_eos = FALSE;
4965       gst_base_sink_reset_qos (basesink);
4966       priv->commited = FALSE;
4967       priv->call_preroll = TRUE;
4968       priv->current_step.valid = FALSE;
4969       priv->pending_step.valid = FALSE;
4970       if (priv->async_enabled) {
4971         GST_DEBUG_OBJECT (basesink, "doing async state change");
4972         /* when async enabled, post async-start message and return ASYNC from
4973          * the state change function */
4974         ret = GST_STATE_CHANGE_ASYNC;
4975         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4976             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4977       } else {
4978         priv->have_latency = TRUE;
4979       }
4980       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4981       break;
4982     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4983       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4984       if (!gst_base_sink_needs_preroll (basesink)) {
4985         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4986         /* no preroll needed anymore now. */
4987         basesink->playing_async = FALSE;
4988         basesink->need_preroll = FALSE;
4989         if (basesink->eos) {
4990           GstMessage *message;
4991
4992           /* need to post EOS message here */
4993           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4994           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
4995           gst_message_set_seqnum (message, basesink->priv->seqnum);
4996           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
4997         } else {
4998           GST_DEBUG_OBJECT (basesink, "signal preroll");
4999           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
5000         }
5001       } else {
5002         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
5003         basesink->need_preroll = TRUE;
5004         basesink->playing_async = TRUE;
5005         priv->call_preroll = TRUE;
5006         priv->commited = FALSE;
5007         if (priv->async_enabled) {
5008           GST_DEBUG_OBJECT (basesink, "doing async state change");
5009           ret = GST_STATE_CHANGE_ASYNC;
5010           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5011               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
5012         }
5013       }
5014       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5015       break;
5016     default:
5017       break;
5018   }
5019
5020   {
5021     GstStateChangeReturn bret;
5022
5023     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
5024     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
5025       goto activate_failed;
5026   }
5027
5028   switch (transition) {
5029     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
5030       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
5031       /* FIXME, make sure we cannot enter _render first */
5032
5033       /* we need to call ::unlock before locking PREROLL_LOCK
5034        * since we lock it before going into ::render */
5035       if (bclass->unlock)
5036         bclass->unlock (basesink);
5037
5038       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5039       GST_DEBUG_OBJECT (basesink, "got preroll lock");
5040       /* now that we have the PREROLL lock, clear our unlock request */
5041       if (bclass->unlock_stop)
5042         bclass->unlock_stop (basesink);
5043
5044       /* we need preroll again and we set the flag before unlocking the clockid
5045        * because if the clockid is unlocked before a current buffer expired, we
5046        * can use that buffer to preroll with */
5047       basesink->need_preroll = TRUE;
5048
5049       if (basesink->clock_id) {
5050         GST_DEBUG_OBJECT (basesink, "unschedule clock");
5051         gst_clock_id_unschedule (basesink->clock_id);
5052       }
5053
5054       /* if we don't have a preroll buffer we need to wait for a preroll and
5055        * return ASYNC. */
5056       if (!gst_base_sink_needs_preroll (basesink)) {
5057         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
5058         basesink->playing_async = FALSE;
5059       } else {
5060         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
5061           GST_DEBUG_OBJECT (basesink, "element is <= READY");
5062           ret = GST_STATE_CHANGE_SUCCESS;
5063         } else {
5064           GST_DEBUG_OBJECT (basesink,
5065               "PLAYING to PAUSED, we are not prerolled");
5066           basesink->playing_async = TRUE;
5067           priv->commited = FALSE;
5068           priv->call_preroll = TRUE;
5069           if (priv->async_enabled) {
5070             GST_DEBUG_OBJECT (basesink, "doing async state change");
5071             ret = GST_STATE_CHANGE_ASYNC;
5072             gst_element_post_message (GST_ELEMENT_CAST (basesink),
5073                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
5074                     FALSE));
5075           }
5076         }
5077       }
5078       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
5079           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
5080
5081       gst_base_sink_reset_qos (basesink);
5082       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5083       break;
5084     case GST_STATE_CHANGE_PAUSED_TO_READY:
5085       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5086       /* start by reseting our position state with the object lock so that the
5087        * position query gets the right idea. We do this before we post the
5088        * messages so that the message handlers pick this up. */
5089       GST_OBJECT_LOCK (basesink);
5090       basesink->have_newsegment = FALSE;
5091       priv->current_sstart = GST_CLOCK_TIME_NONE;
5092       priv->current_sstop = GST_CLOCK_TIME_NONE;
5093       priv->have_latency = FALSE;
5094       if (priv->cached_clock_id) {
5095         gst_clock_id_unref (priv->cached_clock_id);
5096         priv->cached_clock_id = NULL;
5097       }
5098       GST_OBJECT_UNLOCK (basesink);
5099
5100       gst_base_sink_set_last_buffer (basesink, NULL);
5101       priv->call_preroll = FALSE;
5102
5103       if (!priv->commited) {
5104         if (priv->async_enabled) {
5105           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
5106
5107           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5108               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
5109                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
5110
5111           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5112               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
5113         }
5114         priv->commited = TRUE;
5115       } else {
5116         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
5117       }
5118       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5119       break;
5120     case GST_STATE_CHANGE_READY_TO_NULL:
5121       if (bclass->stop) {
5122         if (!bclass->stop (basesink)) {
5123           GST_WARNING_OBJECT (basesink, "failed to stop");
5124         }
5125       }
5126       gst_base_sink_set_last_buffer (basesink, NULL);
5127       priv->call_preroll = FALSE;
5128       break;
5129     default:
5130       break;
5131   }
5132
5133   return ret;
5134
5135   /* ERRORS */
5136 start_failed:
5137   {
5138     GST_DEBUG_OBJECT (basesink, "failed to start");
5139     return GST_STATE_CHANGE_FAILURE;
5140   }
5141 activate_failed:
5142   {
5143     GST_DEBUG_OBJECT (basesink,
5144         "element failed to change states -- activation problem?");
5145     return GST_STATE_CHANGE_FAILURE;
5146   }
5147 }