Merge branch 'master' into 0.11-fdo
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSrc
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * |[
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * ]|
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSinkClass.preroll() vmethod with this preroll buffer and will then
59  * commit the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from #GstBaseSinkClass.get_times(). If this
63  * function returns #GST_CLOCK_TIME_NONE for the start time, no synchronisation
64  * will be done. Synchronisation can be disabled entirely by setting the object
65  * #GstBaseSink:sync property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSinkClass.render() will be
68  * called. Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the
71  * #GstBaseSinkClass.render() method are supported as well. These classes
72  * typically receive a buffer in the render method and can then potentially
73  * block on the clock while rendering. A typical example is an audiosink.
74  * Since 0.10.11 these subclasses can use gst_base_sink_wait_preroll() to
75  * perform the blocking wait.
76  *
77  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
78  * for the clock to reach the time indicated by the stop time of the last
79  * #GstBaseSinkClass.get_times() call before posting an EOS message. When the
80  * element receives EOS in PAUSED, preroll completes, the event is queued and an
81  * EOS message is posted when going to PLAYING.
82  *
83  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
84  * synchronisation and clipping of buffers. Buffers that fall completely outside
85  * of the current segment are dropped. Buffers that fall partially in the
86  * segment are rendered (and prerolled). Subclasses should do any subbuffer
87  * clipping themselves when needed.
88  *
89  * #GstBaseSink will by default report the current playback position in
90  * #GST_FORMAT_TIME based on the current clock time and segment information.
91  * If no clock has been set on the element, the query will be forwarded
92  * upstream.
93  *
94  * The #GstBaseSinkClass.set_caps() function will be called when the subclass
95  * should configure itself to process a specific media type.
96  *
97  * The #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() virtual methods
98  * will be called when resources should be allocated. Any 
99  * #GstBaseSinkClass.preroll(), #GstBaseSinkClass.render() and
100  * #GstBaseSinkClass.set_caps() function will be called between the
101  * #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() calls.
102  *
103  * The #GstBaseSinkClass.event() virtual method will be called when an event is
104  * received by #GstBaseSink. Normally this method should only be overriden by
105  * very specific elements (such as file sinks) which need to handle the
106  * newsegment event specially.
107  *
108  * #GstBaseSink provides an overridable #GstBaseSinkClass.buffer_alloc()
109  * function that can be used by sinks that want to do reverse negotiation or to
110  * provide custom buffers (hardware buffers for example) to upstream elements.
111  *
112  * The #GstBaseSinkClass.unlock() method is called when the elements should
113  * unblock any blocking operations they perform in the
114  * #GstBaseSinkClass.render() method. This is mostly useful when the
115  * #GstBaseSinkClass.render() method performs a blocking write on a file
116  * descriptor, for example.
117  *
118  * The #GstBaseSink:max-lateness property affects how the sink deals with
119  * buffers that arrive too late in the sink. A buffer arrives too late in the
120  * sink when the presentation time (as a combination of the last segment, buffer
121  * timestamp and element base_time) plus the duration is before the current
122  * time of the clock.
123  * If the frame is later than max-lateness, the sink will drop the buffer
124  * without calling the render method.
125  * This feature is disabled if sync is disabled, the
126  * #GstBaseSinkClass.get_times() method does not return a valid start time or
127  * max-lateness is set to -1 (the default).
128  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
129  * max-lateness value.
130  *
131  * The #GstBaseSink:qos property will enable the quality-of-service features of
132  * the basesink which gather statistics about the real-time performance of the
133  * clock synchronisation. For each buffer received in the sink, statistics are
134  * gathered and a QOS event is sent upstream with these numbers. This
135  * information can then be used by upstream elements to reduce their processing
136  * rate, for example.
137  *
138  * Since 0.10.15 the #GstBaseSink:async property can be used to instruct the
139  * sink to never perform an ASYNC state change. This feature is mostly usable
140  * when dealing with non-synchronized streams or sparse streams.
141  *
142  * Last reviewed on 2007-08-29 (0.10.15)
143  */
144
145 #ifdef HAVE_CONFIG_H
146 #  include "config.h"
147 #endif
148
149 #include <gst/gst_private.h>
150
151 #include "gstbasesink.h"
152 #include <gst/gstmarshal.h>
153 #include <gst/gst-i18n-lib.h>
154
155 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
156 #define GST_CAT_DEFAULT gst_base_sink_debug
157
158 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
159    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
160
161 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
162
163 typedef struct
164 {
165   gboolean valid;               /* if this info is valid */
166   guint32 seqnum;               /* the seqnum of the STEP event */
167   GstFormat format;             /* the format of the amount */
168   guint64 amount;               /* the total amount of data to skip */
169   guint64 position;             /* the position in the stepped data */
170   guint64 duration;             /* the duration in time of the skipped data */
171   guint64 start;                /* running_time of the start */
172   gdouble rate;                 /* rate of skipping */
173   gdouble start_rate;           /* rate before skipping */
174   guint64 start_start;          /* start position skipping */
175   guint64 start_stop;           /* stop position skipping */
176   gboolean flush;               /* if this was a flushing step */
177   gboolean intermediate;        /* if this is an intermediate step */
178   gboolean need_preroll;        /* if we need preroll after this step */
179 } GstStepInfo;
180
181 /* FIXME, some stuff in ABI.data and other in Private...
182  * Make up your mind please.
183  */
184 struct _GstBaseSinkPrivate
185 {
186   gint qos_enabled;             /* ATOMIC */
187   gboolean async_enabled;
188   GstClockTimeDiff ts_offset;
189   GstClockTime render_delay;
190
191   /* start, stop of current buffer, stream time, used to report position */
192   GstClockTime current_sstart;
193   GstClockTime current_sstop;
194
195   /* start, stop and jitter of current buffer, running time */
196   GstClockTime current_rstart;
197   GstClockTime current_rstop;
198   GstClockTimeDiff current_jitter;
199   /* the running time of the previous buffer */
200   GstClockTime prev_rstart;
201
202   /* EOS sync time in running time */
203   GstClockTime eos_rtime;
204
205   /* last buffer that arrived in time, running time */
206   GstClockTime last_render_time;
207   /* when the last buffer left the sink, running time */
208   GstClockTime last_left;
209
210   /* running averages go here these are done on running time */
211   GstClockTime avg_pt;
212   GstClockTime avg_duration;
213   gdouble avg_rate;
214   GstClockTime avg_in_diff;
215
216   /* these are done on system time. avg_jitter and avg_render are
217    * compared to eachother to see if the rendering time takes a
218    * huge amount of the processing, If so we are flooded with
219    * buffers. */
220   GstClockTime last_left_systime;
221   GstClockTime avg_jitter;
222   GstClockTime start, stop;
223   GstClockTime avg_render;
224
225   /* number of rendered and dropped frames */
226   guint64 rendered;
227   guint64 dropped;
228
229   /* latency stuff */
230   GstClockTime latency;
231
232   /* if we already commited the state */
233   gboolean commited;
234
235   /* when we received EOS */
236   gboolean received_eos;
237
238   /* when we are prerolled and able to report latency */
239   gboolean have_latency;
240
241   /* the last buffer we prerolled or rendered. Useful for making snapshots */
242   gint enable_last_buffer;      /* atomic */
243   GstBuffer *last_buffer;
244
245   /* caps for pull based scheduling */
246   GstCaps *pull_caps;
247
248   /* blocksize for pulling */
249   guint blocksize;
250
251   gboolean discont;
252
253   /* seqnum of the stream */
254   guint32 seqnum;
255
256   gboolean call_preroll;
257   gboolean step_unlock;
258
259   /* we have a pending and a current step operation */
260   GstStepInfo current_step;
261   GstStepInfo pending_step;
262
263   /* Cached GstClockID */
264   GstClockID cached_clock_id;
265
266   /* for throttling and QoS */
267   GstClockTime earliest_in_time;
268   GstClockTime throttle_time;
269 };
270
271 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
272
273 /* generic running average, this has a neutral window size */
274 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
275
276 /* the windows for these running averages are experimentally obtained.
277  * possitive values get averaged more while negative values use a small
278  * window so we can react faster to badness. */
279 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
280 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
281
282 enum
283 {
284   _PR_IS_NOTHING = 1 << 0,
285   _PR_IS_BUFFER = 1 << 1,
286   _PR_IS_BUFFERLIST = 1 << 2,
287   _PR_IS_EVENT = 1 << 3
288 } PrivateObjectType;
289
290 #define OBJ_IS_BUFFER(a) ((a) & _PR_IS_BUFFER)
291 #define OBJ_IS_BUFFERLIST(a) ((a) & _PR_IS_BUFFERLIST)
292 #define OBJ_IS_EVENT(a) ((a) & _PR_IS_EVENT)
293 #define OBJ_IS_BUFFERFULL(a) ((a) & (_PR_IS_BUFFER | _PR_IS_BUFFERLIST))
294
295 /* BaseSink properties */
296
297 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
298 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
299
300 #define DEFAULT_PREROLL_QUEUE_LEN   0
301 #define DEFAULT_SYNC                TRUE
302 #define DEFAULT_MAX_LATENESS        -1
303 #define DEFAULT_QOS                 FALSE
304 #define DEFAULT_ASYNC               TRUE
305 #define DEFAULT_TS_OFFSET           0
306 #define DEFAULT_BLOCKSIZE           4096
307 #define DEFAULT_RENDER_DELAY        0
308 #define DEFAULT_ENABLE_LAST_BUFFER  TRUE
309 #define DEFAULT_THROTTLE_TIME       0
310
311 enum
312 {
313   PROP_0,
314   PROP_PREROLL_QUEUE_LEN,
315   PROP_SYNC,
316   PROP_MAX_LATENESS,
317   PROP_QOS,
318   PROP_ASYNC,
319   PROP_TS_OFFSET,
320   PROP_ENABLE_LAST_BUFFER,
321   PROP_LAST_BUFFER,
322   PROP_BLOCKSIZE,
323   PROP_RENDER_DELAY,
324   PROP_THROTTLE_TIME,
325   PROP_LAST
326 };
327
328 static GstElementClass *parent_class = NULL;
329
330 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
331 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
332 static void gst_base_sink_finalize (GObject * object);
333
334 GType
335 gst_base_sink_get_type (void)
336 {
337   static volatile gsize base_sink_type = 0;
338
339   if (g_once_init_enter (&base_sink_type)) {
340     GType _type;
341     static const GTypeInfo base_sink_info = {
342       sizeof (GstBaseSinkClass),
343       NULL,
344       NULL,
345       (GClassInitFunc) gst_base_sink_class_init,
346       NULL,
347       NULL,
348       sizeof (GstBaseSink),
349       0,
350       (GInstanceInitFunc) gst_base_sink_init,
351     };
352
353     _type = g_type_register_static (GST_TYPE_ELEMENT,
354         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
355     g_once_init_leave (&base_sink_type, _type);
356   }
357   return base_sink_type;
358 }
359
360 static void gst_base_sink_set_property (GObject * object, guint prop_id,
361     const GValue * value, GParamSpec * pspec);
362 static void gst_base_sink_get_property (GObject * object, guint prop_id,
363     GValue * value, GParamSpec * pspec);
364
365 static gboolean gst_base_sink_send_event (GstElement * element,
366     GstEvent * event);
367 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
368 static const GstQueryType *gst_base_sink_get_query_types (GstElement * element);
369
370 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
371 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
372 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
373     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
374 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
375     GstClockTime * start, GstClockTime * end);
376 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
377     GstPad * pad, gboolean flushing);
378 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
379     gboolean active);
380 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
381     GstSegment * segment);
382 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
383     GstEvent * event, GstSegment * segment);
384
385 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
386     GstStateChange transition);
387
388 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
389 static GstFlowReturn gst_base_sink_chain_list (GstPad * pad,
390     GstBufferList * list);
391
392 static void gst_base_sink_loop (GstPad * pad);
393 static gboolean gst_base_sink_pad_activate (GstPad * pad);
394 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
395 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
396 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
397
398 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
399 static GstCaps *gst_base_sink_pad_getcaps (GstPad * pad);
400 static gboolean gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps);
401 static void gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps);
402 static GstFlowReturn gst_base_sink_pad_buffer_alloc (GstPad * pad,
403     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
404
405
406 /* check if an object was too late */
407 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
408     GstMiniObject * obj, GstClockTime rstart, GstClockTime rstop,
409     GstClockReturn status, GstClockTimeDiff jitter);
410 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
411     guint8 obj_type, GstMiniObject * obj);
412
413 static void
414 gst_base_sink_class_init (GstBaseSinkClass * klass)
415 {
416   GObjectClass *gobject_class;
417   GstElementClass *gstelement_class;
418
419   gobject_class = G_OBJECT_CLASS (klass);
420   gstelement_class = GST_ELEMENT_CLASS (klass);
421
422   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
423       "basesink element");
424
425   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
426
427   parent_class = g_type_class_peek_parent (klass);
428
429   gobject_class->finalize = gst_base_sink_finalize;
430   gobject_class->set_property = gst_base_sink_set_property;
431   gobject_class->get_property = gst_base_sink_get_property;
432
433   /* FIXME, this next value should be configured using an event from the
434    * upstream element, ie, the BUFFER_SIZE event. */
435   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
436       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
437           "Number of buffers to queue during preroll", 0, G_MAXUINT,
438           DEFAULT_PREROLL_QUEUE_LEN,
439           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
440
441   g_object_class_install_property (gobject_class, PROP_SYNC,
442       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
443           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
444
445   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
446       g_param_spec_int64 ("max-lateness", "Max Lateness",
447           "Maximum number of nanoseconds that a buffer can be late before it "
448           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
449           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
450
451   g_object_class_install_property (gobject_class, PROP_QOS,
452       g_param_spec_boolean ("qos", "Qos",
453           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
454           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
455   /**
456    * GstBaseSink:async
457    *
458    * If set to #TRUE, the basesink will perform asynchronous state changes.
459    * When set to #FALSE, the sink will not signal the parent when it prerolls.
460    * Use this option when dealing with sparse streams or when synchronisation is
461    * not required.
462    *
463    * Since: 0.10.15
464    */
465   g_object_class_install_property (gobject_class, PROP_ASYNC,
466       g_param_spec_boolean ("async", "Async",
467           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
468           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
469   /**
470    * GstBaseSink:ts-offset
471    *
472    * Controls the final synchronisation, a negative value will render the buffer
473    * earlier while a positive value delays playback. This property can be
474    * used to fix synchronisation in bad files.
475    *
476    * Since: 0.10.15
477    */
478   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
479       g_param_spec_int64 ("ts-offset", "TS Offset",
480           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
481           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
482
483   /**
484    * GstBaseSink:enable-last-buffer
485    *
486    * Enable the last-buffer property. If FALSE, basesink doesn't keep a
487    * reference to the last buffer arrived and the last-buffer property is always
488    * set to NULL. This can be useful if you need buffers to be released as soon
489    * as possible, eg. if you're using a buffer pool.
490    *
491    * Since: 0.10.30
492    */
493   g_object_class_install_property (gobject_class, PROP_ENABLE_LAST_BUFFER,
494       g_param_spec_boolean ("enable-last-buffer", "Enable Last Buffer",
495           "Enable the last-buffer property", DEFAULT_ENABLE_LAST_BUFFER,
496           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
497
498   /**
499    * GstBaseSink:last-buffer
500    *
501    * The last buffer that arrived in the sink and was used for preroll or for
502    * rendering. This property can be used to generate thumbnails. This property
503    * can be NULL when the sink has not yet received a bufer.
504    *
505    * Since: 0.10.15
506    */
507   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
508       g_param_spec_boxed ("last-buffer", "Last Buffer",
509           "The last buffer received in the sink", GST_TYPE_BUFFER,
510           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
511   /**
512    * GstBaseSink:blocksize
513    *
514    * The amount of bytes to pull when operating in pull mode.
515    *
516    * Since: 0.10.22
517    */
518   /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
519   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
520       g_param_spec_uint ("blocksize", "Block size",
521           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
522           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
523   /**
524    * GstBaseSink:render-delay
525    *
526    * The additional delay between synchronisation and actual rendering of the
527    * media. This property will add additional latency to the device in order to
528    * make other sinks compensate for the delay.
529    *
530    * Since: 0.10.22
531    */
532   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
533       g_param_spec_uint64 ("render-delay", "Render Delay",
534           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
535           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
536   /**
537    * GstBaseSink:throttle-time
538    *
539    * The time to insert between buffers. This property can be used to control
540    * the maximum amount of buffers per second to render. Setting this property
541    * to a value bigger than 0 will make the sink create THROTTLE QoS events.
542    *
543    * Since: 0.10.33
544    */
545   g_object_class_install_property (gobject_class, PROP_THROTTLE_TIME,
546       g_param_spec_uint64 ("throttle-time", "Throttle time",
547           "The time to keep between rendered buffers (unused)", 0, G_MAXUINT64,
548           DEFAULT_THROTTLE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
549
550   gstelement_class->change_state =
551       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
552   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
553   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
554   gstelement_class->get_query_types =
555       GST_DEBUG_FUNCPTR (gst_base_sink_get_query_types);
556
557   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
558   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
559   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
560   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
561   klass->activate_pull =
562       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
563
564   /* Registering debug symbols for function pointers */
565   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_getcaps);
566   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_setcaps);
567   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_fixate);
568   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_buffer_alloc);
569   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate);
570   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_push);
571   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_pull);
572   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_event);
573   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain);
574   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain_list);
575 }
576
577 static GstCaps *
578 gst_base_sink_pad_getcaps (GstPad * pad)
579 {
580   GstBaseSinkClass *bclass;
581   GstBaseSink *bsink;
582   GstCaps *caps = NULL;
583
584   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
585   bclass = GST_BASE_SINK_GET_CLASS (bsink);
586
587   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
588     /* if we are operating in pull mode we only accept the negotiated caps */
589     GST_OBJECT_LOCK (pad);
590     if ((caps = GST_PAD_CAPS (pad)))
591       gst_caps_ref (caps);
592     GST_OBJECT_UNLOCK (pad);
593   }
594   if (caps == NULL) {
595     if (bclass->get_caps)
596       caps = bclass->get_caps (bsink);
597
598     if (caps == NULL) {
599       GstPadTemplate *pad_template;
600
601       pad_template =
602           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
603           "sink");
604       if (pad_template != NULL) {
605         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
606       }
607     }
608   }
609   gst_object_unref (bsink);
610
611   return caps;
612 }
613
614 static gboolean
615 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
616 {
617   GstBaseSinkClass *bclass;
618   GstBaseSink *bsink;
619   gboolean res = TRUE;
620
621   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
622   bclass = GST_BASE_SINK_GET_CLASS (bsink);
623
624   if (res && bclass->set_caps)
625     res = bclass->set_caps (bsink, caps);
626
627   gst_object_unref (bsink);
628
629   return res;
630 }
631
632 static void
633 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
634 {
635   GstBaseSinkClass *bclass;
636   GstBaseSink *bsink;
637
638   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
639   bclass = GST_BASE_SINK_GET_CLASS (bsink);
640
641   if (bclass->fixate)
642     bclass->fixate (bsink, caps);
643
644   gst_object_unref (bsink);
645 }
646
647 static GstFlowReturn
648 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
649     GstCaps * caps, GstBuffer ** buf)
650 {
651   GstBaseSinkClass *bclass;
652   GstBaseSink *bsink;
653   GstFlowReturn result = GST_FLOW_OK;
654
655   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
656   bclass = GST_BASE_SINK_GET_CLASS (bsink);
657
658   if (bclass->buffer_alloc)
659     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
660   else
661     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
662
663   gst_object_unref (bsink);
664
665   return result;
666 }
667
668 static void
669 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
670 {
671   GstPadTemplate *pad_template;
672   GstBaseSinkPrivate *priv;
673
674   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
675
676   pad_template =
677       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
678   g_return_if_fail (pad_template != NULL);
679
680   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
681
682   gst_pad_set_getcaps_function (basesink->sinkpad, gst_base_sink_pad_getcaps);
683   gst_pad_set_setcaps_function (basesink->sinkpad, gst_base_sink_pad_setcaps);
684   gst_pad_set_fixatecaps_function (basesink->sinkpad, gst_base_sink_pad_fixate);
685   gst_pad_set_bufferalloc_function (basesink->sinkpad,
686       gst_base_sink_pad_buffer_alloc);
687   gst_pad_set_activate_function (basesink->sinkpad, gst_base_sink_pad_activate);
688   gst_pad_set_activatepush_function (basesink->sinkpad,
689       gst_base_sink_pad_activate_push);
690   gst_pad_set_activatepull_function (basesink->sinkpad,
691       gst_base_sink_pad_activate_pull);
692   gst_pad_set_event_function (basesink->sinkpad, gst_base_sink_event);
693   gst_pad_set_chain_function (basesink->sinkpad, gst_base_sink_chain);
694   gst_pad_set_chain_list_function (basesink->sinkpad, gst_base_sink_chain_list);
695   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
696
697   basesink->pad_mode = GST_ACTIVATE_NONE;
698   basesink->preroll_lock = g_mutex_new ();
699   basesink->preroll_cond = g_cond_new ();
700   basesink->preroll_queue = g_queue_new ();
701   basesink->clip_segment = gst_segment_new ();
702   priv->have_latency = FALSE;
703
704   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
705   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
706
707   basesink->sync = DEFAULT_SYNC;
708   basesink->max_lateness = DEFAULT_MAX_LATENESS;
709   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
710   priv->async_enabled = DEFAULT_ASYNC;
711   priv->ts_offset = DEFAULT_TS_OFFSET;
712   priv->render_delay = DEFAULT_RENDER_DELAY;
713   priv->blocksize = DEFAULT_BLOCKSIZE;
714   priv->cached_clock_id = NULL;
715   g_atomic_int_set (&priv->enable_last_buffer, DEFAULT_ENABLE_LAST_BUFFER);
716   priv->throttle_time = DEFAULT_THROTTLE_TIME;
717
718   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
719 }
720
721 static void
722 gst_base_sink_finalize (GObject * object)
723 {
724   GstBaseSink *basesink;
725
726   basesink = GST_BASE_SINK (object);
727
728   g_mutex_free (basesink->preroll_lock);
729   g_cond_free (basesink->preroll_cond);
730   g_queue_free (basesink->preroll_queue);
731   gst_segment_free (basesink->clip_segment);
732
733   G_OBJECT_CLASS (parent_class)->finalize (object);
734 }
735
736 /**
737  * gst_base_sink_set_sync:
738  * @sink: the sink
739  * @sync: the new sync value.
740  *
741  * Configures @sink to synchronize on the clock or not. When
742  * @sync is FALSE, incomming samples will be played as fast as
743  * possible. If @sync is TRUE, the timestamps of the incomming
744  * buffers will be used to schedule the exact render time of its
745  * contents.
746  *
747  * Since: 0.10.4
748  */
749 void
750 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
751 {
752   g_return_if_fail (GST_IS_BASE_SINK (sink));
753
754   GST_OBJECT_LOCK (sink);
755   sink->sync = sync;
756   GST_OBJECT_UNLOCK (sink);
757 }
758
759 /**
760  * gst_base_sink_get_sync:
761  * @sink: the sink
762  *
763  * Checks if @sink is currently configured to synchronize against the
764  * clock.
765  *
766  * Returns: TRUE if the sink is configured to synchronize against the clock.
767  *
768  * Since: 0.10.4
769  */
770 gboolean
771 gst_base_sink_get_sync (GstBaseSink * sink)
772 {
773   gboolean res;
774
775   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
776
777   GST_OBJECT_LOCK (sink);
778   res = sink->sync;
779   GST_OBJECT_UNLOCK (sink);
780
781   return res;
782 }
783
784 /**
785  * gst_base_sink_set_max_lateness:
786  * @sink: the sink
787  * @max_lateness: the new max lateness value.
788  *
789  * Sets the new max lateness value to @max_lateness. This value is
790  * used to decide if a buffer should be dropped or not based on the
791  * buffer timestamp and the current clock time. A value of -1 means
792  * an unlimited time.
793  *
794  * Since: 0.10.4
795  */
796 void
797 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
798 {
799   g_return_if_fail (GST_IS_BASE_SINK (sink));
800
801   GST_OBJECT_LOCK (sink);
802   sink->max_lateness = max_lateness;
803   GST_OBJECT_UNLOCK (sink);
804 }
805
806 /**
807  * gst_base_sink_get_max_lateness:
808  * @sink: the sink
809  *
810  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
811  * more details.
812  *
813  * Returns: The maximum time in nanoseconds that a buffer can be late
814  * before it is dropped and not rendered. A value of -1 means an
815  * unlimited time.
816  *
817  * Since: 0.10.4
818  */
819 gint64
820 gst_base_sink_get_max_lateness (GstBaseSink * sink)
821 {
822   gint64 res;
823
824   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
825
826   GST_OBJECT_LOCK (sink);
827   res = sink->max_lateness;
828   GST_OBJECT_UNLOCK (sink);
829
830   return res;
831 }
832
833 /**
834  * gst_base_sink_set_qos_enabled:
835  * @sink: the sink
836  * @enabled: the new qos value.
837  *
838  * Configures @sink to send Quality-of-Service events upstream.
839  *
840  * Since: 0.10.5
841  */
842 void
843 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
844 {
845   g_return_if_fail (GST_IS_BASE_SINK (sink));
846
847   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
848 }
849
850 /**
851  * gst_base_sink_is_qos_enabled:
852  * @sink: the sink
853  *
854  * Checks if @sink is currently configured to send Quality-of-Service events
855  * upstream.
856  *
857  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
858  *
859  * Since: 0.10.5
860  */
861 gboolean
862 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
863 {
864   gboolean res;
865
866   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
867
868   res = g_atomic_int_get (&sink->priv->qos_enabled);
869
870   return res;
871 }
872
873 /**
874  * gst_base_sink_set_async_enabled:
875  * @sink: the sink
876  * @enabled: the new async value.
877  *
878  * Configures @sink to perform all state changes asynchronusly. When async is
879  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
880  * preroll buffer. This feature is usefull if the sink does not synchronize
881  * against the clock or when it is dealing with sparse streams.
882  *
883  * Since: 0.10.15
884  */
885 void
886 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
887 {
888   g_return_if_fail (GST_IS_BASE_SINK (sink));
889
890   GST_BASE_SINK_PREROLL_LOCK (sink);
891   g_atomic_int_set (&sink->priv->async_enabled, enabled);
892   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
893   GST_BASE_SINK_PREROLL_UNLOCK (sink);
894 }
895
896 /**
897  * gst_base_sink_is_async_enabled:
898  * @sink: the sink
899  *
900  * Checks if @sink is currently configured to perform asynchronous state
901  * changes to PAUSED.
902  *
903  * Returns: TRUE if the sink is configured to perform asynchronous state
904  * changes.
905  *
906  * Since: 0.10.15
907  */
908 gboolean
909 gst_base_sink_is_async_enabled (GstBaseSink * sink)
910 {
911   gboolean res;
912
913   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
914
915   res = g_atomic_int_get (&sink->priv->async_enabled);
916
917   return res;
918 }
919
920 /**
921  * gst_base_sink_set_ts_offset:
922  * @sink: the sink
923  * @offset: the new offset
924  *
925  * Adjust the synchronisation of @sink with @offset. A negative value will
926  * render buffers earlier than their timestamp. A positive value will delay
927  * rendering. This function can be used to fix playback of badly timestamped
928  * buffers.
929  *
930  * Since: 0.10.15
931  */
932 void
933 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
934 {
935   g_return_if_fail (GST_IS_BASE_SINK (sink));
936
937   GST_OBJECT_LOCK (sink);
938   sink->priv->ts_offset = offset;
939   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
940   GST_OBJECT_UNLOCK (sink);
941 }
942
943 /**
944  * gst_base_sink_get_ts_offset:
945  * @sink: the sink
946  *
947  * Get the synchronisation offset of @sink.
948  *
949  * Returns: The synchronisation offset.
950  *
951  * Since: 0.10.15
952  */
953 GstClockTimeDiff
954 gst_base_sink_get_ts_offset (GstBaseSink * sink)
955 {
956   GstClockTimeDiff res;
957
958   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
959
960   GST_OBJECT_LOCK (sink);
961   res = sink->priv->ts_offset;
962   GST_OBJECT_UNLOCK (sink);
963
964   return res;
965 }
966
967 /**
968  * gst_base_sink_get_last_buffer:
969  * @sink: the sink
970  *
971  * Get the last buffer that arrived in the sink and was used for preroll or for
972  * rendering. This property can be used to generate thumbnails.
973  *
974  * The #GstCaps on the buffer can be used to determine the type of the buffer.
975  *
976  * Free-function: gst_buffer_unref
977  *
978  * Returns: (transfer full): a #GstBuffer. gst_buffer_unref() after usage.
979  *     This function returns NULL when no buffer has arrived in the sink yet
980  *     or when the sink is not in PAUSED or PLAYING.
981  *
982  * Since: 0.10.15
983  */
984 GstBuffer *
985 gst_base_sink_get_last_buffer (GstBaseSink * sink)
986 {
987   GstBuffer *res;
988
989   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
990
991   GST_OBJECT_LOCK (sink);
992   if ((res = sink->priv->last_buffer))
993     gst_buffer_ref (res);
994   GST_OBJECT_UNLOCK (sink);
995
996   return res;
997 }
998
999 /* with OBJECT_LOCK */
1000 static void
1001 gst_base_sink_set_last_buffer_unlocked (GstBaseSink * sink, GstBuffer * buffer)
1002 {
1003   GstBuffer *old;
1004
1005   old = sink->priv->last_buffer;
1006   if (G_LIKELY (old != buffer)) {
1007     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
1008     if (G_LIKELY (buffer))
1009       gst_buffer_ref (buffer);
1010     sink->priv->last_buffer = buffer;
1011   } else {
1012     old = NULL;
1013   }
1014   /* avoid unreffing with the lock because cleanup code might want to take the
1015    * lock too */
1016   if (G_LIKELY (old)) {
1017     GST_OBJECT_UNLOCK (sink);
1018     gst_buffer_unref (old);
1019     GST_OBJECT_LOCK (sink);
1020   }
1021 }
1022
1023 static void
1024 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
1025 {
1026   if (!g_atomic_int_get (&sink->priv->enable_last_buffer))
1027     return;
1028
1029   GST_OBJECT_LOCK (sink);
1030   gst_base_sink_set_last_buffer_unlocked (sink, buffer);
1031   GST_OBJECT_UNLOCK (sink);
1032 }
1033
1034 /**
1035  * gst_base_sink_set_last_buffer_enabled:
1036  * @sink: the sink
1037  * @enabled: the new enable-last-buffer value.
1038  *
1039  * Configures @sink to store the last received buffer in the last-buffer
1040  * property.
1041  *
1042  * Since: 0.10.30
1043  */
1044 void
1045 gst_base_sink_set_last_buffer_enabled (GstBaseSink * sink, gboolean enabled)
1046 {
1047   g_return_if_fail (GST_IS_BASE_SINK (sink));
1048
1049   /* Only take lock if we change the value */
1050   if (g_atomic_int_compare_and_exchange (&sink->priv->enable_last_buffer,
1051           !enabled, enabled) && !enabled) {
1052     GST_OBJECT_LOCK (sink);
1053     gst_base_sink_set_last_buffer_unlocked (sink, NULL);
1054     GST_OBJECT_UNLOCK (sink);
1055   }
1056 }
1057
1058 /**
1059  * gst_base_sink_is_last_buffer_enabled:
1060  * @sink: the sink
1061  *
1062  * Checks if @sink is currently configured to store the last received buffer in
1063  * the last-buffer property.
1064  *
1065  * Returns: TRUE if the sink is configured to store the last received buffer.
1066  *
1067  * Since: 0.10.30
1068  */
1069 gboolean
1070 gst_base_sink_is_last_buffer_enabled (GstBaseSink * sink)
1071 {
1072   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
1073
1074   return g_atomic_int_get (&sink->priv->enable_last_buffer);
1075 }
1076
1077 /**
1078  * gst_base_sink_get_latency:
1079  * @sink: the sink
1080  *
1081  * Get the currently configured latency.
1082  *
1083  * Returns: The configured latency.
1084  *
1085  * Since: 0.10.12
1086  */
1087 GstClockTime
1088 gst_base_sink_get_latency (GstBaseSink * sink)
1089 {
1090   GstClockTime res;
1091
1092   GST_OBJECT_LOCK (sink);
1093   res = sink->priv->latency;
1094   GST_OBJECT_UNLOCK (sink);
1095
1096   return res;
1097 }
1098
1099 /**
1100  * gst_base_sink_query_latency:
1101  * @sink: the sink
1102  * @live: (out) (allow-none): if the sink is live
1103  * @upstream_live: (out) (allow-none): if an upstream element is live
1104  * @min_latency: (out) (allow-none): the min latency of the upstream elements
1105  * @max_latency: (out) (allow-none): the max latency of the upstream elements
1106  *
1107  * Query the sink for the latency parameters. The latency will be queried from
1108  * the upstream elements. @live will be TRUE if @sink is configured to
1109  * synchronize against the clock. @upstream_live will be TRUE if an upstream
1110  * element is live.
1111  *
1112  * If both @live and @upstream_live are TRUE, the sink will want to compensate
1113  * for the latency introduced by the upstream elements by setting the
1114  * @min_latency to a strictly possitive value.
1115  *
1116  * This function is mostly used by subclasses.
1117  *
1118  * Returns: TRUE if the query succeeded.
1119  *
1120  * Since: 0.10.12
1121  */
1122 gboolean
1123 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
1124     gboolean * upstream_live, GstClockTime * min_latency,
1125     GstClockTime * max_latency)
1126 {
1127   gboolean l, us_live, res, have_latency;
1128   GstClockTime min, max, render_delay;
1129   GstQuery *query;
1130   GstClockTime us_min, us_max;
1131
1132   /* we are live when we sync to the clock */
1133   GST_OBJECT_LOCK (sink);
1134   l = sink->sync;
1135   have_latency = sink->priv->have_latency;
1136   render_delay = sink->priv->render_delay;
1137   GST_OBJECT_UNLOCK (sink);
1138
1139   /* assume no latency */
1140   min = 0;
1141   max = -1;
1142   us_live = FALSE;
1143
1144   if (have_latency) {
1145     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1146     /* we are ready for a latency query this is when we preroll or when we are
1147      * not async. */
1148     query = gst_query_new_latency ();
1149
1150     /* ask the peer for the latency */
1151     if ((res = gst_pad_peer_query (sink->sinkpad, query))) {
1152       /* get upstream min and max latency */
1153       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1154
1155       if (us_live) {
1156         /* upstream live, use its latency, subclasses should use these
1157          * values to create the complete latency. */
1158         min = us_min;
1159         max = us_max;
1160       }
1161       if (l) {
1162         /* we need to add the render delay if we are live */
1163         if (min != -1)
1164           min += render_delay;
1165         if (max != -1)
1166           max += render_delay;
1167       }
1168     }
1169     gst_query_unref (query);
1170   } else {
1171     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1172     res = FALSE;
1173   }
1174
1175   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1176   if (!res) {
1177     if (!l) {
1178       res = TRUE;
1179       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1180     } else {
1181       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1182     }
1183   }
1184
1185   if (res) {
1186     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1187         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1188         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1189
1190     if (live)
1191       *live = l;
1192     if (upstream_live)
1193       *upstream_live = us_live;
1194     if (min_latency)
1195       *min_latency = min;
1196     if (max_latency)
1197       *max_latency = max;
1198   }
1199   return res;
1200 }
1201
1202 /**
1203  * gst_base_sink_set_render_delay:
1204  * @sink: a #GstBaseSink
1205  * @delay: the new delay
1206  *
1207  * Set the render delay in @sink to @delay. The render delay is the time
1208  * between actual rendering of a buffer and its synchronisation time. Some
1209  * devices might delay media rendering which can be compensated for with this
1210  * function.
1211  *
1212  * After calling this function, this sink will report additional latency and
1213  * other sinks will adjust their latency to delay the rendering of their media.
1214  *
1215  * This function is usually called by subclasses.
1216  *
1217  * Since: 0.10.21
1218  */
1219 void
1220 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1221 {
1222   GstClockTime old_render_delay;
1223
1224   g_return_if_fail (GST_IS_BASE_SINK (sink));
1225
1226   GST_OBJECT_LOCK (sink);
1227   old_render_delay = sink->priv->render_delay;
1228   sink->priv->render_delay = delay;
1229   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1230       GST_TIME_ARGS (delay));
1231   GST_OBJECT_UNLOCK (sink);
1232
1233   if (delay != old_render_delay) {
1234     GST_DEBUG_OBJECT (sink, "posting latency changed");
1235     gst_element_post_message (GST_ELEMENT_CAST (sink),
1236         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1237   }
1238 }
1239
1240 /**
1241  * gst_base_sink_get_render_delay:
1242  * @sink: a #GstBaseSink
1243  *
1244  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1245  * information about the render delay.
1246  *
1247  * Returns: the render delay of @sink.
1248  *
1249  * Since: 0.10.21
1250  */
1251 GstClockTime
1252 gst_base_sink_get_render_delay (GstBaseSink * sink)
1253 {
1254   GstClockTimeDiff res;
1255
1256   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1257
1258   GST_OBJECT_LOCK (sink);
1259   res = sink->priv->render_delay;
1260   GST_OBJECT_UNLOCK (sink);
1261
1262   return res;
1263 }
1264
1265 /**
1266  * gst_base_sink_set_blocksize:
1267  * @sink: a #GstBaseSink
1268  * @blocksize: the blocksize in bytes
1269  *
1270  * Set the number of bytes that the sink will pull when it is operating in pull
1271  * mode.
1272  *
1273  * Since: 0.10.22
1274  */
1275 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1276 void
1277 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1278 {
1279   g_return_if_fail (GST_IS_BASE_SINK (sink));
1280
1281   GST_OBJECT_LOCK (sink);
1282   sink->priv->blocksize = blocksize;
1283   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1284   GST_OBJECT_UNLOCK (sink);
1285 }
1286
1287 /**
1288  * gst_base_sink_get_blocksize:
1289  * @sink: a #GstBaseSink
1290  *
1291  * Get the number of bytes that the sink will pull when it is operating in pull
1292  * mode.
1293  *
1294  * Returns: the number of bytes @sink will pull in pull mode.
1295  *
1296  * Since: 0.10.22
1297  */
1298 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1299 guint
1300 gst_base_sink_get_blocksize (GstBaseSink * sink)
1301 {
1302   guint res;
1303
1304   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1305
1306   GST_OBJECT_LOCK (sink);
1307   res = sink->priv->blocksize;
1308   GST_OBJECT_UNLOCK (sink);
1309
1310   return res;
1311 }
1312
1313 /**
1314  * gst_base_sink_set_throttle_time:
1315  * @sink: a #GstBaseSink
1316  * @throttle: the throttle time in nanoseconds
1317  *
1318  * Set the time that will be inserted between rendered buffers. This
1319  * can be used to control the maximum buffers per second that the sink
1320  * will render. 
1321  *
1322  * Since: 0.10.33
1323  */
1324 void
1325 gst_base_sink_set_throttle_time (GstBaseSink * sink, guint64 throttle)
1326 {
1327   g_return_if_fail (GST_IS_BASE_SINK (sink));
1328
1329   GST_OBJECT_LOCK (sink);
1330   sink->priv->throttle_time = throttle;
1331   GST_LOG_OBJECT (sink, "set throttle_time to %" G_GUINT64_FORMAT, throttle);
1332   GST_OBJECT_UNLOCK (sink);
1333 }
1334
1335 /**
1336  * gst_base_sink_get_throttle_time:
1337  * @sink: a #GstBaseSink
1338  *
1339  * Get the time that will be inserted between frames to control the 
1340  * maximum buffers per second.
1341  *
1342  * Returns: the number of nanoseconds @sink will put between frames.
1343  *
1344  * Since: 0.10.33
1345  */
1346 guint64
1347 gst_base_sink_get_throttle_time (GstBaseSink * sink)
1348 {
1349   guint64 res;
1350
1351   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1352
1353   GST_OBJECT_LOCK (sink);
1354   res = sink->priv->throttle_time;
1355   GST_OBJECT_UNLOCK (sink);
1356
1357   return res;
1358 }
1359
1360 static void
1361 gst_base_sink_set_property (GObject * object, guint prop_id,
1362     const GValue * value, GParamSpec * pspec)
1363 {
1364   GstBaseSink *sink = GST_BASE_SINK (object);
1365
1366   switch (prop_id) {
1367     case PROP_PREROLL_QUEUE_LEN:
1368       /* preroll lock necessary to serialize with finish_preroll */
1369       GST_BASE_SINK_PREROLL_LOCK (sink);
1370       g_atomic_int_set (&sink->preroll_queue_max_len, g_value_get_uint (value));
1371       GST_BASE_SINK_PREROLL_UNLOCK (sink);
1372       break;
1373     case PROP_SYNC:
1374       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1375       break;
1376     case PROP_MAX_LATENESS:
1377       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1378       break;
1379     case PROP_QOS:
1380       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1381       break;
1382     case PROP_ASYNC:
1383       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1384       break;
1385     case PROP_TS_OFFSET:
1386       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1387       break;
1388     case PROP_BLOCKSIZE:
1389       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1390       break;
1391     case PROP_RENDER_DELAY:
1392       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1393       break;
1394     case PROP_ENABLE_LAST_BUFFER:
1395       gst_base_sink_set_last_buffer_enabled (sink, g_value_get_boolean (value));
1396       break;
1397     case PROP_THROTTLE_TIME:
1398       gst_base_sink_set_throttle_time (sink, g_value_get_uint64 (value));
1399       break;
1400     default:
1401       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1402       break;
1403   }
1404 }
1405
1406 static void
1407 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1408     GParamSpec * pspec)
1409 {
1410   GstBaseSink *sink = GST_BASE_SINK (object);
1411
1412   switch (prop_id) {
1413     case PROP_PREROLL_QUEUE_LEN:
1414       g_value_set_uint (value, g_atomic_int_get (&sink->preroll_queue_max_len));
1415       break;
1416     case PROP_SYNC:
1417       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1418       break;
1419     case PROP_MAX_LATENESS:
1420       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1421       break;
1422     case PROP_QOS:
1423       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1424       break;
1425     case PROP_ASYNC:
1426       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1427       break;
1428     case PROP_TS_OFFSET:
1429       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1430       break;
1431     case PROP_LAST_BUFFER:
1432       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1433       break;
1434     case PROP_ENABLE_LAST_BUFFER:
1435       g_value_set_boolean (value, gst_base_sink_is_last_buffer_enabled (sink));
1436       break;
1437     case PROP_BLOCKSIZE:
1438       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1439       break;
1440     case PROP_RENDER_DELAY:
1441       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1442       break;
1443     case PROP_THROTTLE_TIME:
1444       g_value_set_uint64 (value, gst_base_sink_get_throttle_time (sink));
1445       break;
1446     default:
1447       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1448       break;
1449   }
1450 }
1451
1452
1453 static GstCaps *
1454 gst_base_sink_get_caps (GstBaseSink * sink)
1455 {
1456   return NULL;
1457 }
1458
1459 static gboolean
1460 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1461 {
1462   return TRUE;
1463 }
1464
1465 static GstFlowReturn
1466 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1467     GstCaps * caps, GstBuffer ** buf)
1468 {
1469   *buf = NULL;
1470   return GST_FLOW_OK;
1471 }
1472
1473 /* with PREROLL_LOCK, STREAM_LOCK */
1474 static void
1475 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1476 {
1477   GstMiniObject *obj;
1478
1479   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1480   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1481     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1482     gst_mini_object_unref (obj);
1483   }
1484   /* we can't have EOS anymore now */
1485   basesink->eos = FALSE;
1486   basesink->priv->received_eos = FALSE;
1487   basesink->have_preroll = FALSE;
1488   basesink->priv->step_unlock = FALSE;
1489   basesink->eos_queued = FALSE;
1490   basesink->preroll_queued = 0;
1491   basesink->buffers_queued = 0;
1492   basesink->events_queued = 0;
1493   /* can't report latency anymore until we preroll again */
1494   if (basesink->priv->async_enabled) {
1495     GST_OBJECT_LOCK (basesink);
1496     basesink->priv->have_latency = FALSE;
1497     GST_OBJECT_UNLOCK (basesink);
1498   }
1499   /* and signal any waiters now */
1500   GST_BASE_SINK_PREROLL_SIGNAL (basesink);
1501 }
1502
1503 /* with STREAM_LOCK, configures given segment with the event information. */
1504 static void
1505 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1506     GstEvent * event, GstSegment * segment)
1507 {
1508   gboolean update;
1509   gdouble rate, arate;
1510   GstFormat format;
1511   gint64 start;
1512   gint64 stop;
1513   gint64 time;
1514
1515   /* the newsegment event is needed to bring the buffer timestamps to the
1516    * stream time and to drop samples outside of the playback segment. */
1517   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1518       &start, &stop, &time);
1519
1520   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1521    * We protect with the OBJECT_LOCK so that we can use the values to
1522    * safely answer a POSITION query. */
1523   GST_OBJECT_LOCK (basesink);
1524   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1525       stop, time);
1526
1527   if (format == GST_FORMAT_TIME) {
1528     GST_DEBUG_OBJECT (basesink,
1529         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1530         "format GST_FORMAT_TIME, "
1531         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1532         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1533         update, rate, arate, GST_TIME_ARGS (segment->start),
1534         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1535         GST_TIME_ARGS (segment->accum));
1536   } else {
1537     GST_DEBUG_OBJECT (basesink,
1538         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1539         "format %d, "
1540         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1541         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1542         segment->format, segment->start, segment->stop, segment->time,
1543         segment->accum);
1544   }
1545   GST_OBJECT_UNLOCK (basesink);
1546 }
1547
1548 /* with PREROLL_LOCK, STREAM_LOCK */
1549 static gboolean
1550 gst_base_sink_commit_state (GstBaseSink * basesink)
1551 {
1552   /* commit state and proceed to next pending state */
1553   GstState current, next, pending, post_pending;
1554   gboolean post_paused = FALSE;
1555   gboolean post_async_done = FALSE;
1556   gboolean post_playing = FALSE;
1557
1558   /* we are certainly not playing async anymore now */
1559   basesink->playing_async = FALSE;
1560
1561   GST_OBJECT_LOCK (basesink);
1562   current = GST_STATE (basesink);
1563   next = GST_STATE_NEXT (basesink);
1564   pending = GST_STATE_PENDING (basesink);
1565   post_pending = pending;
1566
1567   switch (pending) {
1568     case GST_STATE_PLAYING:
1569     {
1570       GstBaseSinkClass *bclass;
1571
1572       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1573
1574       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1575
1576       basesink->need_preroll = FALSE;
1577       post_async_done = TRUE;
1578       basesink->priv->commited = TRUE;
1579       post_playing = TRUE;
1580       /* post PAUSED too when we were READY */
1581       if (current == GST_STATE_READY) {
1582         post_paused = TRUE;
1583       }
1584       break;
1585     }
1586     case GST_STATE_PAUSED:
1587       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1588       post_paused = TRUE;
1589       post_async_done = TRUE;
1590       basesink->priv->commited = TRUE;
1591       post_pending = GST_STATE_VOID_PENDING;
1592       break;
1593     case GST_STATE_READY:
1594     case GST_STATE_NULL:
1595       goto stopping;
1596     case GST_STATE_VOID_PENDING:
1597       goto nothing_pending;
1598     default:
1599       break;
1600   }
1601
1602   /* we can report latency queries now */
1603   basesink->priv->have_latency = TRUE;
1604
1605   GST_STATE (basesink) = pending;
1606   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1607   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1608   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1609   GST_OBJECT_UNLOCK (basesink);
1610
1611   if (post_paused) {
1612     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1613     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1614         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1615             current, next, post_pending));
1616   }
1617   if (post_async_done) {
1618     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1619     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1620         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1621   }
1622   if (post_playing) {
1623     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1624     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1625         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1626             next, pending, GST_STATE_VOID_PENDING));
1627   }
1628
1629   GST_STATE_BROADCAST (basesink);
1630
1631   return TRUE;
1632
1633 nothing_pending:
1634   {
1635     /* Depending on the state, set our vars. We get in this situation when the
1636      * state change function got a change to update the state vars before the
1637      * streaming thread did. This is fine but we need to make sure that we
1638      * update the need_preroll var since it was TRUE when we got here and might
1639      * become FALSE if we got to PLAYING. */
1640     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1641         gst_element_state_get_name (current));
1642     switch (current) {
1643       case GST_STATE_PLAYING:
1644         basesink->need_preroll = FALSE;
1645         break;
1646       case GST_STATE_PAUSED:
1647         basesink->need_preroll = TRUE;
1648         break;
1649       default:
1650         basesink->need_preroll = FALSE;
1651         basesink->flushing = TRUE;
1652         break;
1653     }
1654     /* we can report latency queries now */
1655     basesink->priv->have_latency = TRUE;
1656     GST_OBJECT_UNLOCK (basesink);
1657     return TRUE;
1658   }
1659 stopping:
1660   {
1661     /* app is going to READY */
1662     GST_DEBUG_OBJECT (basesink, "stopping");
1663     basesink->need_preroll = FALSE;
1664     basesink->flushing = TRUE;
1665     GST_OBJECT_UNLOCK (basesink);
1666     return FALSE;
1667   }
1668 }
1669
1670 static void
1671 start_stepping (GstBaseSink * sink, GstSegment * segment,
1672     GstStepInfo * pending, GstStepInfo * current)
1673 {
1674   gint64 end;
1675   GstMessage *message;
1676
1677   GST_DEBUG_OBJECT (sink, "update pending step");
1678
1679   GST_OBJECT_LOCK (sink);
1680   memcpy (current, pending, sizeof (GstStepInfo));
1681   pending->valid = FALSE;
1682   GST_OBJECT_UNLOCK (sink);
1683
1684   /* post message first */
1685   message =
1686       gst_message_new_step_start (GST_OBJECT (sink), TRUE, current->format,
1687       current->amount, current->rate, current->flush, current->intermediate);
1688   gst_message_set_seqnum (message, current->seqnum);
1689   gst_element_post_message (GST_ELEMENT (sink), message);
1690
1691   /* get the running time of where we paused and remember it */
1692   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1693   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1694
1695   /* set the new rate for the remainder of the segment */
1696   current->start_rate = segment->rate;
1697   segment->rate *= current->rate;
1698   segment->abs_rate = ABS (segment->rate);
1699
1700   /* save values */
1701   if (segment->rate > 0.0)
1702     current->start_stop = segment->stop;
1703   else
1704     current->start_start = segment->start;
1705
1706   if (current->format == GST_FORMAT_TIME) {
1707     end = current->start + current->amount;
1708     if (!current->flush) {
1709       /* update the segment clipping regions for non-flushing seeks */
1710       if (segment->rate > 0.0) {
1711         segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1712         segment->last_stop = segment->stop;
1713       } else {
1714         gint64 position;
1715
1716         position = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1717         segment->time = position;
1718         segment->start = position;
1719         segment->last_stop = position;
1720       }
1721     }
1722   }
1723
1724   GST_DEBUG_OBJECT (sink,
1725       "segment now rate %lf, applied rate %lf, "
1726       "format GST_FORMAT_TIME, "
1727       "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1728       ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1729       segment->rate, segment->applied_rate, GST_TIME_ARGS (segment->start),
1730       GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1731       GST_TIME_ARGS (segment->accum));
1732
1733   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1734       GST_TIME_ARGS (current->start));
1735
1736   if (current->amount == -1) {
1737     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1738     current->valid = FALSE;
1739   } else {
1740     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1741         "rate: %f", current->amount, gst_format_get_name (current->format),
1742         current->rate);
1743   }
1744 }
1745
1746 static void
1747 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1748     GstStepInfo * current, gint64 rstart, gint64 rstop, gboolean eos)
1749 {
1750   gint64 stop, position;
1751   GstMessage *message;
1752
1753   GST_DEBUG_OBJECT (sink, "step complete");
1754
1755   if (segment->rate > 0.0)
1756     stop = rstart;
1757   else
1758     stop = rstop;
1759
1760   GST_DEBUG_OBJECT (sink,
1761       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1762
1763   if (stop == -1)
1764     current->duration = current->position;
1765   else
1766     current->duration = stop - current->start;
1767
1768   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1769       GST_TIME_ARGS (current->duration));
1770
1771   position = current->start + current->duration;
1772
1773   /* now move the segment to the new running time */
1774   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1775
1776   if (current->flush) {
1777     /* and remove the accumulated time we flushed, start time did not change */
1778     segment->accum = current->start;
1779   } else {
1780     /* start time is now the stepped position */
1781     gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1782   }
1783
1784   /* restore the previous rate */
1785   segment->rate = current->start_rate;
1786   segment->abs_rate = ABS (segment->rate);
1787
1788   if (segment->rate > 0.0)
1789     segment->stop = current->start_stop;
1790   else
1791     segment->start = current->start_start;
1792
1793   /* the clip segment is used for position report in paused... */
1794   memcpy (sink->clip_segment, segment, sizeof (GstSegment));
1795
1796   /* post the step done when we know the stepped duration in TIME */
1797   message =
1798       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1799       current->amount, current->rate, current->flush, current->intermediate,
1800       current->duration, eos);
1801   gst_message_set_seqnum (message, current->seqnum);
1802   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1803
1804   if (!current->intermediate)
1805     sink->need_preroll = current->need_preroll;
1806
1807   /* and the current step info finished and becomes invalid */
1808   current->valid = FALSE;
1809 }
1810
1811 static gboolean
1812 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1813     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1814     gint64 * rstop)
1815 {
1816   gboolean step_end = FALSE;
1817
1818   /* see if we need to skip this buffer because of stepping */
1819   switch (current->format) {
1820     case GST_FORMAT_TIME:
1821     {
1822       guint64 end;
1823       gint64 first, last;
1824
1825       if (segment->rate > 0.0) {
1826         if (segment->stop == *cstop)
1827           *rstop = *rstart + current->amount;
1828
1829         first = *rstart;
1830         last = *rstop;
1831       } else {
1832         if (segment->start == *cstart)
1833           *rstart = *rstop + current->amount;
1834
1835         first = *rstop;
1836         last = *rstart;
1837       }
1838
1839       end = current->start + current->amount;
1840       current->position = first - current->start;
1841
1842       if (G_UNLIKELY (segment->abs_rate != 1.0))
1843         current->position /= segment->abs_rate;
1844
1845       GST_DEBUG_OBJECT (sink,
1846           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1847           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1848       GST_DEBUG_OBJECT (sink,
1849           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1850           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1851           GST_TIME_ARGS (last - current->start),
1852           GST_TIME_ARGS (current->amount));
1853
1854       if ((current->flush && current->position >= current->amount)
1855           || last >= end) {
1856         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1857         step_end = TRUE;
1858         if (segment->rate > 0.0) {
1859           *rstart = end;
1860           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1861         } else {
1862           *rstop = end;
1863           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1864         }
1865       }
1866       GST_DEBUG_OBJECT (sink,
1867           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1868           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1869       GST_DEBUG_OBJECT (sink,
1870           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1871           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1872       break;
1873     }
1874     case GST_FORMAT_BUFFERS:
1875       GST_DEBUG_OBJECT (sink,
1876           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1877           current->position, current->amount);
1878
1879       if (current->position < current->amount) {
1880         current->position++;
1881       } else {
1882         step_end = TRUE;
1883       }
1884       break;
1885     case GST_FORMAT_DEFAULT:
1886     default:
1887       GST_DEBUG_OBJECT (sink,
1888           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1889           current->position, current->amount);
1890       break;
1891   }
1892   return step_end;
1893 }
1894
1895 /* with STREAM_LOCK, PREROLL_LOCK
1896  *
1897  * Returns TRUE if the object needs synchronisation and takes therefore
1898  * part in prerolling.
1899  *
1900  * rsstart/rsstop contain the start/stop in stream time.
1901  * rrstart/rrstop contain the start/stop in running time.
1902  */
1903 static gboolean
1904 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1905     GstClockTime * rsstart, GstClockTime * rsstop,
1906     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1907     gboolean * stepped, GstSegment * segment, GstStepInfo * step,
1908     gboolean * step_end, guint8 obj_type)
1909 {
1910   GstBaseSinkClass *bclass;
1911   GstBuffer *buffer;
1912   GstClockTime start, stop;     /* raw start/stop timestamps */
1913   gint64 cstart, cstop;         /* clipped raw timestamps */
1914   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1915   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1916   GstFormat format;
1917   GstBaseSinkPrivate *priv;
1918   gboolean eos;
1919
1920   priv = basesink->priv;
1921
1922   /* start with nothing */
1923   start = stop = GST_CLOCK_TIME_NONE;
1924
1925   if (G_UNLIKELY (OBJ_IS_EVENT (obj_type))) {
1926     GstEvent *event = GST_EVENT_CAST (obj);
1927
1928     switch (GST_EVENT_TYPE (event)) {
1929         /* EOS event needs syncing */
1930       case GST_EVENT_EOS:
1931       {
1932         if (basesink->segment.rate >= 0.0) {
1933           sstart = sstop = priv->current_sstop;
1934           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1935             /* we have not seen a buffer yet, use the segment values */
1936             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1937                 basesink->segment.format, basesink->segment.stop);
1938           }
1939         } else {
1940           sstart = sstop = priv->current_sstart;
1941           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1942             /* we have not seen a buffer yet, use the segment values */
1943             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1944                 basesink->segment.format, basesink->segment.start);
1945           }
1946         }
1947
1948         rstart = rstop = priv->eos_rtime;
1949         *do_sync = rstart != -1;
1950         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1951             GST_TIME_ARGS (rstart));
1952         /* if we are stepping, we end now */
1953         *step_end = step->valid;
1954         eos = TRUE;
1955         goto eos_done;
1956       }
1957       default:
1958         /* other events do not need syncing */
1959         /* FIXME, maybe NEWSEGMENT might need synchronisation
1960          * since the POSITION query depends on accumulated times and
1961          * we cannot accumulate the current segment before the previous
1962          * one completed.
1963          */
1964         return FALSE;
1965     }
1966   }
1967
1968   eos = FALSE;
1969
1970 again:
1971   /* else do buffer sync code */
1972   buffer = GST_BUFFER_CAST (obj);
1973
1974   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1975
1976   /* just get the times to see if we need syncing, if the start returns -1 we
1977    * don't sync. */
1978   if (bclass->get_times)
1979     bclass->get_times (basesink, buffer, &start, &stop);
1980
1981   if (!GST_CLOCK_TIME_IS_VALID (start)) {
1982     /* we don't need to sync but we still want to get the timestamps for
1983      * tracking the position */
1984     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1985     *do_sync = FALSE;
1986   } else {
1987     *do_sync = TRUE;
1988   }
1989
1990   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1991       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1992       GST_TIME_ARGS (stop), *do_sync);
1993
1994   /* collect segment and format for code clarity */
1995   format = segment->format;
1996
1997   /* no timestamp clipping if we did not get a TIME segment format */
1998   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
1999     cstart = start;
2000     cstop = stop;
2001     /* do running and stream time in TIME format */
2002     format = GST_FORMAT_TIME;
2003     GST_LOG_OBJECT (basesink, "not time format, don't clip");
2004     goto do_times;
2005   }
2006
2007   /* clip, only when we know about time */
2008   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
2009               (gint64) start, (gint64) stop, &cstart, &cstop))) {
2010     if (step->valid) {
2011       GST_DEBUG_OBJECT (basesink, "step out of segment");
2012       /* when we are stepping, pretend we're at the end of the segment */
2013       if (segment->rate > 0.0) {
2014         cstart = segment->stop;
2015         cstop = segment->stop;
2016       } else {
2017         cstart = segment->start;
2018         cstop = segment->start;
2019       }
2020       goto do_times;
2021     }
2022     goto out_of_segment;
2023   }
2024
2025   if (G_UNLIKELY (start != cstart || stop != cstop)) {
2026     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
2027         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
2028         GST_TIME_ARGS (cstop));
2029   }
2030
2031   /* set last stop position */
2032   if (G_LIKELY (stop != GST_CLOCK_TIME_NONE && cstop != GST_CLOCK_TIME_NONE))
2033     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
2034   else
2035     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
2036
2037 do_times:
2038   rstart = gst_segment_to_running_time (segment, format, cstart);
2039   rstop = gst_segment_to_running_time (segment, format, cstop);
2040
2041   if (G_UNLIKELY (step->valid)) {
2042     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
2043                 &rstart, &rstop))) {
2044       /* step is still busy, we discard data when we are flushing */
2045       *stepped = step->flush;
2046       GST_DEBUG_OBJECT (basesink, "stepping busy");
2047     }
2048   }
2049   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
2050    * upstream is behaving very badly */
2051   sstart = gst_segment_to_stream_time (segment, format, cstart);
2052   sstop = gst_segment_to_stream_time (segment, format, cstop);
2053
2054 eos_done:
2055   /* eos_done label only called when doing EOS, we also stop stepping then */
2056   if (*step_end && step->flush) {
2057     GST_DEBUG_OBJECT (basesink, "flushing step ended");
2058     stop_stepping (basesink, segment, step, rstart, rstop, eos);
2059     *step_end = FALSE;
2060     /* re-determine running start times for adjusted segment
2061      * (which has a flushed amount of running/accumulated time removed) */
2062     if (!GST_IS_EVENT (obj)) {
2063       GST_DEBUG_OBJECT (basesink, "refresh sync times");
2064       goto again;
2065     }
2066   }
2067
2068   /* save times */
2069   *rsstart = sstart;
2070   *rsstop = sstop;
2071   *rrstart = rstart;
2072   *rrstop = rstop;
2073
2074   /* buffers and EOS always need syncing and preroll */
2075   return TRUE;
2076
2077   /* special cases */
2078 out_of_segment:
2079   {
2080     /* we usually clip in the chain function already but stepping could cause
2081      * the segment to be updated later. we return FALSE so that we don't try
2082      * to sync on it. */
2083     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
2084     return FALSE;
2085   }
2086 }
2087
2088 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
2089  * adjust a timestamp with the latency and timestamp offset. This function does
2090  * not adjust for the render delay. */
2091 static GstClockTime
2092 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
2093 {
2094   GstClockTimeDiff ts_offset;
2095
2096   /* don't do anything funny with invalid timestamps */
2097   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2098     return time;
2099
2100   time += basesink->priv->latency;
2101
2102   /* apply offset, be carefull for underflows */
2103   ts_offset = basesink->priv->ts_offset;
2104   if (ts_offset < 0) {
2105     ts_offset = -ts_offset;
2106     if (ts_offset < time)
2107       time -= ts_offset;
2108     else
2109       time = 0;
2110   } else
2111     time += ts_offset;
2112
2113   /* subtract the render delay again, which was included in the latency */
2114   if (time > basesink->priv->render_delay)
2115     time -= basesink->priv->render_delay;
2116   else
2117     time = 0;
2118
2119   return time;
2120 }
2121
2122 /**
2123  * gst_base_sink_wait_clock:
2124  * @sink: the sink
2125  * @time: the running_time to be reached
2126  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2127  *
2128  * This function will block until @time is reached. It is usually called by
2129  * subclasses that use their own internal synchronisation.
2130  *
2131  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
2132  * returned. Likewise, if synchronisation is disabled in the element or there
2133  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
2134  *
2135  * This function should only be called with the PREROLL_LOCK held, like when
2136  * receiving an EOS event in the #GstBaseSinkClass.event() vmethod or when
2137  * receiving a buffer in
2138  * the #GstBaseSinkClass.render() vmethod.
2139  *
2140  * The @time argument should be the running_time of when this method should
2141  * return and is not adjusted with any latency or offset configured in the
2142  * sink.
2143  *
2144  * Since: 0.10.20
2145  *
2146  * Returns: #GstClockReturn
2147  */
2148 GstClockReturn
2149 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
2150     GstClockTimeDiff * jitter)
2151 {
2152   GstClockReturn ret;
2153   GstClock *clock;
2154   GstClockTime base_time;
2155
2156   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2157     goto invalid_time;
2158
2159   GST_OBJECT_LOCK (sink);
2160   if (G_UNLIKELY (!sink->sync))
2161     goto no_sync;
2162
2163   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
2164     goto no_clock;
2165
2166   base_time = GST_ELEMENT_CAST (sink)->base_time;
2167   GST_LOG_OBJECT (sink,
2168       "time %" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT,
2169       GST_TIME_ARGS (time), GST_TIME_ARGS (base_time));
2170
2171   /* add base_time to running_time to get the time against the clock */
2172   time += base_time;
2173
2174   /* Re-use existing clockid if available */
2175   if (G_LIKELY (sink->priv->cached_clock_id != NULL)) {
2176     if (!gst_clock_single_shot_id_reinit (clock, sink->priv->cached_clock_id,
2177             time)) {
2178       gst_clock_id_unref (sink->priv->cached_clock_id);
2179       sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2180     }
2181   } else
2182     sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2183   GST_OBJECT_UNLOCK (sink);
2184
2185   /* A blocking wait is performed on the clock. We save the ClockID
2186    * so we can unlock the entry at any time. While we are blocking, we
2187    * release the PREROLL_LOCK so that other threads can interrupt the
2188    * entry. */
2189   sink->clock_id = sink->priv->cached_clock_id;
2190   /* release the preroll lock while waiting */
2191   GST_BASE_SINK_PREROLL_UNLOCK (sink);
2192
2193   ret = gst_clock_id_wait (sink->priv->cached_clock_id, jitter);
2194
2195   GST_BASE_SINK_PREROLL_LOCK (sink);
2196   sink->clock_id = NULL;
2197
2198   return ret;
2199
2200   /* no syncing needed */
2201 invalid_time:
2202   {
2203     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
2204     return GST_CLOCK_BADTIME;
2205   }
2206 no_sync:
2207   {
2208     GST_DEBUG_OBJECT (sink, "sync disabled");
2209     GST_OBJECT_UNLOCK (sink);
2210     return GST_CLOCK_BADTIME;
2211   }
2212 no_clock:
2213   {
2214     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
2215     GST_OBJECT_UNLOCK (sink);
2216     return GST_CLOCK_BADTIME;
2217   }
2218 }
2219
2220 /**
2221  * gst_base_sink_wait_preroll:
2222  * @sink: the sink
2223  *
2224  * If the #GstBaseSinkClass.render() method performs its own synchronisation
2225  * against the clock it must unblock when going from PLAYING to the PAUSED state
2226  * and call this method before continuing to render the remaining data.
2227  *
2228  * This function will block until a state change to PLAYING happens (in which
2229  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
2230  * to a state change to READY or a FLUSH event (in which case this function
2231  * returns #GST_FLOW_WRONG_STATE).
2232  *
2233  * This function should only be called with the PREROLL_LOCK held, like in the
2234  * render function.
2235  *
2236  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2237  * continue. Any other return value should be returned from the render vmethod.
2238  *
2239  * Since: 0.10.11
2240  */
2241 GstFlowReturn
2242 gst_base_sink_wait_preroll (GstBaseSink * sink)
2243 {
2244   sink->have_preroll = TRUE;
2245   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
2246   /* block until the state changes, or we get a flush, or something */
2247   GST_BASE_SINK_PREROLL_WAIT (sink);
2248   sink->have_preroll = FALSE;
2249   if (G_UNLIKELY (sink->flushing))
2250     goto stopping;
2251   if (G_UNLIKELY (sink->priv->step_unlock))
2252     goto step_unlocked;
2253   GST_DEBUG_OBJECT (sink, "continue after preroll");
2254
2255   return GST_FLOW_OK;
2256
2257   /* ERRORS */
2258 stopping:
2259   {
2260     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
2261     return GST_FLOW_WRONG_STATE;
2262   }
2263 step_unlocked:
2264   {
2265     sink->priv->step_unlock = FALSE;
2266     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
2267     return GST_FLOW_STEP;
2268   }
2269 }
2270
2271 static inline guint8
2272 get_object_type (GstMiniObject * obj)
2273 {
2274   guint8 obj_type;
2275
2276   if (G_LIKELY (GST_IS_BUFFER (obj)))
2277     obj_type = _PR_IS_BUFFER;
2278   else if (GST_IS_EVENT (obj))
2279     obj_type = _PR_IS_EVENT;
2280   else if (GST_IS_BUFFER_LIST (obj))
2281     obj_type = _PR_IS_BUFFERLIST;
2282   else
2283     obj_type = _PR_IS_NOTHING;
2284
2285   return obj_type;
2286 }
2287
2288 /**
2289  * gst_base_sink_do_preroll:
2290  * @sink: the sink
2291  * @obj: (transfer none): the mini object that caused the preroll
2292  *
2293  * If the @sink spawns its own thread for pulling buffers from upstream it
2294  * should call this method after it has pulled a buffer. If the element needed
2295  * to preroll, this function will perform the preroll and will then block
2296  * until the element state is changed.
2297  *
2298  * This function should be called with the PREROLL_LOCK held.
2299  *
2300  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2301  * continue. Any other return value should be returned from the render vmethod.
2302  *
2303  * Since: 0.10.22
2304  */
2305 GstFlowReturn
2306 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2307 {
2308   GstFlowReturn ret;
2309
2310   while (G_UNLIKELY (sink->need_preroll)) {
2311     guint8 obj_type;
2312     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2313
2314     obj_type = get_object_type (obj);
2315
2316     ret = gst_base_sink_preroll_object (sink, obj_type, obj);
2317     if (ret != GST_FLOW_OK)
2318       goto preroll_failed;
2319
2320     /* need to recheck here because the commit state could have
2321      * made us not need the preroll anymore */
2322     if (G_LIKELY (sink->need_preroll)) {
2323       /* block until the state changes, or we get a flush, or something */
2324       ret = gst_base_sink_wait_preroll (sink);
2325       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2326         goto preroll_failed;
2327     }
2328   }
2329   return GST_FLOW_OK;
2330
2331   /* ERRORS */
2332 preroll_failed:
2333   {
2334     GST_DEBUG_OBJECT (sink, "preroll failed: %s", gst_flow_get_name (ret));
2335     return ret;
2336   }
2337 }
2338
2339 /**
2340  * gst_base_sink_wait_eos:
2341  * @sink: the sink
2342  * @time: the running_time to be reached
2343  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2344  *
2345  * This function will block until @time is reached. It is usually called by
2346  * subclasses that use their own internal synchronisation but want to let the
2347  * EOS be handled by the base class.
2348  *
2349  * This function should only be called with the PREROLL_LOCK held, like when
2350  * receiving an EOS event in the ::event vmethod.
2351  *
2352  * The @time argument should be the running_time of when the EOS should happen
2353  * and will be adjusted with any latency and offset configured in the sink.
2354  *
2355  * Returns: #GstFlowReturn
2356  *
2357  * Since: 0.10.15
2358  */
2359 GstFlowReturn
2360 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2361     GstClockTimeDiff * jitter)
2362 {
2363   GstClockReturn status;
2364   GstFlowReturn ret;
2365
2366   do {
2367     GstClockTime stime;
2368
2369     GST_DEBUG_OBJECT (sink, "checking preroll");
2370
2371     /* first wait for the playing state before we can continue */
2372     while (G_UNLIKELY (sink->need_preroll)) {
2373       ret = gst_base_sink_wait_preroll (sink);
2374       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2375         goto flushing;
2376     }
2377
2378     /* preroll done, we can sync since we are in PLAYING now. */
2379     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2380         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2381
2382     /* compensate for latency and ts_offset. We don't adjust for render delay
2383      * because we don't interact with the device on EOS normally. */
2384     stime = gst_base_sink_adjust_time (sink, time);
2385
2386     /* wait for the clock, this can be interrupted because we got shut down or
2387      * we PAUSED. */
2388     status = gst_base_sink_wait_clock (sink, stime, jitter);
2389
2390     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2391
2392     /* invalid time, no clock or sync disabled, just continue then */
2393     if (status == GST_CLOCK_BADTIME)
2394       break;
2395
2396     /* waiting could have been interrupted and we can be flushing now */
2397     if (G_UNLIKELY (sink->flushing))
2398       goto flushing;
2399
2400     /* retry if we got unscheduled, which means we did not reach the timeout
2401      * yet. if some other error occures, we continue. */
2402   } while (status == GST_CLOCK_UNSCHEDULED);
2403
2404   GST_DEBUG_OBJECT (sink, "end of stream");
2405
2406   return GST_FLOW_OK;
2407
2408   /* ERRORS */
2409 flushing:
2410   {
2411     GST_DEBUG_OBJECT (sink, "we are flushing");
2412     return GST_FLOW_WRONG_STATE;
2413   }
2414 }
2415
2416 /* with STREAM_LOCK, PREROLL_LOCK
2417  *
2418  * Make sure we are in PLAYING and synchronize an object to the clock.
2419  *
2420  * If we need preroll, we are not in PLAYING. We try to commit the state
2421  * if needed and then block if we still are not PLAYING.
2422  *
2423  * We start waiting on the clock in PLAYING. If we got interrupted, we
2424  * immediatly try to re-preroll.
2425  *
2426  * Some objects do not need synchronisation (most events) and so this function
2427  * immediatly returns GST_FLOW_OK.
2428  *
2429  * for objects that arrive later than max-lateness to be synchronized to the
2430  * clock have the @late boolean set to TRUE.
2431  *
2432  * This function keeps a running average of the jitter (the diff between the
2433  * clock time and the requested sync time). The jitter is negative for
2434  * objects that arrive in time and positive for late buffers.
2435  *
2436  * does not take ownership of obj.
2437  */
2438 static GstFlowReturn
2439 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2440     GstMiniObject * obj, gboolean * late, gboolean * step_end, guint8 obj_type)
2441 {
2442   GstClockTimeDiff jitter = 0;
2443   gboolean syncable;
2444   GstClockReturn status = GST_CLOCK_OK;
2445   GstClockTime rstart, rstop, sstart, sstop, stime;
2446   gboolean do_sync;
2447   GstBaseSinkPrivate *priv;
2448   GstFlowReturn ret;
2449   GstStepInfo *current, *pending;
2450   gboolean stepped;
2451
2452   priv = basesink->priv;
2453
2454 do_step:
2455   sstart = sstop = rstart = rstop = GST_CLOCK_TIME_NONE;
2456   do_sync = TRUE;
2457   stepped = FALSE;
2458
2459   priv->current_rstart = GST_CLOCK_TIME_NONE;
2460
2461   /* get stepping info */
2462   current = &priv->current_step;
2463   pending = &priv->pending_step;
2464
2465   /* get timing information for this object against the render segment */
2466   syncable = gst_base_sink_get_sync_times (basesink, obj,
2467       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2468       current, step_end, obj_type);
2469
2470   if (G_UNLIKELY (stepped))
2471     goto step_skipped;
2472
2473   /* a syncable object needs to participate in preroll and
2474    * clocking. All buffers and EOS are syncable. */
2475   if (G_UNLIKELY (!syncable))
2476     goto not_syncable;
2477
2478   /* store timing info for current object */
2479   priv->current_rstart = rstart;
2480   priv->current_rstop = (GST_CLOCK_TIME_IS_VALID (rstop) ? rstop : rstart);
2481
2482   /* save sync time for eos when the previous object needed sync */
2483   priv->eos_rtime = (do_sync ? priv->current_rstop : GST_CLOCK_TIME_NONE);
2484
2485   /* calculate inter frame spacing */
2486   if (G_UNLIKELY (priv->prev_rstart != -1 && priv->prev_rstart < rstart)) {
2487     GstClockTime in_diff;
2488
2489     in_diff = rstart - priv->prev_rstart;
2490
2491     if (priv->avg_in_diff == -1)
2492       priv->avg_in_diff = in_diff;
2493     else
2494       priv->avg_in_diff = UPDATE_RUNNING_AVG (priv->avg_in_diff, in_diff);
2495
2496     GST_LOG_OBJECT (basesink, "avg frame diff %" GST_TIME_FORMAT,
2497         GST_TIME_ARGS (priv->avg_in_diff));
2498
2499   }
2500   priv->prev_rstart = rstart;
2501
2502   if (G_UNLIKELY (priv->earliest_in_time != -1
2503           && rstart < priv->earliest_in_time))
2504     goto qos_dropped;
2505
2506 again:
2507   /* first do preroll, this makes sure we commit our state
2508    * to PAUSED and can continue to PLAYING. We cannot perform
2509    * any clock sync in PAUSED because there is no clock. */
2510   ret = gst_base_sink_do_preroll (basesink, obj);
2511   if (G_UNLIKELY (ret != GST_FLOW_OK))
2512     goto preroll_failed;
2513
2514   /* update the segment with a pending step if the current one is invalid and we
2515    * have a new pending one. We only accept new step updates after a preroll */
2516   if (G_UNLIKELY (pending->valid && !current->valid)) {
2517     start_stepping (basesink, &basesink->segment, pending, current);
2518     goto do_step;
2519   }
2520
2521   /* After rendering we store the position of the last buffer so that we can use
2522    * it to report the position. We need to take the lock here. */
2523   GST_OBJECT_LOCK (basesink);
2524   priv->current_sstart = sstart;
2525   priv->current_sstop = (GST_CLOCK_TIME_IS_VALID (sstop) ? sstop : sstart);
2526   GST_OBJECT_UNLOCK (basesink);
2527
2528   if (!do_sync)
2529     goto done;
2530
2531   /* adjust for latency */
2532   stime = gst_base_sink_adjust_time (basesink, rstart);
2533
2534   /* adjust for render-delay, avoid underflows */
2535   if (GST_CLOCK_TIME_IS_VALID (stime)) {
2536     if (stime > priv->render_delay)
2537       stime -= priv->render_delay;
2538     else
2539       stime = 0;
2540   }
2541
2542   /* preroll done, we can sync since we are in PLAYING now. */
2543   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2544       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2545       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2546
2547   /* This function will return immediatly if start == -1, no clock
2548    * or sync is disabled with GST_CLOCK_BADTIME. */
2549   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2550
2551   GST_DEBUG_OBJECT (basesink, "clock returned %d, jitter %c%" GST_TIME_FORMAT,
2552       status, (jitter < 0 ? '-' : ' '), GST_TIME_ARGS (ABS (jitter)));
2553
2554   /* invalid time, no clock or sync disabled, just render */
2555   if (status == GST_CLOCK_BADTIME)
2556     goto done;
2557
2558   /* waiting could have been interrupted and we can be flushing now */
2559   if (G_UNLIKELY (basesink->flushing))
2560     goto flushing;
2561
2562   /* check for unlocked by a state change, we are not flushing so
2563    * we can try to preroll on the current buffer. */
2564   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2565     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2566     priv->call_preroll = TRUE;
2567     goto again;
2568   }
2569
2570   /* successful syncing done, record observation */
2571   priv->current_jitter = jitter;
2572
2573   /* check if the object should be dropped */
2574   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2575       status, jitter);
2576
2577 done:
2578   return GST_FLOW_OK;
2579
2580   /* ERRORS */
2581 step_skipped:
2582   {
2583     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2584     *late = TRUE;
2585     return GST_FLOW_OK;
2586   }
2587 not_syncable:
2588   {
2589     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2590     return GST_FLOW_OK;
2591   }
2592 qos_dropped:
2593   {
2594     GST_DEBUG_OBJECT (basesink, "dropped because of QoS %p", obj);
2595     *late = TRUE;
2596     return GST_FLOW_OK;
2597   }
2598 flushing:
2599   {
2600     GST_DEBUG_OBJECT (basesink, "we are flushing");
2601     return GST_FLOW_WRONG_STATE;
2602   }
2603 preroll_failed:
2604   {
2605     GST_DEBUG_OBJECT (basesink, "preroll failed");
2606     *step_end = FALSE;
2607     return ret;
2608   }
2609 }
2610
2611 static gboolean
2612 gst_base_sink_send_qos (GstBaseSink * basesink, GstQOSType type,
2613     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2614 {
2615   GstEvent *event;
2616   gboolean res;
2617
2618   /* generate Quality-of-Service event */
2619   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2620       "qos: type %d, proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2621       GST_TIME_FORMAT, type, proportion, diff, GST_TIME_ARGS (time));
2622
2623   event = gst_event_new_qos_full (type, proportion, diff, time);
2624
2625   /* send upstream */
2626   res = gst_pad_push_event (basesink->sinkpad, event);
2627
2628   return res;
2629 }
2630
2631 static void
2632 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2633 {
2634   GstBaseSinkPrivate *priv;
2635   GstClockTime start, stop;
2636   GstClockTimeDiff jitter;
2637   GstClockTime pt, entered, left;
2638   GstClockTime duration;
2639   gdouble rate;
2640
2641   priv = sink->priv;
2642
2643   start = priv->current_rstart;
2644
2645   if (priv->current_step.valid)
2646     return;
2647
2648   /* if Quality-of-Service disabled, do nothing */
2649   if (!g_atomic_int_get (&priv->qos_enabled) ||
2650       !GST_CLOCK_TIME_IS_VALID (start))
2651     return;
2652
2653   stop = priv->current_rstop;
2654   jitter = priv->current_jitter;
2655
2656   if (jitter < 0) {
2657     /* this is the time the buffer entered the sink */
2658     if (start < -jitter)
2659       entered = 0;
2660     else
2661       entered = start + jitter;
2662     left = start;
2663   } else {
2664     /* this is the time the buffer entered the sink */
2665     entered = start + jitter;
2666     /* this is the time the buffer left the sink */
2667     left = start + jitter;
2668   }
2669
2670   /* calculate duration of the buffer */
2671   if (GST_CLOCK_TIME_IS_VALID (stop) && stop != start)
2672     duration = stop - start;
2673   else
2674     duration = priv->avg_in_diff;
2675
2676   /* if we have the time when the last buffer left us, calculate
2677    * processing time */
2678   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2679     if (entered > priv->last_left) {
2680       pt = entered - priv->last_left;
2681     } else {
2682       pt = 0;
2683     }
2684   } else {
2685     pt = priv->avg_pt;
2686   }
2687
2688   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2689       ", stop %" GST_TIME_FORMAT ", entered %" GST_TIME_FORMAT ", left %"
2690       GST_TIME_FORMAT ", pt: %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
2691       ",jitter %" G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
2692       GST_TIME_ARGS (entered), GST_TIME_ARGS (left), GST_TIME_ARGS (pt),
2693       GST_TIME_ARGS (duration), jitter);
2694
2695   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2696       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2697       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2698       priv->avg_rate);
2699
2700   /* collect running averages. for first observations, we copy the
2701    * values */
2702   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_duration))
2703     priv->avg_duration = duration;
2704   else
2705     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2706
2707   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_pt))
2708     priv->avg_pt = pt;
2709   else
2710     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2711
2712   if (priv->avg_duration != 0)
2713     rate =
2714         gst_guint64_to_gdouble (priv->avg_pt) /
2715         gst_guint64_to_gdouble (priv->avg_duration);
2716   else
2717     rate = 1.0;
2718
2719   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2720     if (dropped || priv->avg_rate < 0.0) {
2721       priv->avg_rate = rate;
2722     } else {
2723       if (rate > 1.0)
2724         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2725       else
2726         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2727     }
2728   }
2729
2730   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2731       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2732       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2733       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2734
2735
2736   if (priv->avg_rate >= 0.0) {
2737     GstQOSType type;
2738     GstClockTimeDiff diff;
2739
2740     /* if we have a valid rate, start sending QoS messages */
2741     if (priv->current_jitter < 0) {
2742       /* make sure we never go below 0 when adding the jitter to the
2743        * timestamp. */
2744       if (priv->current_rstart < -priv->current_jitter)
2745         priv->current_jitter = -priv->current_rstart;
2746     }
2747
2748     if (priv->throttle_time > 0) {
2749       diff = priv->throttle_time;
2750       type = GST_QOS_TYPE_THROTTLE;
2751     } else {
2752       diff = priv->current_jitter;
2753       if (diff <= 0)
2754         type = GST_QOS_TYPE_OVERFLOW;
2755       else
2756         type = GST_QOS_TYPE_UNDERFLOW;
2757     }
2758
2759     gst_base_sink_send_qos (sink, type, priv->avg_rate, priv->current_rstart,
2760         diff);
2761   }
2762
2763   /* record when this buffer will leave us */
2764   priv->last_left = left;
2765 }
2766
2767 /* reset all qos measuring */
2768 static void
2769 gst_base_sink_reset_qos (GstBaseSink * sink)
2770 {
2771   GstBaseSinkPrivate *priv;
2772
2773   priv = sink->priv;
2774
2775   priv->last_render_time = GST_CLOCK_TIME_NONE;
2776   priv->prev_rstart = GST_CLOCK_TIME_NONE;
2777   priv->earliest_in_time = GST_CLOCK_TIME_NONE;
2778   priv->last_left = GST_CLOCK_TIME_NONE;
2779   priv->avg_duration = GST_CLOCK_TIME_NONE;
2780   priv->avg_pt = GST_CLOCK_TIME_NONE;
2781   priv->avg_rate = -1.0;
2782   priv->avg_render = GST_CLOCK_TIME_NONE;
2783   priv->avg_in_diff = GST_CLOCK_TIME_NONE;
2784   priv->rendered = 0;
2785   priv->dropped = 0;
2786
2787 }
2788
2789 /* Checks if the object was scheduled too late.
2790  *
2791  * rstart/rstop contain the running_time start and stop values
2792  * of the object.
2793  *
2794  * status and jitter contain the return values from the clock wait.
2795  *
2796  * returns TRUE if the buffer was too late.
2797  */
2798 static gboolean
2799 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2800     GstClockTime rstart, GstClockTime rstop,
2801     GstClockReturn status, GstClockTimeDiff jitter)
2802 {
2803   gboolean late;
2804   gint64 max_lateness;
2805   GstBaseSinkPrivate *priv;
2806
2807   priv = basesink->priv;
2808
2809   late = FALSE;
2810
2811   /* only for objects that were too late */
2812   if (G_LIKELY (status != GST_CLOCK_EARLY))
2813     goto in_time;
2814
2815   max_lateness = basesink->max_lateness;
2816
2817   /* check if frame dropping is enabled */
2818   if (max_lateness == -1)
2819     goto no_drop;
2820
2821   /* only check for buffers */
2822   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2823     goto not_buffer;
2824
2825   /* can't do check if we don't have a timestamp */
2826   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (rstart)))
2827     goto no_timestamp;
2828
2829   /* we can add a valid stop time */
2830   if (GST_CLOCK_TIME_IS_VALID (rstop))
2831     max_lateness += rstop;
2832   else {
2833     max_lateness += rstart;
2834     /* no stop time, use avg frame diff */
2835     if (priv->avg_in_diff != -1)
2836       max_lateness += priv->avg_in_diff;
2837   }
2838
2839   /* if the jitter bigger than duration and lateness we are too late */
2840   if ((late = rstart + jitter > max_lateness)) {
2841     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2842         "buffer is too late %" GST_TIME_FORMAT
2843         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (rstart + jitter),
2844         GST_TIME_ARGS (max_lateness));
2845     /* !!emergency!!, if we did not receive anything valid for more than a
2846      * second, render it anyway so the user sees something */
2847     if (GST_CLOCK_TIME_IS_VALID (priv->last_render_time) &&
2848         rstart - priv->last_render_time > GST_SECOND) {
2849       late = FALSE;
2850       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2851           (_("A lot of buffers are being dropped.")),
2852           ("There may be a timestamping problem, or this computer is too slow."));
2853       GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2854           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2855           GST_TIME_ARGS (priv->last_render_time));
2856     }
2857   }
2858
2859 done:
2860   if (!late || !GST_CLOCK_TIME_IS_VALID (priv->last_render_time)) {
2861     priv->last_render_time = rstart;
2862     /* the next allowed input timestamp */
2863     if (priv->throttle_time > 0)
2864       priv->earliest_in_time = rstart + priv->throttle_time;
2865   }
2866   return late;
2867
2868   /* all is fine */
2869 in_time:
2870   {
2871     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2872     goto done;
2873   }
2874 no_drop:
2875   {
2876     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2877     goto done;
2878   }
2879 not_buffer:
2880   {
2881     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2882     return FALSE;
2883   }
2884 no_timestamp:
2885   {
2886     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2887     return FALSE;
2888   }
2889 }
2890
2891 /* called before and after calling the render vmethod. It keeps track of how
2892  * much time was spent in the render method and is used to check if we are
2893  * flooded */
2894 static void
2895 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2896 {
2897   GstBaseSinkPrivate *priv;
2898
2899   priv = basesink->priv;
2900
2901   if (start) {
2902     priv->start = gst_util_get_timestamp ();
2903   } else {
2904     GstClockTime elapsed;
2905
2906     priv->stop = gst_util_get_timestamp ();
2907
2908     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2909
2910     if (!GST_CLOCK_TIME_IS_VALID (priv->avg_render))
2911       priv->avg_render = elapsed;
2912     else
2913       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2914
2915     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2916         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2917   }
2918 }
2919
2920 /* with STREAM_LOCK, PREROLL_LOCK,
2921  *
2922  * Synchronize the object on the clock and then render it.
2923  *
2924  * takes ownership of obj.
2925  */
2926 static GstFlowReturn
2927 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2928     guint8 obj_type, gpointer obj)
2929 {
2930   GstFlowReturn ret;
2931   GstBaseSinkClass *bclass;
2932   gboolean late, step_end;
2933   gpointer sync_obj;
2934   GstBaseSinkPrivate *priv;
2935
2936   priv = basesink->priv;
2937
2938   if (OBJ_IS_BUFFERLIST (obj_type)) {
2939     /*
2940      * If buffer list, use the first group buffer within the list
2941      * for syncing
2942      */
2943     sync_obj = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
2944     g_assert (NULL != sync_obj);
2945   } else {
2946     sync_obj = obj;
2947   }
2948
2949 again:
2950   late = FALSE;
2951   step_end = FALSE;
2952
2953   /* synchronize this object, non syncable objects return OK
2954    * immediatly. */
2955   ret =
2956       gst_base_sink_do_sync (basesink, pad, sync_obj, &late, &step_end,
2957       obj_type);
2958   if (G_UNLIKELY (ret != GST_FLOW_OK))
2959     goto sync_failed;
2960
2961   /* and now render, event or buffer/buffer list. */
2962   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type))) {
2963     /* drop late buffers unconditionally, let's hope it's unlikely */
2964     if (G_UNLIKELY (late))
2965       goto dropped;
2966
2967     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2968
2969     if (G_LIKELY ((OBJ_IS_BUFFERLIST (obj_type) && bclass->render_list) ||
2970             (!OBJ_IS_BUFFERLIST (obj_type) && bclass->render))) {
2971       gint do_qos;
2972
2973       /* read once, to get same value before and after */
2974       do_qos = g_atomic_int_get (&priv->qos_enabled);
2975
2976       GST_DEBUG_OBJECT (basesink, "rendering object %p", obj);
2977
2978       /* record rendering time for QoS and stats */
2979       if (do_qos)
2980         gst_base_sink_do_render_stats (basesink, TRUE);
2981
2982       if (!OBJ_IS_BUFFERLIST (obj_type)) {
2983         GstBuffer *buf;
2984
2985         /* For buffer lists do not set last buffer. Creating buffer
2986          * with meaningful data can be done only with memcpy which will
2987          * significantly affect performance */
2988         buf = GST_BUFFER_CAST (obj);
2989         gst_base_sink_set_last_buffer (basesink, buf);
2990
2991         ret = bclass->render (basesink, buf);
2992       } else {
2993         GstBufferList *buflist;
2994
2995         buflist = GST_BUFFER_LIST_CAST (obj);
2996
2997         ret = bclass->render_list (basesink, buflist);
2998       }
2999
3000       if (do_qos)
3001         gst_base_sink_do_render_stats (basesink, FALSE);
3002
3003       if (ret == GST_FLOW_STEP)
3004         goto again;
3005
3006       if (G_UNLIKELY (basesink->flushing))
3007         goto flushing;
3008
3009       priv->rendered++;
3010     }
3011   } else if (G_LIKELY (OBJ_IS_EVENT (obj_type))) {
3012     GstEvent *event = GST_EVENT_CAST (obj);
3013     gboolean event_res = TRUE;
3014     GstEventType type;
3015
3016     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3017
3018     type = GST_EVENT_TYPE (event);
3019
3020     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
3021         gst_event_type_get_name (type));
3022
3023     if (bclass->event)
3024       event_res = bclass->event (basesink, event);
3025
3026     /* when we get here we could be flushing again when the event handler calls
3027      * _wait_eos(). We have to ignore this object in that case. */
3028     if (G_UNLIKELY (basesink->flushing))
3029       goto flushing;
3030
3031     if (G_LIKELY (event_res)) {
3032       guint32 seqnum;
3033
3034       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
3035       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
3036
3037       switch (type) {
3038         case GST_EVENT_EOS:
3039         {
3040           GstMessage *message;
3041
3042           /* the EOS event is completely handled so we mark
3043            * ourselves as being in the EOS state. eos is also
3044            * protected by the object lock so we can read it when
3045            * answering the POSITION query. */
3046           GST_OBJECT_LOCK (basesink);
3047           basesink->eos = TRUE;
3048           GST_OBJECT_UNLOCK (basesink);
3049
3050           /* ok, now we can post the message */
3051           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
3052
3053           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
3054           gst_message_set_seqnum (message, seqnum);
3055           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
3056           break;
3057         }
3058         case GST_EVENT_NEWSEGMENT:
3059           /* configure the segment */
3060           gst_base_sink_configure_segment (basesink, pad, event,
3061               &basesink->segment);
3062           break;
3063         case GST_EVENT_SINK_MESSAGE:{
3064           GstMessage *msg = NULL;
3065
3066           gst_event_parse_sink_message (event, &msg);
3067
3068           if (msg)
3069             gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
3070         }
3071         default:
3072           break;
3073       }
3074     }
3075   } else {
3076     g_return_val_if_reached (GST_FLOW_ERROR);
3077   }
3078
3079 done:
3080   if (step_end) {
3081     /* the step ended, check if we need to activate a new step */
3082     GST_DEBUG_OBJECT (basesink, "step ended");
3083     stop_stepping (basesink, &basesink->segment, &priv->current_step,
3084         priv->current_rstart, priv->current_rstop, basesink->eos);
3085     goto again;
3086   }
3087
3088   gst_base_sink_perform_qos (basesink, late);
3089
3090   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
3091   gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3092   return ret;
3093
3094   /* ERRORS */
3095 sync_failed:
3096   {
3097     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
3098     goto done;
3099   }
3100 dropped:
3101   {
3102     priv->dropped++;
3103     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
3104
3105     if (g_atomic_int_get (&priv->qos_enabled)) {
3106       GstMessage *qos_msg;
3107       GstClockTime timestamp, duration;
3108
3109       timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (sync_obj));
3110       duration = GST_BUFFER_DURATION (GST_BUFFER_CAST (sync_obj));
3111
3112       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3113           "qos: dropped buffer rt %" GST_TIME_FORMAT ", st %" GST_TIME_FORMAT
3114           ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT,
3115           GST_TIME_ARGS (priv->current_rstart),
3116           GST_TIME_ARGS (priv->current_sstart), GST_TIME_ARGS (timestamp),
3117           GST_TIME_ARGS (duration));
3118       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3119           "qos: rendered %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
3120           priv->rendered, priv->dropped);
3121
3122       qos_msg =
3123           gst_message_new_qos (GST_OBJECT_CAST (basesink), basesink->sync,
3124           priv->current_rstart, priv->current_sstart, timestamp, duration);
3125       gst_message_set_qos_values (qos_msg, priv->current_jitter, priv->avg_rate,
3126           1000000);
3127       gst_message_set_qos_stats (qos_msg, GST_FORMAT_BUFFERS, priv->rendered,
3128           priv->dropped);
3129       gst_element_post_message (GST_ELEMENT_CAST (basesink), qos_msg);
3130     }
3131     goto done;
3132   }
3133 flushing:
3134   {
3135     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
3136     gst_mini_object_unref (obj);
3137     return GST_FLOW_WRONG_STATE;
3138   }
3139 }
3140
3141 /* with STREAM_LOCK, PREROLL_LOCK
3142  *
3143  * Perform preroll on the given object. For buffers this means
3144  * calling the preroll subclass method.
3145  * If that succeeds, the state will be commited.
3146  *
3147  * function does not take ownership of obj.
3148  */
3149 static GstFlowReturn
3150 gst_base_sink_preroll_object (GstBaseSink * basesink, guint8 obj_type,
3151     GstMiniObject * obj)
3152 {
3153   GstFlowReturn ret;
3154
3155   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
3156
3157   /* if it's a buffer, we need to call the preroll method */
3158   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type) && basesink->priv->call_preroll)) {
3159     GstBaseSinkClass *bclass;
3160     GstBuffer *buf;
3161     GstClockTime timestamp;
3162
3163     if (OBJ_IS_BUFFERLIST (obj_type)) {
3164       buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3165       g_assert (NULL != buf);
3166     } else {
3167       buf = GST_BUFFER_CAST (obj);
3168     }
3169
3170     timestamp = GST_BUFFER_TIMESTAMP (buf);
3171
3172     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
3173         GST_TIME_ARGS (timestamp));
3174
3175     /*
3176      * For buffer lists do not set last buffer. Creating buffer
3177      * with meaningful data can be done only with memcpy which will
3178      * significantly affect performance
3179      */
3180     if (!OBJ_IS_BUFFERLIST (obj_type)) {
3181       gst_base_sink_set_last_buffer (basesink, buf);
3182     }
3183
3184     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3185     if (bclass->preroll)
3186       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
3187         goto preroll_failed;
3188
3189     basesink->priv->call_preroll = FALSE;
3190   }
3191
3192   /* commit state */
3193   if (G_LIKELY (basesink->playing_async)) {
3194     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
3195       goto stopping;
3196   }
3197
3198   return GST_FLOW_OK;
3199
3200   /* ERRORS */
3201 preroll_failed:
3202   {
3203     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
3204     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
3205     return ret;
3206   }
3207 stopping:
3208   {
3209     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
3210     return GST_FLOW_WRONG_STATE;
3211   }
3212 }
3213
3214 /* with STREAM_LOCK, PREROLL_LOCK
3215  *
3216  * Queue an object for rendering.
3217  * The first prerollable object queued will complete the preroll. If the
3218  * preroll queue if filled, we render all the objects in the queue.
3219  *
3220  * This function takes ownership of the object.
3221  */
3222 static GstFlowReturn
3223 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
3224     guint8 obj_type, gpointer obj, gboolean prerollable)
3225 {
3226   GstFlowReturn ret = GST_FLOW_OK;
3227   gint length;
3228   GQueue *q;
3229
3230   if (G_UNLIKELY (basesink->need_preroll)) {
3231     if (G_LIKELY (prerollable))
3232       basesink->preroll_queued++;
3233
3234     length = basesink->preroll_queued;
3235
3236     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
3237
3238     /* first prerollable item needs to finish the preroll */
3239     if (length == 1) {
3240       ret = gst_base_sink_preroll_object (basesink, obj_type, obj);
3241       if (G_UNLIKELY (ret != GST_FLOW_OK))
3242         goto preroll_failed;
3243     }
3244     /* need to recheck if we need preroll, commmit state during preroll
3245      * could have made us not need more preroll. */
3246     if (G_UNLIKELY (basesink->need_preroll)) {
3247       /* see if we can render now, if we can't add the object to the preroll
3248        * queue. */
3249       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
3250         goto more_preroll;
3251     }
3252   }
3253   /* we can start rendering (or blocking) the queued object
3254    * if any. */
3255   q = basesink->preroll_queue;
3256   while (G_UNLIKELY (!g_queue_is_empty (q))) {
3257     GstMiniObject *o;
3258     guint8 ot;
3259
3260     o = g_queue_pop_head (q);
3261     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
3262
3263     ot = get_object_type (o);
3264
3265     /* do something with the return value */
3266     ret = gst_base_sink_render_object (basesink, pad, ot, o);
3267     if (ret != GST_FLOW_OK)
3268       goto dequeue_failed;
3269   }
3270
3271   /* now render the object */
3272   ret = gst_base_sink_render_object (basesink, pad, obj_type, obj);
3273   basesink->preroll_queued = 0;
3274
3275   return ret;
3276
3277   /* special cases */
3278 preroll_failed:
3279   {
3280     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
3281         gst_flow_get_name (ret));
3282     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3283     return ret;
3284   }
3285 more_preroll:
3286   {
3287     /* add object to the queue and return */
3288     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
3289         length, basesink->preroll_queue_max_len);
3290     g_queue_push_tail (basesink->preroll_queue, obj);
3291     return GST_FLOW_OK;
3292   }
3293 dequeue_failed:
3294   {
3295     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
3296         gst_flow_get_name (ret));
3297     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3298     return ret;
3299   }
3300 }
3301
3302 /* with STREAM_LOCK
3303  *
3304  * This function grabs the PREROLL_LOCK and adds the object to
3305  * the queue.
3306  *
3307  * This function takes ownership of obj.
3308  *
3309  * Note: Only GstEvent seem to be passed to this private method
3310  */
3311 static GstFlowReturn
3312 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
3313     GstMiniObject * obj, gboolean prerollable)
3314 {
3315   GstFlowReturn ret;
3316
3317   GST_BASE_SINK_PREROLL_LOCK (basesink);
3318   if (G_UNLIKELY (basesink->flushing))
3319     goto flushing;
3320
3321   if (G_UNLIKELY (basesink->priv->received_eos))
3322     goto was_eos;
3323
3324   ret =
3325       gst_base_sink_queue_object_unlocked (basesink, pad, _PR_IS_EVENT, obj,
3326       prerollable);
3327   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3328
3329   return ret;
3330
3331   /* ERRORS */
3332 flushing:
3333   {
3334     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3335     GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3336     gst_mini_object_unref (obj);
3337     return GST_FLOW_WRONG_STATE;
3338   }
3339 was_eos:
3340   {
3341     GST_DEBUG_OBJECT (basesink,
3342         "we are EOS, dropping object, return UNEXPECTED");
3343     GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3344     gst_mini_object_unref (obj);
3345     return GST_FLOW_UNEXPECTED;
3346   }
3347 }
3348
3349 static void
3350 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
3351 {
3352   /* make sure we are not blocked on the clock also clear any pending
3353    * eos state. */
3354   gst_base_sink_set_flushing (basesink, pad, TRUE);
3355
3356   /* we grab the stream lock but that is not needed since setting the
3357    * sink to flushing would make sure no state commit is being done
3358    * anymore */
3359   GST_PAD_STREAM_LOCK (pad);
3360   gst_base_sink_reset_qos (basesink);
3361   /* and we need to commit our state again on the next
3362    * prerolled buffer */
3363   basesink->playing_async = TRUE;
3364   if (basesink->priv->async_enabled) {
3365     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
3366   } else {
3367     basesink->priv->have_latency = TRUE;
3368   }
3369   gst_base_sink_set_last_buffer (basesink, NULL);
3370   GST_PAD_STREAM_UNLOCK (pad);
3371 }
3372
3373 static void
3374 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
3375 {
3376   /* unset flushing so we can accept new data, this also flushes out any EOS
3377    * event. */
3378   gst_base_sink_set_flushing (basesink, pad, FALSE);
3379
3380   /* for position reporting */
3381   GST_OBJECT_LOCK (basesink);
3382   basesink->priv->current_sstart = GST_CLOCK_TIME_NONE;
3383   basesink->priv->current_sstop = GST_CLOCK_TIME_NONE;
3384   basesink->priv->eos_rtime = GST_CLOCK_TIME_NONE;
3385   basesink->priv->call_preroll = TRUE;
3386   basesink->priv->current_step.valid = FALSE;
3387   basesink->priv->pending_step.valid = FALSE;
3388   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
3389     /* we need new segment info after the flush. */
3390     basesink->have_newsegment = FALSE;
3391     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3392     gst_segment_init (basesink->clip_segment, GST_FORMAT_UNDEFINED);
3393   }
3394   GST_OBJECT_UNLOCK (basesink);
3395 }
3396
3397 static gboolean
3398 gst_base_sink_event (GstPad * pad, GstEvent * event)
3399 {
3400   GstBaseSink *basesink;
3401   gboolean result = TRUE;
3402   GstBaseSinkClass *bclass;
3403
3404   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3405
3406   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3407
3408   GST_DEBUG_OBJECT (basesink, "received event %p %" GST_PTR_FORMAT, event,
3409       event);
3410
3411   switch (GST_EVENT_TYPE (event)) {
3412     case GST_EVENT_EOS:
3413     {
3414       GstFlowReturn ret;
3415
3416       GST_BASE_SINK_PREROLL_LOCK (basesink);
3417       if (G_UNLIKELY (basesink->flushing))
3418         goto flushing;
3419
3420       if (G_UNLIKELY (basesink->priv->received_eos)) {
3421         /* we can't accept anything when we are EOS */
3422         result = FALSE;
3423         gst_event_unref (event);
3424       } else {
3425         /* we set the received EOS flag here so that we can use it when testing if
3426          * we are prerolled and to refuse more buffers. */
3427         basesink->priv->received_eos = TRUE;
3428
3429         /* EOS is a prerollable object, we call the unlocked version because it
3430          * does not check the received_eos flag. */
3431         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
3432             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), TRUE);
3433         if (G_UNLIKELY (ret != GST_FLOW_OK))
3434           result = FALSE;
3435       }
3436       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3437       break;
3438     }
3439     case GST_EVENT_NEWSEGMENT:
3440     {
3441       GstFlowReturn ret;
3442       gboolean update;
3443
3444       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
3445
3446       GST_BASE_SINK_PREROLL_LOCK (basesink);
3447       if (G_UNLIKELY (basesink->flushing))
3448         goto flushing;
3449
3450       gst_event_parse_new_segment_full (event, &update, NULL, NULL, NULL, NULL,
3451           NULL, NULL);
3452
3453       if (G_UNLIKELY (basesink->priv->received_eos && !update)) {
3454         /* we can't accept anything when we are EOS */
3455         result = FALSE;
3456         gst_event_unref (event);
3457       } else {
3458         /* the new segment is a non prerollable item and does not block anything,
3459          * we need to configure the current clipping segment and insert the event
3460          * in the queue to serialize it with the buffers for rendering. */
3461         gst_base_sink_configure_segment (basesink, pad, event,
3462             basesink->clip_segment);
3463
3464         ret =
3465             gst_base_sink_queue_object_unlocked (basesink, pad,
3466             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), FALSE);
3467         if (G_UNLIKELY (ret != GST_FLOW_OK))
3468           result = FALSE;
3469         else {
3470           GST_OBJECT_LOCK (basesink);
3471           basesink->have_newsegment = TRUE;
3472           GST_OBJECT_UNLOCK (basesink);
3473         }
3474       }
3475       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3476       break;
3477     }
3478     case GST_EVENT_FLUSH_START:
3479       if (bclass->event)
3480         bclass->event (basesink, event);
3481
3482       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3483
3484       gst_base_sink_flush_start (basesink, pad);
3485
3486       gst_event_unref (event);
3487       break;
3488     case GST_EVENT_FLUSH_STOP:
3489       if (bclass->event)
3490         bclass->event (basesink, event);
3491
3492       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3493
3494       gst_base_sink_flush_stop (basesink, pad);
3495
3496       gst_event_unref (event);
3497       break;
3498     default:
3499       /* other events are sent to queue or subclass depending on if they
3500        * are serialized. */
3501       if (GST_EVENT_IS_SERIALIZED (event)) {
3502         gst_base_sink_queue_object (basesink, pad,
3503             GST_MINI_OBJECT_CAST (event), FALSE);
3504       } else {
3505         if (bclass->event)
3506           bclass->event (basesink, event);
3507         gst_event_unref (event);
3508       }
3509       break;
3510   }
3511 done:
3512   gst_object_unref (basesink);
3513
3514   return result;
3515
3516   /* ERRORS */
3517 flushing:
3518   {
3519     GST_DEBUG_OBJECT (basesink, "we are flushing");
3520     GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3521     result = FALSE;
3522     gst_event_unref (event);
3523     goto done;
3524   }
3525 }
3526
3527 /* default implementation to calculate the start and end
3528  * timestamps on a buffer, subclasses can override
3529  */
3530 static void
3531 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3532     GstClockTime * start, GstClockTime * end)
3533 {
3534   GstClockTime timestamp, duration;
3535
3536   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3537   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3538
3539     /* get duration to calculate end time */
3540     duration = GST_BUFFER_DURATION (buffer);
3541     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3542       *end = timestamp + duration;
3543     }
3544     *start = timestamp;
3545   }
3546 }
3547
3548 /* must be called with PREROLL_LOCK */
3549 static gboolean
3550 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3551 {
3552   gboolean is_prerolled, res;
3553
3554   /* we have 2 cases where the PREROLL_LOCK is released:
3555    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3556    *  2) we are syncing on the clock
3557    */
3558   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3559   res = !is_prerolled;
3560
3561   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3562       basesink->have_preroll, basesink->priv->received_eos, res);
3563
3564   return res;
3565 }
3566
3567 /* with STREAM_LOCK, PREROLL_LOCK
3568  *
3569  * Takes a buffer and compare the timestamps with the last segment.
3570  * If the buffer falls outside of the segment boundaries, drop it.
3571  * Else queue the buffer for preroll and rendering.
3572  *
3573  * This function takes ownership of the buffer.
3574  */
3575 static GstFlowReturn
3576 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3577     guint8 obj_type, gpointer obj)
3578 {
3579   GstBaseSinkClass *bclass;
3580   GstFlowReturn result;
3581   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3582   GstSegment *clip_segment;
3583   GstBuffer *time_buf;
3584
3585   if (G_UNLIKELY (basesink->flushing))
3586     goto flushing;
3587
3588   if (G_UNLIKELY (basesink->priv->received_eos))
3589     goto was_eos;
3590
3591   if (OBJ_IS_BUFFERLIST (obj_type)) {
3592     time_buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3593     g_assert (NULL != time_buf);
3594   } else {
3595     time_buf = GST_BUFFER_CAST (obj);
3596   }
3597
3598   /* for code clarity */
3599   clip_segment = basesink->clip_segment;
3600
3601   if (G_UNLIKELY (!basesink->have_newsegment)) {
3602     gboolean sync;
3603
3604     sync = gst_base_sink_get_sync (basesink);
3605     if (sync) {
3606       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3607           (_("Internal data flow problem.")),
3608           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3609     }
3610
3611     /* this means this sink will assume timestamps start from 0 */
3612     GST_OBJECT_LOCK (basesink);
3613     clip_segment->start = 0;
3614     clip_segment->stop = -1;
3615     basesink->segment.start = 0;
3616     basesink->segment.stop = -1;
3617     basesink->have_newsegment = TRUE;
3618     GST_OBJECT_UNLOCK (basesink);
3619   }
3620
3621   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3622
3623   /* check if the buffer needs to be dropped, we first ask the subclass for the
3624    * start and end */
3625   if (bclass->get_times)
3626     bclass->get_times (basesink, time_buf, &start, &end);
3627
3628   if (!GST_CLOCK_TIME_IS_VALID (start)) {
3629     /* if the subclass does not want sync, we use our own values so that we at
3630      * least clip the buffer to the segment */
3631     gst_base_sink_get_times (basesink, time_buf, &start, &end);
3632   }
3633
3634   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3635       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3636
3637   /* a dropped buffer does not participate in anything */
3638   if (GST_CLOCK_TIME_IS_VALID (start) &&
3639       (clip_segment->format == GST_FORMAT_TIME)) {
3640     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3641                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3642       goto out_of_segment;
3643   }
3644
3645   /* now we can process the buffer in the queue, this function takes ownership
3646    * of the buffer */
3647   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3648       obj_type, obj, TRUE);
3649   return result;
3650
3651   /* ERRORS */
3652 flushing:
3653   {
3654     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3655     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3656     return GST_FLOW_WRONG_STATE;
3657   }
3658 was_eos:
3659   {
3660     GST_DEBUG_OBJECT (basesink,
3661         "we are EOS, dropping object, return UNEXPECTED");
3662     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3663     return GST_FLOW_UNEXPECTED;
3664   }
3665 out_of_segment:
3666   {
3667     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3668     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3669     return GST_FLOW_OK;
3670   }
3671 }
3672
3673 /* with STREAM_LOCK
3674  */
3675 static GstFlowReturn
3676 gst_base_sink_chain_main (GstBaseSink * basesink, GstPad * pad,
3677     guint8 obj_type, gpointer obj)
3678 {
3679   GstFlowReturn result;
3680
3681   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3682     goto wrong_mode;
3683
3684   GST_BASE_SINK_PREROLL_LOCK (basesink);
3685   result = gst_base_sink_chain_unlocked (basesink, pad, obj_type, obj);
3686   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3687
3688 done:
3689   return result;
3690
3691   /* ERRORS */
3692 wrong_mode:
3693   {
3694     GST_OBJECT_LOCK (pad);
3695     GST_WARNING_OBJECT (basesink,
3696         "Push on pad %s:%s, but it was not activated in push mode",
3697         GST_DEBUG_PAD_NAME (pad));
3698     GST_OBJECT_UNLOCK (pad);
3699     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3700     /* we don't post an error message this will signal to the peer
3701      * pushing that EOS is reached. */
3702     result = GST_FLOW_UNEXPECTED;
3703     goto done;
3704   }
3705 }
3706
3707 static GstFlowReturn
3708 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3709 {
3710   GstBaseSink *basesink;
3711
3712   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3713
3714   return gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, buf);
3715 }
3716
3717 static GstFlowReturn
3718 gst_base_sink_chain_list (GstPad * pad, GstBufferList * list)
3719 {
3720   GstBaseSink *basesink;
3721   GstBaseSinkClass *bclass;
3722   GstFlowReturn result;
3723
3724   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3725   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3726
3727   if (G_LIKELY (bclass->render_list)) {
3728     result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFERLIST, list);
3729   } else {
3730     GstBufferListIterator *it;
3731     GstBuffer *group;
3732
3733     GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
3734
3735     it = gst_buffer_list_iterate (list);
3736
3737     if (gst_buffer_list_iterator_next_group (it)) {
3738       do {
3739         group = gst_buffer_list_iterator_merge_group (it);
3740         if (group == NULL) {
3741           group = gst_buffer_new ();
3742           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3743         } else {
3744           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining group");
3745         }
3746         result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, group);
3747       } while (result == GST_FLOW_OK
3748           && gst_buffer_list_iterator_next_group (it));
3749     } else {
3750       GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3751       result =
3752           gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER,
3753           gst_buffer_new ());
3754     }
3755     gst_buffer_list_iterator_free (it);
3756     gst_buffer_list_unref (list);
3757   }
3758   return result;
3759 }
3760
3761
3762 static gboolean
3763 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3764 {
3765   gboolean res = TRUE;
3766
3767   /* update our offset if the start/stop position was updated */
3768   if (segment->format == GST_FORMAT_BYTES) {
3769     segment->time = segment->start;
3770   } else if (segment->start == 0) {
3771     /* seek to start, we can implement a default for this. */
3772     segment->time = 0;
3773   } else {
3774     res = FALSE;
3775     GST_INFO_OBJECT (sink, "Can't do a default seek");
3776   }
3777
3778   return res;
3779 }
3780
3781 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3782
3783 static gboolean
3784 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3785     GstEvent * event, GstSegment * segment)
3786 {
3787   /* By default, we try one of 2 things:
3788    *   - For absolute seek positions, convert the requested position to our
3789    *     configured processing format and place it in the output segment \
3790    *   - For relative seek positions, convert our current (input) values to the
3791    *     seek format, adjust by the relative seek offset and then convert back to
3792    *     the processing format
3793    */
3794   GstSeekType cur_type, stop_type;
3795   gint64 cur, stop;
3796   GstSeekFlags flags;
3797   GstFormat seek_format, dest_format;
3798   gdouble rate;
3799   gboolean update;
3800   gboolean res = TRUE;
3801
3802   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3803       &cur_type, &cur, &stop_type, &stop);
3804   dest_format = segment->format;
3805
3806   if (seek_format == dest_format) {
3807     gst_segment_set_seek (segment, rate, seek_format, flags,
3808         cur_type, cur, stop_type, stop, &update);
3809     return TRUE;
3810   }
3811
3812   if (cur_type != GST_SEEK_TYPE_NONE) {
3813     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3814     res =
3815         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3816         &cur);
3817     cur_type = GST_SEEK_TYPE_SET;
3818   }
3819
3820   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3821     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3822     res =
3823         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3824         &stop);
3825     stop_type = GST_SEEK_TYPE_SET;
3826   }
3827
3828   /* And finally, configure our output segment in the desired format */
3829   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3830       stop_type, stop, &update);
3831
3832   if (!res)
3833     goto no_format;
3834
3835   return res;
3836
3837 no_format:
3838   {
3839     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3840     return FALSE;
3841   }
3842 }
3843
3844 /* perform a seek, only executed in pull mode */
3845 static gboolean
3846 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3847 {
3848   gboolean flush;
3849   gdouble rate;
3850   GstFormat seek_format, dest_format;
3851   GstSeekFlags flags;
3852   GstSeekType cur_type, stop_type;
3853   gboolean seekseg_configured = FALSE;
3854   gint64 cur, stop;
3855   gboolean update, res = TRUE;
3856   GstSegment seeksegment;
3857
3858   dest_format = sink->segment.format;
3859
3860   if (event) {
3861     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3862     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3863         &cur_type, &cur, &stop_type, &stop);
3864
3865     flush = flags & GST_SEEK_FLAG_FLUSH;
3866   } else {
3867     GST_DEBUG_OBJECT (sink, "performing seek without event");
3868     flush = FALSE;
3869   }
3870
3871   if (flush) {
3872     GST_DEBUG_OBJECT (sink, "flushing upstream");
3873     gst_pad_push_event (pad, gst_event_new_flush_start ());
3874     gst_base_sink_flush_start (sink, pad);
3875   } else {
3876     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3877   }
3878
3879   GST_PAD_STREAM_LOCK (pad);
3880
3881   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3882    * copy the current segment info into the temp segment that we can actually
3883    * attempt the seek with. We only update the real segment if the seek suceeds. */
3884   if (!seekseg_configured) {
3885     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3886
3887     /* now configure the final seek segment */
3888     if (event) {
3889       if (sink->segment.format != seek_format) {
3890         /* OK, here's where we give the subclass a chance to convert the relative
3891          * seek into an absolute one in the processing format. We set up any
3892          * absolute seek above, before taking the stream lock. */
3893         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3894                 &seeksegment)) {
3895           GST_DEBUG_OBJECT (sink,
3896               "Preparing the seek failed after flushing. " "Aborting seek");
3897           res = FALSE;
3898         }
3899       } else {
3900         /* The seek format matches our processing format, no need to ask the
3901          * the subclass to configure the segment. */
3902         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3903             cur_type, cur, stop_type, stop, &update);
3904       }
3905     }
3906     /* Else, no seek event passed, so we're just (re)starting the
3907        current segment. */
3908   }
3909
3910   if (res) {
3911     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3912         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3913         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3914
3915     /* do the seek, segment.last_stop contains the new position. */
3916     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3917   }
3918
3919
3920   if (flush) {
3921     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3922     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3923     gst_base_sink_flush_stop (sink, pad);
3924   } else if (res && sink->running) {
3925     /* we are running the current segment and doing a non-flushing seek,
3926      * close the segment first based on the last_stop. */
3927     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3928         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3929   }
3930
3931   /* The subclass must have converted the segment to the processing format
3932    * by now */
3933   if (res && seeksegment.format != dest_format) {
3934     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3935         "in the correct format. Aborting seek.");
3936     res = FALSE;
3937   }
3938
3939   /* if successfull seek, we update our real segment and push
3940    * out the new segment. */
3941   if (res) {
3942     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3943
3944     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3945       gst_element_post_message (GST_ELEMENT (sink),
3946           gst_message_new_segment_start (GST_OBJECT (sink),
3947               sink->segment.format, sink->segment.last_stop));
3948     }
3949   }
3950
3951   sink->priv->discont = TRUE;
3952   sink->running = TRUE;
3953
3954   GST_PAD_STREAM_UNLOCK (pad);
3955
3956   return res;
3957 }
3958
3959 static void
3960 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
3961     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
3962     gboolean flush, gboolean intermediate)
3963 {
3964   GST_OBJECT_LOCK (sink);
3965   pending->seqnum = seqnum;
3966   pending->format = format;
3967   pending->amount = amount;
3968   pending->position = 0;
3969   pending->rate = rate;
3970   pending->flush = flush;
3971   pending->intermediate = intermediate;
3972   pending->valid = TRUE;
3973   /* flush invalidates the current stepping segment */
3974   if (flush)
3975     current->valid = FALSE;
3976   GST_OBJECT_UNLOCK (sink);
3977 }
3978
3979 static gboolean
3980 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3981 {
3982   GstBaseSinkPrivate *priv;
3983   GstBaseSinkClass *bclass;
3984   gboolean flush, intermediate;
3985   gdouble rate;
3986   GstFormat format;
3987   guint64 amount;
3988   guint seqnum;
3989   GstStepInfo *pending, *current;
3990   GstMessage *message;
3991
3992   bclass = GST_BASE_SINK_GET_CLASS (sink);
3993   priv = sink->priv;
3994
3995   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
3996
3997   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
3998   seqnum = gst_event_get_seqnum (event);
3999
4000   pending = &priv->pending_step;
4001   current = &priv->current_step;
4002
4003   /* post message first */
4004   message = gst_message_new_step_start (GST_OBJECT (sink), FALSE, format,
4005       amount, rate, flush, intermediate);
4006   gst_message_set_seqnum (message, seqnum);
4007   gst_element_post_message (GST_ELEMENT (sink), message);
4008
4009   if (flush) {
4010     /* we need to call ::unlock before locking PREROLL_LOCK
4011      * since we lock it before going into ::render */
4012     if (bclass->unlock)
4013       bclass->unlock (sink);
4014
4015     GST_BASE_SINK_PREROLL_LOCK (sink);
4016     /* now that we have the PREROLL lock, clear our unlock request */
4017     if (bclass->unlock_stop)
4018       bclass->unlock_stop (sink);
4019
4020     /* update the stepinfo and make it valid */
4021     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4022         intermediate);
4023
4024     if (sink->priv->async_enabled) {
4025       /* and we need to commit our state again on the next
4026        * prerolled buffer */
4027       sink->playing_async = TRUE;
4028       priv->pending_step.need_preroll = TRUE;
4029       sink->need_preroll = FALSE;
4030       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
4031     } else {
4032       sink->priv->have_latency = TRUE;
4033       sink->need_preroll = FALSE;
4034     }
4035     priv->current_sstart = GST_CLOCK_TIME_NONE;
4036     priv->current_sstop = GST_CLOCK_TIME_NONE;
4037     priv->eos_rtime = GST_CLOCK_TIME_NONE;
4038     priv->call_preroll = TRUE;
4039     gst_base_sink_set_last_buffer (sink, NULL);
4040     gst_base_sink_reset_qos (sink);
4041
4042     if (sink->clock_id) {
4043       gst_clock_id_unschedule (sink->clock_id);
4044     }
4045
4046     if (sink->have_preroll) {
4047       GST_DEBUG_OBJECT (sink, "signal waiter");
4048       priv->step_unlock = TRUE;
4049       GST_BASE_SINK_PREROLL_SIGNAL (sink);
4050     }
4051     GST_BASE_SINK_PREROLL_UNLOCK (sink);
4052   } else {
4053     /* update the stepinfo and make it valid */
4054     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4055         intermediate);
4056   }
4057
4058   return TRUE;
4059 }
4060
4061 /* with STREAM_LOCK
4062  */
4063 static void
4064 gst_base_sink_loop (GstPad * pad)
4065 {
4066   GstBaseSink *basesink;
4067   GstBuffer *buf = NULL;
4068   GstFlowReturn result;
4069   guint blocksize;
4070   guint64 offset;
4071
4072   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
4073
4074   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
4075
4076   if ((blocksize = basesink->priv->blocksize) == 0)
4077     blocksize = -1;
4078
4079   offset = basesink->segment.last_stop;
4080
4081   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
4082       offset, blocksize);
4083
4084   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
4085   if (G_UNLIKELY (result != GST_FLOW_OK))
4086     goto paused;
4087
4088   if (G_UNLIKELY (buf == NULL))
4089     goto no_buffer;
4090
4091   offset += gst_buffer_get_size (buf);
4092
4093   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
4094
4095   GST_BASE_SINK_PREROLL_LOCK (basesink);
4096   result = gst_base_sink_chain_unlocked (basesink, pad, _PR_IS_BUFFER, buf);
4097   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4098   if (G_UNLIKELY (result != GST_FLOW_OK))
4099     goto paused;
4100
4101   return;
4102
4103   /* ERRORS */
4104 paused:
4105   {
4106     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
4107         gst_flow_get_name (result));
4108     gst_pad_pause_task (pad);
4109     if (result == GST_FLOW_UNEXPECTED) {
4110       /* perform EOS logic */
4111       if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
4112         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4113             gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
4114                 basesink->segment.format, basesink->segment.last_stop));
4115       } else {
4116         gst_base_sink_event (pad, gst_event_new_eos ());
4117       }
4118     } else if (result == GST_FLOW_NOT_LINKED || result <= GST_FLOW_UNEXPECTED) {
4119       /* for fatal errors we post an error message, post the error
4120        * first so the app knows about the error first. 
4121        * wrong-state is not a fatal error because it happens due to
4122        * flushing and posting an error message in that case is the
4123        * wrong thing to do, e.g. when basesrc is doing a flushing
4124        * seek. */
4125       GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4126           (_("Internal data stream error.")),
4127           ("stream stopped, reason %s", gst_flow_get_name (result)));
4128       gst_base_sink_event (pad, gst_event_new_eos ());
4129     }
4130     return;
4131   }
4132 no_buffer:
4133   {
4134     GST_LOG_OBJECT (basesink, "no buffer, pausing");
4135     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4136         (_("Internal data flow error.")), ("element returned NULL buffer"));
4137     result = GST_FLOW_ERROR;
4138     goto paused;
4139   }
4140 }
4141
4142 static gboolean
4143 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
4144     gboolean flushing)
4145 {
4146   GstBaseSinkClass *bclass;
4147
4148   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4149
4150   if (flushing) {
4151     /* unlock any subclasses, we need to do this before grabbing the
4152      * PREROLL_LOCK since we hold this lock before going into ::render. */
4153     if (bclass->unlock)
4154       bclass->unlock (basesink);
4155   }
4156
4157   GST_BASE_SINK_PREROLL_LOCK (basesink);
4158   basesink->flushing = flushing;
4159   if (flushing) {
4160     /* step 1, now that we have the PREROLL lock, clear our unlock request */
4161     if (bclass->unlock_stop)
4162       bclass->unlock_stop (basesink);
4163
4164     /* set need_preroll before we unblock the clock. If the clock is unblocked
4165      * before timing out, we can reuse the buffer for preroll. */
4166     basesink->need_preroll = TRUE;
4167
4168     /* step 2, unblock clock sync (if any) or any other blocking thing */
4169     if (basesink->clock_id) {
4170       gst_clock_id_unschedule (basesink->clock_id);
4171     }
4172
4173     /* flush out the data thread if it's locked in finish_preroll, this will
4174      * also flush out the EOS state */
4175     GST_DEBUG_OBJECT (basesink,
4176         "flushing out data thread, need preroll to TRUE");
4177     gst_base_sink_preroll_queue_flush (basesink, pad);
4178   }
4179   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4180
4181   return TRUE;
4182 }
4183
4184 static gboolean
4185 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
4186 {
4187   gboolean result;
4188
4189   if (active) {
4190     /* start task */
4191     result = gst_pad_start_task (basesink->sinkpad,
4192         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
4193   } else {
4194     /* step 2, make sure streaming finishes */
4195     result = gst_pad_stop_task (basesink->sinkpad);
4196   }
4197
4198   return result;
4199 }
4200
4201 static gboolean
4202 gst_base_sink_pad_activate (GstPad * pad)
4203 {
4204   gboolean result = FALSE;
4205   GstBaseSink *basesink;
4206
4207   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4208
4209   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
4210
4211   gst_base_sink_set_flushing (basesink, pad, FALSE);
4212
4213   /* we need to have the pull mode enabled */
4214   if (!basesink->can_activate_pull) {
4215     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
4216     goto fallback;
4217   }
4218
4219   /* check if downstreams supports pull mode at all */
4220   if (!gst_pad_check_pull_range (pad)) {
4221     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
4222     goto fallback;
4223   }
4224
4225   /* set the pad mode before starting the task so that it's in the
4226    * correct state for the new thread. also the sink set_caps and get_caps
4227    * function checks this */
4228   basesink->pad_mode = GST_ACTIVATE_PULL;
4229
4230   /* we first try to negotiate a format so that when we try to activate
4231    * downstream, it knows about our format */
4232   if (!gst_base_sink_negotiate_pull (basesink)) {
4233     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
4234     goto fallback;
4235   }
4236
4237   /* ok activate now */
4238   if (!gst_pad_activate_pull (pad, TRUE)) {
4239     /* clear any pending caps */
4240     GST_OBJECT_LOCK (basesink);
4241     gst_caps_replace (&basesink->priv->pull_caps, NULL);
4242     GST_OBJECT_UNLOCK (basesink);
4243     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
4244     goto fallback;
4245   }
4246
4247   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
4248   result = TRUE;
4249   goto done;
4250
4251   /* push mode fallback */
4252 fallback:
4253   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
4254   if ((result = gst_pad_activate_push (pad, TRUE))) {
4255     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
4256   }
4257
4258 done:
4259   if (!result) {
4260     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
4261     gst_base_sink_set_flushing (basesink, pad, TRUE);
4262   }
4263
4264   gst_object_unref (basesink);
4265
4266   return result;
4267 }
4268
4269 static gboolean
4270 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
4271 {
4272   gboolean result;
4273   GstBaseSink *basesink;
4274
4275   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4276
4277   if (active) {
4278     if (!basesink->can_activate_push) {
4279       result = FALSE;
4280       basesink->pad_mode = GST_ACTIVATE_NONE;
4281     } else {
4282       result = TRUE;
4283       basesink->pad_mode = GST_ACTIVATE_PUSH;
4284     }
4285   } else {
4286     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
4287       g_warning ("Internal GStreamer activation error!!!");
4288       result = FALSE;
4289     } else {
4290       gst_base_sink_set_flushing (basesink, pad, TRUE);
4291       result = TRUE;
4292       basesink->pad_mode = GST_ACTIVATE_NONE;
4293     }
4294   }
4295
4296   gst_object_unref (basesink);
4297
4298   return result;
4299 }
4300
4301 static gboolean
4302 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
4303 {
4304   GstCaps *caps;
4305   gboolean result;
4306
4307   result = FALSE;
4308
4309   /* this returns the intersection between our caps and the peer caps. If there
4310    * is no peer, it returns NULL and we can't operate in pull mode so we can
4311    * fail the negotiation. */
4312   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
4313   if (caps == NULL || gst_caps_is_empty (caps))
4314     goto no_caps_possible;
4315
4316   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
4317
4318   caps = gst_caps_make_writable (caps);
4319   /* get the first (prefered) format */
4320   gst_caps_truncate (caps);
4321   /* try to fixate */
4322   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
4323
4324   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
4325
4326   if (gst_caps_is_any (caps)) {
4327     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
4328         "allowing pull()");
4329     /* neither side has template caps in this case, so they are prepared for
4330        pull() without setcaps() */
4331     result = TRUE;
4332   } else if (gst_caps_is_fixed (caps)) {
4333     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
4334       goto could_not_set_caps;
4335
4336     GST_OBJECT_LOCK (basesink);
4337     gst_caps_replace (&basesink->priv->pull_caps, caps);
4338     GST_OBJECT_UNLOCK (basesink);
4339
4340     result = TRUE;
4341   }
4342
4343   gst_caps_unref (caps);
4344
4345   return result;
4346
4347 no_caps_possible:
4348   {
4349     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
4350     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
4351     if (caps)
4352       gst_caps_unref (caps);
4353     return FALSE;
4354   }
4355 could_not_set_caps:
4356   {
4357     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
4358     gst_caps_unref (caps);
4359     return FALSE;
4360   }
4361 }
4362
4363 /* this won't get called until we implement an activate function */
4364 static gboolean
4365 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
4366 {
4367   gboolean result = FALSE;
4368   GstBaseSink *basesink;
4369   GstBaseSinkClass *bclass;
4370
4371   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4372   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4373
4374   if (active) {
4375     GstFormat format;
4376     gint64 duration;
4377
4378     /* we mark we have a newsegment here because pull based
4379      * mode works just fine without having a newsegment before the
4380      * first buffer */
4381     format = GST_FORMAT_BYTES;
4382
4383     gst_segment_init (&basesink->segment, format);
4384     gst_segment_init (basesink->clip_segment, format);
4385     GST_OBJECT_LOCK (basesink);
4386     basesink->have_newsegment = TRUE;
4387     GST_OBJECT_UNLOCK (basesink);
4388
4389     /* get the peer duration in bytes */
4390     result = gst_pad_query_peer_duration (pad, &format, &duration);
4391     if (result) {
4392       GST_DEBUG_OBJECT (basesink,
4393           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
4394       gst_segment_set_duration (basesink->clip_segment, format, duration);
4395       gst_segment_set_duration (&basesink->segment, format, duration);
4396     } else {
4397       GST_DEBUG_OBJECT (basesink, "unknown duration");
4398     }
4399
4400     if (bclass->activate_pull)
4401       result = bclass->activate_pull (basesink, TRUE);
4402     else
4403       result = FALSE;
4404
4405     if (!result)
4406       goto activate_failed;
4407
4408   } else {
4409     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
4410       g_warning ("Internal GStreamer activation error!!!");
4411       result = FALSE;
4412     } else {
4413       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
4414       if (bclass->activate_pull)
4415         result &= bclass->activate_pull (basesink, FALSE);
4416       basesink->pad_mode = GST_ACTIVATE_NONE;
4417       /* clear any pending caps */
4418       GST_OBJECT_LOCK (basesink);
4419       gst_caps_replace (&basesink->priv->pull_caps, NULL);
4420       GST_OBJECT_UNLOCK (basesink);
4421     }
4422   }
4423   gst_object_unref (basesink);
4424
4425   return result;
4426
4427   /* ERRORS */
4428 activate_failed:
4429   {
4430     /* reset, as starting the thread failed */
4431     basesink->pad_mode = GST_ACTIVATE_NONE;
4432
4433     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
4434     return FALSE;
4435   }
4436 }
4437
4438 /* send an event to our sinkpad peer. */
4439 static gboolean
4440 gst_base_sink_send_event (GstElement * element, GstEvent * event)
4441 {
4442   GstPad *pad;
4443   GstBaseSink *basesink = GST_BASE_SINK (element);
4444   gboolean forward, result = TRUE;
4445   GstActivateMode mode;
4446
4447   GST_OBJECT_LOCK (element);
4448   /* get the pad and the scheduling mode */
4449   pad = gst_object_ref (basesink->sinkpad);
4450   mode = basesink->pad_mode;
4451   GST_OBJECT_UNLOCK (element);
4452
4453   /* only push UPSTREAM events upstream */
4454   forward = GST_EVENT_IS_UPSTREAM (event);
4455
4456   GST_DEBUG_OBJECT (basesink, "handling event %p %" GST_PTR_FORMAT, event,
4457       event);
4458
4459   switch (GST_EVENT_TYPE (event)) {
4460     case GST_EVENT_LATENCY:
4461     {
4462       GstClockTime latency;
4463
4464       gst_event_parse_latency (event, &latency);
4465
4466       /* store the latency. We use this to adjust the running_time before syncing
4467        * it to the clock. */
4468       GST_OBJECT_LOCK (element);
4469       basesink->priv->latency = latency;
4470       if (!basesink->priv->have_latency)
4471         forward = FALSE;
4472       GST_OBJECT_UNLOCK (element);
4473       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
4474           GST_TIME_ARGS (latency));
4475
4476       /* We forward this event so that all elements know about the global pipeline
4477        * latency. This is interesting for an element when it wants to figure out
4478        * when a particular piece of data will be rendered. */
4479       break;
4480     }
4481     case GST_EVENT_SEEK:
4482       /* in pull mode we will execute the seek */
4483       if (mode == GST_ACTIVATE_PULL)
4484         result = gst_base_sink_perform_seek (basesink, pad, event);
4485       break;
4486     case GST_EVENT_STEP:
4487       result = gst_base_sink_perform_step (basesink, pad, event);
4488       forward = FALSE;
4489       break;
4490     default:
4491       break;
4492   }
4493
4494   if (forward) {
4495     result = gst_pad_push_event (pad, event);
4496   } else {
4497     /* not forwarded, unref the event */
4498     gst_event_unref (event);
4499   }
4500
4501   gst_object_unref (pad);
4502   return result;
4503 }
4504
4505 static gboolean
4506 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4507     gint64 * cur, gboolean * upstream)
4508 {
4509   GstClock *clock = NULL;
4510   gboolean res = FALSE;
4511   GstFormat oformat, tformat;
4512   GstSegment *segment;
4513   GstClockTime now, latency;
4514   GstClockTimeDiff base;
4515   gint64 time, accum, duration;
4516   gdouble rate;
4517   gint64 last;
4518   gboolean last_seen, with_clock, in_paused;
4519
4520   GST_OBJECT_LOCK (basesink);
4521   /* we can only get the segment when we are not NULL or READY */
4522   if (!basesink->have_newsegment)
4523     goto wrong_state;
4524
4525   in_paused = FALSE;
4526   /* when not in PLAYING or when we're busy with a state change, we
4527    * cannot read from the clock so we report time based on the
4528    * last seen timestamp. */
4529   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4530       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING) {
4531     in_paused = TRUE;
4532   }
4533
4534   /* we don't use the clip segment in pull mode, when seeking we update the
4535    * main segment directly with the new segment values without it having to be
4536    * activated by the rendering after preroll */
4537   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4538     segment = basesink->clip_segment;
4539   else
4540     segment = &basesink->segment;
4541
4542   /* our intermediate time format */
4543   tformat = GST_FORMAT_TIME;
4544   /* get the format in the segment */
4545   oformat = segment->format;
4546
4547   /* report with last seen position when EOS */
4548   last_seen = basesink->eos;
4549
4550   /* assume we will use the clock for getting the current position */
4551   with_clock = TRUE;
4552   if (basesink->sync == FALSE)
4553     with_clock = FALSE;
4554
4555   /* and we need a clock */
4556   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4557     with_clock = FALSE;
4558   else
4559     gst_object_ref (clock);
4560
4561   /* collect all data we need holding the lock */
4562   if (GST_CLOCK_TIME_IS_VALID (segment->time))
4563     time = segment->time;
4564   else
4565     time = 0;
4566
4567   if (GST_CLOCK_TIME_IS_VALID (segment->stop))
4568     duration = segment->stop - segment->start;
4569   else
4570     duration = 0;
4571
4572   accum = segment->accum;
4573   rate = segment->rate * segment->applied_rate;
4574   latency = basesink->priv->latency;
4575
4576   if (oformat == GST_FORMAT_TIME) {
4577     gint64 start, stop;
4578
4579     start = basesink->priv->current_sstart;
4580     stop = basesink->priv->current_sstop;
4581
4582     if (in_paused) {
4583       /* in paused we use the last position as a lower bound */
4584       if (stop == -1 || segment->rate > 0.0)
4585         last = start;
4586       else
4587         last = stop;
4588     } else {
4589       /* in playing, use last stop time as upper bound */
4590       if (start == -1 || segment->rate > 0.0)
4591         last = stop;
4592       else
4593         last = start;
4594     }
4595   } else {
4596     /* convert last stop to stream time */
4597     last = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4598   }
4599
4600   if (in_paused) {
4601     /* in paused, use start_time */
4602     base = GST_ELEMENT_START_TIME (basesink);
4603     GST_DEBUG_OBJECT (basesink, "in paused, using start time %" GST_TIME_FORMAT,
4604         GST_TIME_ARGS (base));
4605   } else if (with_clock) {
4606     /* else use clock when needed */
4607     base = GST_ELEMENT_CAST (basesink)->base_time;
4608     GST_DEBUG_OBJECT (basesink, "using clock and base time %" GST_TIME_FORMAT,
4609         GST_TIME_ARGS (base));
4610   } else {
4611     /* else, no sync or clock -> no base time */
4612     GST_DEBUG_OBJECT (basesink, "no sync or no clock");
4613     base = -1;
4614   }
4615
4616   /* no base, we can't calculate running_time, use last seem timestamp to report
4617    * time */
4618   if (base == -1)
4619     last_seen = TRUE;
4620
4621   /* need to release the object lock before we can get the time,
4622    * a clock might take the LOCK of the provider, which could be
4623    * a basesink subclass. */
4624   GST_OBJECT_UNLOCK (basesink);
4625
4626   if (last_seen) {
4627     /* in EOS or when no valid stream_time, report the value of last seen
4628      * timestamp */
4629     if (last == -1) {
4630       /* no timestamp, we need to ask upstream */
4631       GST_DEBUG_OBJECT (basesink, "no last seen timestamp, asking upstream");
4632       res = FALSE;
4633       *upstream = TRUE;
4634       goto done;
4635     }
4636     GST_DEBUG_OBJECT (basesink, "using last seen timestamp %" GST_TIME_FORMAT,
4637         GST_TIME_ARGS (last));
4638     *cur = last;
4639   } else {
4640     if (oformat != tformat) {
4641       /* convert accum, time and duration to time */
4642       if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4643               &accum))
4644         goto convert_failed;
4645       if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration,
4646               &tformat, &duration))
4647         goto convert_failed;
4648       if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4649               &time))
4650         goto convert_failed;
4651       if (!gst_pad_query_convert (basesink->sinkpad, oformat, last, &tformat,
4652               &last))
4653         goto convert_failed;
4654
4655       /* assume time format from now on */
4656       oformat = tformat;
4657     }
4658
4659     if (!in_paused && with_clock) {
4660       now = gst_clock_get_time (clock);
4661     } else {
4662       now = base;
4663       base = 0;
4664     }
4665
4666     /* subtract base time and accumulated time from the clock time.
4667      * Make sure we don't go negative. This is the current time in
4668      * the segment which we need to scale with the combined
4669      * rate and applied rate. */
4670     base += accum;
4671     base += latency;
4672     if (GST_CLOCK_DIFF (base, now) < 0)
4673       base = now;
4674
4675     /* for negative rates we need to count back from the segment
4676      * duration. */
4677     if (rate < 0.0)
4678       time += duration;
4679
4680     *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4681
4682     if (in_paused) {
4683       /* never report less than segment values in paused */
4684       if (last != -1)
4685         *cur = MAX (last, *cur);
4686     } else {
4687       /* never report more than last seen position in playing */
4688       if (last != -1)
4689         *cur = MIN (last, *cur);
4690     }
4691
4692     GST_DEBUG_OBJECT (basesink,
4693         "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4694         GST_TIME_FORMAT " + time %" GST_TIME_FORMAT "  last %" GST_TIME_FORMAT,
4695         GST_TIME_ARGS (now), GST_TIME_ARGS (base), GST_TIME_ARGS (accum),
4696         GST_TIME_ARGS (time), GST_TIME_ARGS (last));
4697   }
4698
4699   if (oformat != format) {
4700     /* convert to final format */
4701     if (!gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur))
4702       goto convert_failed;
4703   }
4704
4705   res = TRUE;
4706
4707 done:
4708   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4709       res, GST_TIME_ARGS (*cur));
4710
4711   if (clock)
4712     gst_object_unref (clock);
4713
4714   return res;
4715
4716   /* special cases */
4717 wrong_state:
4718   {
4719     /* in NULL or READY we always return FALSE and -1 */
4720     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4721     res = FALSE;
4722     *cur = -1;
4723     GST_OBJECT_UNLOCK (basesink);
4724     goto done;
4725   }
4726 convert_failed:
4727   {
4728     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4729     *upstream = TRUE;
4730     res = FALSE;
4731     goto done;
4732   }
4733 }
4734
4735 static gboolean
4736 gst_base_sink_get_duration (GstBaseSink * basesink, GstFormat format,
4737     gint64 * dur, gboolean * upstream)
4738 {
4739   gboolean res = FALSE;
4740
4741   if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4742     GstFormat uformat = GST_FORMAT_BYTES;
4743     gint64 uduration;
4744
4745     /* get the duration in bytes, in pull mode that's all we are sure to
4746      * know. We have to explicitly get this value from upstream instead of
4747      * using our cached value because it might change. Duration caching
4748      * should be done at a higher level. */
4749     res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat, &uduration);
4750     if (res) {
4751       gst_segment_set_duration (&basesink->segment, uformat, uduration);
4752       if (format != uformat) {
4753         /* convert to the requested format */
4754         res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4755             &format, dur);
4756       } else {
4757         *dur = uduration;
4758       }
4759     }
4760     *upstream = FALSE;
4761   } else {
4762     *upstream = TRUE;
4763   }
4764
4765   return res;
4766 }
4767
4768 static const GstQueryType *
4769 gst_base_sink_get_query_types (GstElement * element)
4770 {
4771   static const GstQueryType query_types[] = {
4772     GST_QUERY_DURATION,
4773     GST_QUERY_POSITION,
4774     GST_QUERY_SEGMENT,
4775     GST_QUERY_LATENCY,
4776     0
4777   };
4778
4779   return query_types;
4780 }
4781
4782 static gboolean
4783 gst_base_sink_query (GstElement * element, GstQuery * query)
4784 {
4785   gboolean res = FALSE;
4786
4787   GstBaseSink *basesink = GST_BASE_SINK (element);
4788
4789   switch (GST_QUERY_TYPE (query)) {
4790     case GST_QUERY_POSITION:
4791     {
4792       gint64 cur = 0;
4793       GstFormat format;
4794       gboolean upstream = FALSE;
4795
4796       gst_query_parse_position (query, &format, NULL);
4797
4798       GST_DEBUG_OBJECT (basesink, "position query in format %s",
4799           gst_format_get_name (format));
4800
4801       /* first try to get the position based on the clock */
4802       if ((res =
4803               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4804         gst_query_set_position (query, format, cur);
4805       } else if (upstream) {
4806         /* fallback to peer query */
4807         res = gst_pad_peer_query (basesink->sinkpad, query);
4808       }
4809       if (!res) {
4810         /* we can handle a few things if upstream failed */
4811         if (format == GST_FORMAT_PERCENT) {
4812           gint64 dur = 0;
4813           GstFormat uformat = GST_FORMAT_TIME;
4814
4815           res = gst_base_sink_get_position (basesink, GST_FORMAT_TIME, &cur,
4816               &upstream);
4817           if (!res && upstream) {
4818             res = gst_pad_query_peer_position (basesink->sinkpad, &uformat,
4819                 &cur);
4820           }
4821           if (res) {
4822             res = gst_base_sink_get_duration (basesink, GST_FORMAT_TIME, &dur,
4823                 &upstream);
4824             if (!res && upstream) {
4825               res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4826                   &dur);
4827             }
4828           }
4829           if (res) {
4830             gint64 pos;
4831
4832             pos = gst_util_uint64_scale (100 * GST_FORMAT_PERCENT_SCALE, cur,
4833                 dur);
4834             gst_query_set_position (query, GST_FORMAT_PERCENT, pos);
4835           }
4836         }
4837       }
4838       break;
4839     }
4840     case GST_QUERY_DURATION:
4841     {
4842       gint64 dur = 0;
4843       GstFormat format;
4844       gboolean upstream = FALSE;
4845
4846       gst_query_parse_duration (query, &format, NULL);
4847
4848       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4849           gst_format_get_name (format));
4850
4851       if ((res =
4852               gst_base_sink_get_duration (basesink, format, &dur, &upstream))) {
4853         gst_query_set_duration (query, format, dur);
4854       } else if (upstream) {
4855         /* fallback to peer query */
4856         res = gst_pad_peer_query (basesink->sinkpad, query);
4857       }
4858       if (!res) {
4859         /* we can handle a few things if upstream failed */
4860         if (format == GST_FORMAT_PERCENT) {
4861           gst_query_set_duration (query, GST_FORMAT_PERCENT,
4862               GST_FORMAT_PERCENT_MAX);
4863           res = TRUE;
4864         }
4865       }
4866       break;
4867     }
4868     case GST_QUERY_LATENCY:
4869     {
4870       gboolean live, us_live;
4871       GstClockTime min, max;
4872
4873       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4874                   &max))) {
4875         gst_query_set_latency (query, live, min, max);
4876       }
4877       break;
4878     }
4879     case GST_QUERY_JITTER:
4880       break;
4881     case GST_QUERY_RATE:
4882       /* gst_query_set_rate (query, basesink->segment_rate); */
4883       res = TRUE;
4884       break;
4885     case GST_QUERY_SEGMENT:
4886     {
4887       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4888         gst_query_set_segment (query, basesink->segment.rate,
4889             GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4890         res = TRUE;
4891       } else {
4892         res = gst_pad_peer_query (basesink->sinkpad, query);
4893       }
4894       break;
4895     }
4896     case GST_QUERY_SEEKING:
4897     case GST_QUERY_CONVERT:
4898     case GST_QUERY_FORMATS:
4899     default:
4900       res = gst_pad_peer_query (basesink->sinkpad, query);
4901       break;
4902   }
4903   GST_DEBUG_OBJECT (basesink, "query %s returns %d",
4904       GST_QUERY_TYPE_NAME (query), res);
4905   return res;
4906 }
4907
4908 static GstStateChangeReturn
4909 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4910 {
4911   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4912   GstBaseSink *basesink = GST_BASE_SINK (element);
4913   GstBaseSinkClass *bclass;
4914   GstBaseSinkPrivate *priv;
4915
4916   priv = basesink->priv;
4917
4918   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4919
4920   switch (transition) {
4921     case GST_STATE_CHANGE_NULL_TO_READY:
4922       if (bclass->start)
4923         if (!bclass->start (basesink))
4924           goto start_failed;
4925       break;
4926     case GST_STATE_CHANGE_READY_TO_PAUSED:
4927       /* need to complete preroll before this state change completes, there
4928        * is no data flow in READY so we can safely assume we need to preroll. */
4929       GST_BASE_SINK_PREROLL_LOCK (basesink);
4930       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4931       basesink->have_newsegment = FALSE;
4932       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4933       gst_segment_init (basesink->clip_segment, GST_FORMAT_UNDEFINED);
4934       basesink->offset = 0;
4935       basesink->have_preroll = FALSE;
4936       priv->step_unlock = FALSE;
4937       basesink->need_preroll = TRUE;
4938       basesink->playing_async = TRUE;
4939       priv->current_sstart = GST_CLOCK_TIME_NONE;
4940       priv->current_sstop = GST_CLOCK_TIME_NONE;
4941       priv->eos_rtime = GST_CLOCK_TIME_NONE;
4942       priv->latency = 0;
4943       basesink->eos = FALSE;
4944       priv->received_eos = FALSE;
4945       gst_base_sink_reset_qos (basesink);
4946       priv->commited = FALSE;
4947       priv->call_preroll = TRUE;
4948       priv->current_step.valid = FALSE;
4949       priv->pending_step.valid = FALSE;
4950       if (priv->async_enabled) {
4951         GST_DEBUG_OBJECT (basesink, "doing async state change");
4952         /* when async enabled, post async-start message and return ASYNC from
4953          * the state change function */
4954         ret = GST_STATE_CHANGE_ASYNC;
4955         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4956             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4957       } else {
4958         priv->have_latency = TRUE;
4959       }
4960       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4961       break;
4962     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4963       GST_BASE_SINK_PREROLL_LOCK (basesink);
4964       if (!gst_base_sink_needs_preroll (basesink)) {
4965         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4966         /* no preroll needed anymore now. */
4967         basesink->playing_async = FALSE;
4968         basesink->need_preroll = FALSE;
4969         if (basesink->eos) {
4970           GstMessage *message;
4971
4972           /* need to post EOS message here */
4973           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4974           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
4975           gst_message_set_seqnum (message, basesink->priv->seqnum);
4976           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
4977         } else {
4978           GST_DEBUG_OBJECT (basesink, "signal preroll");
4979           GST_BASE_SINK_PREROLL_SIGNAL (basesink);
4980         }
4981       } else {
4982         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
4983         basesink->need_preroll = TRUE;
4984         basesink->playing_async = TRUE;
4985         priv->call_preroll = TRUE;
4986         priv->commited = FALSE;
4987         if (priv->async_enabled) {
4988           GST_DEBUG_OBJECT (basesink, "doing async state change");
4989           ret = GST_STATE_CHANGE_ASYNC;
4990           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4991               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4992         }
4993       }
4994       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4995       break;
4996     default:
4997       break;
4998   }
4999
5000   {
5001     GstStateChangeReturn bret;
5002
5003     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
5004     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
5005       goto activate_failed;
5006   }
5007
5008   switch (transition) {
5009     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
5010       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
5011       /* FIXME, make sure we cannot enter _render first */
5012
5013       /* we need to call ::unlock before locking PREROLL_LOCK
5014        * since we lock it before going into ::render */
5015       if (bclass->unlock)
5016         bclass->unlock (basesink);
5017
5018       GST_BASE_SINK_PREROLL_LOCK (basesink);
5019       GST_DEBUG_OBJECT (basesink, "got preroll lock");
5020       /* now that we have the PREROLL lock, clear our unlock request */
5021       if (bclass->unlock_stop)
5022         bclass->unlock_stop (basesink);
5023
5024       /* we need preroll again and we set the flag before unlocking the clockid
5025        * because if the clockid is unlocked before a current buffer expired, we
5026        * can use that buffer to preroll with */
5027       basesink->need_preroll = TRUE;
5028
5029       if (basesink->clock_id) {
5030         GST_DEBUG_OBJECT (basesink, "unschedule clock");
5031         gst_clock_id_unschedule (basesink->clock_id);
5032       }
5033
5034       /* if we don't have a preroll buffer we need to wait for a preroll and
5035        * return ASYNC. */
5036       if (!gst_base_sink_needs_preroll (basesink)) {
5037         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
5038         basesink->playing_async = FALSE;
5039       } else {
5040         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
5041           GST_DEBUG_OBJECT (basesink, "element is <= READY");
5042           ret = GST_STATE_CHANGE_SUCCESS;
5043         } else {
5044           GST_DEBUG_OBJECT (basesink,
5045               "PLAYING to PAUSED, we are not prerolled");
5046           basesink->playing_async = TRUE;
5047           priv->commited = FALSE;
5048           priv->call_preroll = TRUE;
5049           if (priv->async_enabled) {
5050             GST_DEBUG_OBJECT (basesink, "doing async state change");
5051             ret = GST_STATE_CHANGE_ASYNC;
5052             gst_element_post_message (GST_ELEMENT_CAST (basesink),
5053                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
5054                     FALSE));
5055           }
5056         }
5057       }
5058       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
5059           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
5060
5061       gst_base_sink_reset_qos (basesink);
5062       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
5063       break;
5064     case GST_STATE_CHANGE_PAUSED_TO_READY:
5065       GST_BASE_SINK_PREROLL_LOCK (basesink);
5066       /* start by reseting our position state with the object lock so that the
5067        * position query gets the right idea. We do this before we post the
5068        * messages so that the message handlers pick this up. */
5069       GST_OBJECT_LOCK (basesink);
5070       basesink->have_newsegment = FALSE;
5071       priv->current_sstart = GST_CLOCK_TIME_NONE;
5072       priv->current_sstop = GST_CLOCK_TIME_NONE;
5073       priv->have_latency = FALSE;
5074       if (priv->cached_clock_id) {
5075         gst_clock_id_unref (priv->cached_clock_id);
5076         priv->cached_clock_id = NULL;
5077       }
5078       GST_OBJECT_UNLOCK (basesink);
5079
5080       gst_base_sink_set_last_buffer (basesink, NULL);
5081       priv->call_preroll = FALSE;
5082
5083       if (!priv->commited) {
5084         if (priv->async_enabled) {
5085           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
5086
5087           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5088               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
5089                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
5090
5091           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5092               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
5093         }
5094         priv->commited = TRUE;
5095       } else {
5096         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
5097       }
5098       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
5099       break;
5100     case GST_STATE_CHANGE_READY_TO_NULL:
5101       if (bclass->stop) {
5102         if (!bclass->stop (basesink)) {
5103           GST_WARNING_OBJECT (basesink, "failed to stop");
5104         }
5105       }
5106       gst_base_sink_set_last_buffer (basesink, NULL);
5107       priv->call_preroll = FALSE;
5108       break;
5109     default:
5110       break;
5111   }
5112
5113   return ret;
5114
5115   /* ERRORS */
5116 start_failed:
5117   {
5118     GST_DEBUG_OBJECT (basesink, "failed to start");
5119     return GST_STATE_CHANGE_FAILURE;
5120   }
5121 activate_failed:
5122   {
5123     GST_DEBUG_OBJECT (basesink,
5124         "element failed to change states -- activation problem?");
5125     return GST_STATE_CHANGE_FAILURE;
5126   }
5127 }