Remove pad_alloc, this can now be done better
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSrc
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its class_init function, like so:
40  * |[
41  * static void
42  * my_element_class_init (GstMyElementClass *klass)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
45  *
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * ]|
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSinkClass.preroll() vmethod with this preroll buffer and will then
59  * commit the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from #GstBaseSinkClass.get_times(). If this
63  * function returns #GST_CLOCK_TIME_NONE for the start time, no synchronisation
64  * will be done. Synchronisation can be disabled entirely by setting the object
65  * #GstBaseSink:sync property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSinkClass.render() will be
68  * called. Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the
71  * #GstBaseSinkClass.render() method are supported as well. These classes
72  * typically receive a buffer in the render method and can then potentially
73  * block on the clock while rendering. A typical example is an audiosink.
74  * Since 0.10.11 these subclasses can use gst_base_sink_wait_preroll() to
75  * perform the blocking wait.
76  *
77  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
78  * for the clock to reach the time indicated by the stop time of the last
79  * #GstBaseSinkClass.get_times() call before posting an EOS message. When the
80  * element receives EOS in PAUSED, preroll completes, the event is queued and an
81  * EOS message is posted when going to PLAYING.
82  *
83  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
84  * synchronisation and clipping of buffers. Buffers that fall completely outside
85  * of the current segment are dropped. Buffers that fall partially in the
86  * segment are rendered (and prerolled). Subclasses should do any subbuffer
87  * clipping themselves when needed.
88  *
89  * #GstBaseSink will by default report the current playback position in
90  * #GST_FORMAT_TIME based on the current clock time and segment information.
91  * If no clock has been set on the element, the query will be forwarded
92  * upstream.
93  *
94  * The #GstBaseSinkClass.set_caps() function will be called when the subclass
95  * should configure itself to process a specific media type.
96  *
97  * The #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() virtual methods
98  * will be called when resources should be allocated. Any 
99  * #GstBaseSinkClass.preroll(), #GstBaseSinkClass.render() and
100  * #GstBaseSinkClass.set_caps() function will be called between the
101  * #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() calls.
102  *
103  * The #GstBaseSinkClass.event() virtual method will be called when an event is
104  * received by #GstBaseSink. Normally this method should only be overriden by
105  * very specific elements (such as file sinks) which need to handle the
106  * newsegment event specially.
107  *
108  * The #GstBaseSinkClass.unlock() method is called when the elements should
109  * unblock any blocking operations they perform in the
110  * #GstBaseSinkClass.render() method. This is mostly useful when the
111  * #GstBaseSinkClass.render() method performs a blocking write on a file
112  * descriptor, for example.
113  *
114  * The #GstBaseSink:max-lateness property affects how the sink deals with
115  * buffers that arrive too late in the sink. A buffer arrives too late in the
116  * sink when the presentation time (as a combination of the last segment, buffer
117  * timestamp and element base_time) plus the duration is before the current
118  * time of the clock.
119  * If the frame is later than max-lateness, the sink will drop the buffer
120  * without calling the render method.
121  * This feature is disabled if sync is disabled, the
122  * #GstBaseSinkClass.get_times() method does not return a valid start time or
123  * max-lateness is set to -1 (the default).
124  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
125  * max-lateness value.
126  *
127  * The #GstBaseSink:qos property will enable the quality-of-service features of
128  * the basesink which gather statistics about the real-time performance of the
129  * clock synchronisation. For each buffer received in the sink, statistics are
130  * gathered and a QOS event is sent upstream with these numbers. This
131  * information can then be used by upstream elements to reduce their processing
132  * rate, for example.
133  *
134  * Since 0.10.15 the #GstBaseSink:async property can be used to instruct the
135  * sink to never perform an ASYNC state change. This feature is mostly usable
136  * when dealing with non-synchronized streams or sparse streams.
137  *
138  * Last reviewed on 2007-08-29 (0.10.15)
139  */
140
141 #ifdef HAVE_CONFIG_H
142 #  include "config.h"
143 #endif
144
145 #include <gst/gst_private.h>
146
147 #include "gstbasesink.h"
148 #include <gst/gstmarshal.h>
149 #include <gst/gst-i18n-lib.h>
150
151 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
152 #define GST_CAT_DEFAULT gst_base_sink_debug
153
154 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
155    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
156
157 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
158
159 typedef struct
160 {
161   gboolean valid;               /* if this info is valid */
162   guint32 seqnum;               /* the seqnum of the STEP event */
163   GstFormat format;             /* the format of the amount */
164   guint64 amount;               /* the total amount of data to skip */
165   guint64 position;             /* the position in the stepped data */
166   guint64 duration;             /* the duration in time of the skipped data */
167   guint64 start;                /* running_time of the start */
168   gdouble rate;                 /* rate of skipping */
169   gdouble start_rate;           /* rate before skipping */
170   guint64 start_start;          /* start position skipping */
171   guint64 start_stop;           /* stop position skipping */
172   gboolean flush;               /* if this was a flushing step */
173   gboolean intermediate;        /* if this is an intermediate step */
174   gboolean need_preroll;        /* if we need preroll after this step */
175 } GstStepInfo;
176
177 /* FIXME, some stuff in ABI.data and other in Private...
178  * Make up your mind please.
179  */
180 struct _GstBaseSinkPrivate
181 {
182   gint qos_enabled;             /* ATOMIC */
183   gboolean async_enabled;
184   GstClockTimeDiff ts_offset;
185   GstClockTime render_delay;
186
187   /* start, stop of current buffer, stream time, used to report position */
188   GstClockTime current_sstart;
189   GstClockTime current_sstop;
190
191   /* start, stop and jitter of current buffer, running time */
192   GstClockTime current_rstart;
193   GstClockTime current_rstop;
194   GstClockTimeDiff current_jitter;
195   /* the running time of the previous buffer */
196   GstClockTime prev_rstart;
197
198   /* EOS sync time in running time */
199   GstClockTime eos_rtime;
200
201   /* last buffer that arrived in time, running time */
202   GstClockTime last_render_time;
203   /* when the last buffer left the sink, running time */
204   GstClockTime last_left;
205
206   /* running averages go here these are done on running time */
207   GstClockTime avg_pt;
208   GstClockTime avg_duration;
209   gdouble avg_rate;
210   GstClockTime avg_in_diff;
211
212   /* these are done on system time. avg_jitter and avg_render are
213    * compared to eachother to see if the rendering time takes a
214    * huge amount of the processing, If so we are flooded with
215    * buffers. */
216   GstClockTime last_left_systime;
217   GstClockTime avg_jitter;
218   GstClockTime start, stop;
219   GstClockTime avg_render;
220
221   /* number of rendered and dropped frames */
222   guint64 rendered;
223   guint64 dropped;
224
225   /* latency stuff */
226   GstClockTime latency;
227
228   /* if we already commited the state */
229   gboolean commited;
230
231   /* when we received EOS */
232   gboolean received_eos;
233
234   /* when we are prerolled and able to report latency */
235   gboolean have_latency;
236
237   /* the last buffer we prerolled or rendered. Useful for making snapshots */
238   gint enable_last_buffer;      /* atomic */
239   GstBuffer *last_buffer;
240
241   /* caps for pull based scheduling */
242   GstCaps *pull_caps;
243
244   /* blocksize for pulling */
245   guint blocksize;
246
247   gboolean discont;
248
249   /* seqnum of the stream */
250   guint32 seqnum;
251
252   gboolean call_preroll;
253   gboolean step_unlock;
254
255   /* we have a pending and a current step operation */
256   GstStepInfo current_step;
257   GstStepInfo pending_step;
258
259   /* Cached GstClockID */
260   GstClockID cached_clock_id;
261
262   /* for throttling and QoS */
263   GstClockTime earliest_in_time;
264   GstClockTime throttle_time;
265 };
266
267 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
268
269 /* generic running average, this has a neutral window size */
270 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
271
272 /* the windows for these running averages are experimentally obtained.
273  * possitive values get averaged more while negative values use a small
274  * window so we can react faster to badness. */
275 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
276 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
277
278 enum
279 {
280   _PR_IS_NOTHING = 1 << 0,
281   _PR_IS_BUFFER = 1 << 1,
282   _PR_IS_BUFFERLIST = 1 << 2,
283   _PR_IS_EVENT = 1 << 3
284 } PrivateObjectType;
285
286 #define OBJ_IS_BUFFER(a) ((a) & _PR_IS_BUFFER)
287 #define OBJ_IS_BUFFERLIST(a) ((a) & _PR_IS_BUFFERLIST)
288 #define OBJ_IS_EVENT(a) ((a) & _PR_IS_EVENT)
289 #define OBJ_IS_BUFFERFULL(a) ((a) & (_PR_IS_BUFFER | _PR_IS_BUFFERLIST))
290
291 /* BaseSink properties */
292
293 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
294 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
295
296 #define DEFAULT_PREROLL_QUEUE_LEN   0
297 #define DEFAULT_SYNC                TRUE
298 #define DEFAULT_MAX_LATENESS        -1
299 #define DEFAULT_QOS                 FALSE
300 #define DEFAULT_ASYNC               TRUE
301 #define DEFAULT_TS_OFFSET           0
302 #define DEFAULT_BLOCKSIZE           4096
303 #define DEFAULT_RENDER_DELAY        0
304 #define DEFAULT_ENABLE_LAST_BUFFER  TRUE
305 #define DEFAULT_THROTTLE_TIME       0
306
307 enum
308 {
309   PROP_0,
310   PROP_PREROLL_QUEUE_LEN,
311   PROP_SYNC,
312   PROP_MAX_LATENESS,
313   PROP_QOS,
314   PROP_ASYNC,
315   PROP_TS_OFFSET,
316   PROP_ENABLE_LAST_BUFFER,
317   PROP_LAST_BUFFER,
318   PROP_BLOCKSIZE,
319   PROP_RENDER_DELAY,
320   PROP_THROTTLE_TIME,
321   PROP_LAST
322 };
323
324 static GstElementClass *parent_class = NULL;
325
326 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
327 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
328 static void gst_base_sink_finalize (GObject * object);
329
330 GType
331 gst_base_sink_get_type (void)
332 {
333   static volatile gsize base_sink_type = 0;
334
335   if (g_once_init_enter (&base_sink_type)) {
336     GType _type;
337     static const GTypeInfo base_sink_info = {
338       sizeof (GstBaseSinkClass),
339       NULL,
340       NULL,
341       (GClassInitFunc) gst_base_sink_class_init,
342       NULL,
343       NULL,
344       sizeof (GstBaseSink),
345       0,
346       (GInstanceInitFunc) gst_base_sink_init,
347     };
348
349     _type = g_type_register_static (GST_TYPE_ELEMENT,
350         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
351     g_once_init_leave (&base_sink_type, _type);
352   }
353   return base_sink_type;
354 }
355
356 static void gst_base_sink_set_property (GObject * object, guint prop_id,
357     const GValue * value, GParamSpec * pspec);
358 static void gst_base_sink_get_property (GObject * object, guint prop_id,
359     GValue * value, GParamSpec * pspec);
360
361 static gboolean gst_base_sink_send_event (GstElement * element,
362     GstEvent * event);
363 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
364 static const GstQueryType *gst_base_sink_get_query_types (GstElement * element);
365
366 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
367 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
368 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
369     GstClockTime * start, GstClockTime * end);
370 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
371     GstPad * pad, gboolean flushing);
372 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
373     gboolean active);
374 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
375     GstSegment * segment);
376 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
377     GstEvent * event, GstSegment * segment);
378
379 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
380     GstStateChange transition);
381
382 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
383 static GstFlowReturn gst_base_sink_chain_list (GstPad * pad,
384     GstBufferList * list);
385
386 static void gst_base_sink_loop (GstPad * pad);
387 static gboolean gst_base_sink_pad_activate (GstPad * pad);
388 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
389 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
390 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
391
392 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
393 static GstCaps *gst_base_sink_pad_getcaps (GstPad * pad);
394 static gboolean gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps);
395 static void gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps);
396
397 /* check if an object was too late */
398 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
399     GstMiniObject * obj, GstClockTime rstart, GstClockTime rstop,
400     GstClockReturn status, GstClockTimeDiff jitter);
401 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
402     guint8 obj_type, GstMiniObject * obj);
403
404 static void
405 gst_base_sink_class_init (GstBaseSinkClass * klass)
406 {
407   GObjectClass *gobject_class;
408   GstElementClass *gstelement_class;
409
410   gobject_class = G_OBJECT_CLASS (klass);
411   gstelement_class = GST_ELEMENT_CLASS (klass);
412
413   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
414       "basesink element");
415
416   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
417
418   parent_class = g_type_class_peek_parent (klass);
419
420   gobject_class->finalize = gst_base_sink_finalize;
421   gobject_class->set_property = gst_base_sink_set_property;
422   gobject_class->get_property = gst_base_sink_get_property;
423
424   /* FIXME, this next value should be configured using an event from the
425    * upstream element, ie, the BUFFER_SIZE event. */
426   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
427       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
428           "Number of buffers to queue during preroll", 0, G_MAXUINT,
429           DEFAULT_PREROLL_QUEUE_LEN,
430           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
431
432   g_object_class_install_property (gobject_class, PROP_SYNC,
433       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
434           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
435
436   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
437       g_param_spec_int64 ("max-lateness", "Max Lateness",
438           "Maximum number of nanoseconds that a buffer can be late before it "
439           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
440           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
441
442   g_object_class_install_property (gobject_class, PROP_QOS,
443       g_param_spec_boolean ("qos", "Qos",
444           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
445           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446   /**
447    * GstBaseSink:async
448    *
449    * If set to #TRUE, the basesink will perform asynchronous state changes.
450    * When set to #FALSE, the sink will not signal the parent when it prerolls.
451    * Use this option when dealing with sparse streams or when synchronisation is
452    * not required.
453    *
454    * Since: 0.10.15
455    */
456   g_object_class_install_property (gobject_class, PROP_ASYNC,
457       g_param_spec_boolean ("async", "Async",
458           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
459           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
460   /**
461    * GstBaseSink:ts-offset
462    *
463    * Controls the final synchronisation, a negative value will render the buffer
464    * earlier while a positive value delays playback. This property can be
465    * used to fix synchronisation in bad files.
466    *
467    * Since: 0.10.15
468    */
469   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
470       g_param_spec_int64 ("ts-offset", "TS Offset",
471           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
472           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
473
474   /**
475    * GstBaseSink:enable-last-buffer
476    *
477    * Enable the last-buffer property. If FALSE, basesink doesn't keep a
478    * reference to the last buffer arrived and the last-buffer property is always
479    * set to NULL. This can be useful if you need buffers to be released as soon
480    * as possible, eg. if you're using a buffer pool.
481    *
482    * Since: 0.10.30
483    */
484   g_object_class_install_property (gobject_class, PROP_ENABLE_LAST_BUFFER,
485       g_param_spec_boolean ("enable-last-buffer", "Enable Last Buffer",
486           "Enable the last-buffer property", DEFAULT_ENABLE_LAST_BUFFER,
487           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
488
489   /**
490    * GstBaseSink:last-buffer
491    *
492    * The last buffer that arrived in the sink and was used for preroll or for
493    * rendering. This property can be used to generate thumbnails. This property
494    * can be NULL when the sink has not yet received a bufer.
495    *
496    * Since: 0.10.15
497    */
498   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
499       g_param_spec_boxed ("last-buffer", "Last Buffer",
500           "The last buffer received in the sink", GST_TYPE_BUFFER,
501           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
502   /**
503    * GstBaseSink:blocksize
504    *
505    * The amount of bytes to pull when operating in pull mode.
506    *
507    * Since: 0.10.22
508    */
509   /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
510   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
511       g_param_spec_uint ("blocksize", "Block size",
512           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
513           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
514   /**
515    * GstBaseSink:render-delay
516    *
517    * The additional delay between synchronisation and actual rendering of the
518    * media. This property will add additional latency to the device in order to
519    * make other sinks compensate for the delay.
520    *
521    * Since: 0.10.22
522    */
523   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
524       g_param_spec_uint64 ("render-delay", "Render Delay",
525           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
526           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
527   /**
528    * GstBaseSink:throttle-time
529    *
530    * The time to insert between buffers. This property can be used to control
531    * the maximum amount of buffers per second to render. Setting this property
532    * to a value bigger than 0 will make the sink create THROTTLE QoS events.
533    *
534    * Since: 0.10.33
535    */
536   g_object_class_install_property (gobject_class, PROP_THROTTLE_TIME,
537       g_param_spec_uint64 ("throttle-time", "Throttle time",
538           "The time to keep between rendered buffers (unused)", 0, G_MAXUINT64,
539           DEFAULT_THROTTLE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
540
541   gstelement_class->change_state =
542       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
543   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
544   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
545   gstelement_class->get_query_types =
546       GST_DEBUG_FUNCPTR (gst_base_sink_get_query_types);
547
548   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
549   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
550   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
551   klass->activate_pull =
552       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
553
554   /* Registering debug symbols for function pointers */
555   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_getcaps);
556   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_setcaps);
557   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_fixate);
558   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate);
559   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_push);
560   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_pull);
561   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_event);
562   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain);
563   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain_list);
564 }
565
566 static GstCaps *
567 gst_base_sink_pad_getcaps (GstPad * pad)
568 {
569   GstBaseSinkClass *bclass;
570   GstBaseSink *bsink;
571   GstCaps *caps = NULL;
572
573   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
574   bclass = GST_BASE_SINK_GET_CLASS (bsink);
575
576   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
577     /* if we are operating in pull mode we only accept the negotiated caps */
578     GST_OBJECT_LOCK (pad);
579     if ((caps = GST_PAD_CAPS (pad)))
580       gst_caps_ref (caps);
581     GST_OBJECT_UNLOCK (pad);
582   }
583   if (caps == NULL) {
584     if (bclass->get_caps)
585       caps = bclass->get_caps (bsink);
586
587     if (caps == NULL) {
588       GstPadTemplate *pad_template;
589
590       pad_template =
591           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
592           "sink");
593       if (pad_template != NULL) {
594         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
595       }
596     }
597   }
598   gst_object_unref (bsink);
599
600   return caps;
601 }
602
603 static gboolean
604 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
605 {
606   GstBaseSinkClass *bclass;
607   GstBaseSink *bsink;
608   gboolean res = TRUE;
609
610   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
611   bclass = GST_BASE_SINK_GET_CLASS (bsink);
612
613   if (res && bclass->set_caps)
614     res = bclass->set_caps (bsink, caps);
615
616   gst_object_unref (bsink);
617
618   return res;
619 }
620
621 static void
622 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
623 {
624   GstBaseSinkClass *bclass;
625   GstBaseSink *bsink;
626
627   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
628   bclass = GST_BASE_SINK_GET_CLASS (bsink);
629
630   if (bclass->fixate)
631     bclass->fixate (bsink, caps);
632
633   gst_object_unref (bsink);
634 }
635
636 static void
637 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
638 {
639   GstPadTemplate *pad_template;
640   GstBaseSinkPrivate *priv;
641
642   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
643
644   pad_template =
645       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
646   g_return_if_fail (pad_template != NULL);
647
648   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
649
650   gst_pad_set_getcaps_function (basesink->sinkpad, gst_base_sink_pad_getcaps);
651   gst_pad_set_setcaps_function (basesink->sinkpad, gst_base_sink_pad_setcaps);
652   gst_pad_set_fixatecaps_function (basesink->sinkpad, gst_base_sink_pad_fixate);
653   gst_pad_set_activate_function (basesink->sinkpad, gst_base_sink_pad_activate);
654   gst_pad_set_activatepush_function (basesink->sinkpad,
655       gst_base_sink_pad_activate_push);
656   gst_pad_set_activatepull_function (basesink->sinkpad,
657       gst_base_sink_pad_activate_pull);
658   gst_pad_set_event_function (basesink->sinkpad, gst_base_sink_event);
659   gst_pad_set_chain_function (basesink->sinkpad, gst_base_sink_chain);
660   gst_pad_set_chain_list_function (basesink->sinkpad, gst_base_sink_chain_list);
661   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
662
663   basesink->pad_mode = GST_ACTIVATE_NONE;
664   basesink->preroll_lock = g_mutex_new ();
665   basesink->preroll_cond = g_cond_new ();
666   basesink->preroll_queue = g_queue_new ();
667   basesink->clip_segment = gst_segment_new ();
668   priv->have_latency = FALSE;
669
670   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
671   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
672
673   basesink->sync = DEFAULT_SYNC;
674   basesink->max_lateness = DEFAULT_MAX_LATENESS;
675   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
676   priv->async_enabled = DEFAULT_ASYNC;
677   priv->ts_offset = DEFAULT_TS_OFFSET;
678   priv->render_delay = DEFAULT_RENDER_DELAY;
679   priv->blocksize = DEFAULT_BLOCKSIZE;
680   priv->cached_clock_id = NULL;
681   g_atomic_int_set (&priv->enable_last_buffer, DEFAULT_ENABLE_LAST_BUFFER);
682   priv->throttle_time = DEFAULT_THROTTLE_TIME;
683
684   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
685 }
686
687 static void
688 gst_base_sink_finalize (GObject * object)
689 {
690   GstBaseSink *basesink;
691
692   basesink = GST_BASE_SINK (object);
693
694   g_mutex_free (basesink->preroll_lock);
695   g_cond_free (basesink->preroll_cond);
696   g_queue_free (basesink->preroll_queue);
697   gst_segment_free (basesink->clip_segment);
698
699   G_OBJECT_CLASS (parent_class)->finalize (object);
700 }
701
702 /**
703  * gst_base_sink_set_sync:
704  * @sink: the sink
705  * @sync: the new sync value.
706  *
707  * Configures @sink to synchronize on the clock or not. When
708  * @sync is FALSE, incomming samples will be played as fast as
709  * possible. If @sync is TRUE, the timestamps of the incomming
710  * buffers will be used to schedule the exact render time of its
711  * contents.
712  *
713  * Since: 0.10.4
714  */
715 void
716 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
717 {
718   g_return_if_fail (GST_IS_BASE_SINK (sink));
719
720   GST_OBJECT_LOCK (sink);
721   sink->sync = sync;
722   GST_OBJECT_UNLOCK (sink);
723 }
724
725 /**
726  * gst_base_sink_get_sync:
727  * @sink: the sink
728  *
729  * Checks if @sink is currently configured to synchronize against the
730  * clock.
731  *
732  * Returns: TRUE if the sink is configured to synchronize against the clock.
733  *
734  * Since: 0.10.4
735  */
736 gboolean
737 gst_base_sink_get_sync (GstBaseSink * sink)
738 {
739   gboolean res;
740
741   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
742
743   GST_OBJECT_LOCK (sink);
744   res = sink->sync;
745   GST_OBJECT_UNLOCK (sink);
746
747   return res;
748 }
749
750 /**
751  * gst_base_sink_set_max_lateness:
752  * @sink: the sink
753  * @max_lateness: the new max lateness value.
754  *
755  * Sets the new max lateness value to @max_lateness. This value is
756  * used to decide if a buffer should be dropped or not based on the
757  * buffer timestamp and the current clock time. A value of -1 means
758  * an unlimited time.
759  *
760  * Since: 0.10.4
761  */
762 void
763 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
764 {
765   g_return_if_fail (GST_IS_BASE_SINK (sink));
766
767   GST_OBJECT_LOCK (sink);
768   sink->max_lateness = max_lateness;
769   GST_OBJECT_UNLOCK (sink);
770 }
771
772 /**
773  * gst_base_sink_get_max_lateness:
774  * @sink: the sink
775  *
776  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
777  * more details.
778  *
779  * Returns: The maximum time in nanoseconds that a buffer can be late
780  * before it is dropped and not rendered. A value of -1 means an
781  * unlimited time.
782  *
783  * Since: 0.10.4
784  */
785 gint64
786 gst_base_sink_get_max_lateness (GstBaseSink * sink)
787 {
788   gint64 res;
789
790   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
791
792   GST_OBJECT_LOCK (sink);
793   res = sink->max_lateness;
794   GST_OBJECT_UNLOCK (sink);
795
796   return res;
797 }
798
799 /**
800  * gst_base_sink_set_qos_enabled:
801  * @sink: the sink
802  * @enabled: the new qos value.
803  *
804  * Configures @sink to send Quality-of-Service events upstream.
805  *
806  * Since: 0.10.5
807  */
808 void
809 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
810 {
811   g_return_if_fail (GST_IS_BASE_SINK (sink));
812
813   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
814 }
815
816 /**
817  * gst_base_sink_is_qos_enabled:
818  * @sink: the sink
819  *
820  * Checks if @sink is currently configured to send Quality-of-Service events
821  * upstream.
822  *
823  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
824  *
825  * Since: 0.10.5
826  */
827 gboolean
828 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
829 {
830   gboolean res;
831
832   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
833
834   res = g_atomic_int_get (&sink->priv->qos_enabled);
835
836   return res;
837 }
838
839 /**
840  * gst_base_sink_set_async_enabled:
841  * @sink: the sink
842  * @enabled: the new async value.
843  *
844  * Configures @sink to perform all state changes asynchronusly. When async is
845  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
846  * preroll buffer. This feature is usefull if the sink does not synchronize
847  * against the clock or when it is dealing with sparse streams.
848  *
849  * Since: 0.10.15
850  */
851 void
852 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
853 {
854   g_return_if_fail (GST_IS_BASE_SINK (sink));
855
856   GST_BASE_SINK_PREROLL_LOCK (sink);
857   g_atomic_int_set (&sink->priv->async_enabled, enabled);
858   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
859   GST_BASE_SINK_PREROLL_UNLOCK (sink);
860 }
861
862 /**
863  * gst_base_sink_is_async_enabled:
864  * @sink: the sink
865  *
866  * Checks if @sink is currently configured to perform asynchronous state
867  * changes to PAUSED.
868  *
869  * Returns: TRUE if the sink is configured to perform asynchronous state
870  * changes.
871  *
872  * Since: 0.10.15
873  */
874 gboolean
875 gst_base_sink_is_async_enabled (GstBaseSink * sink)
876 {
877   gboolean res;
878
879   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
880
881   res = g_atomic_int_get (&sink->priv->async_enabled);
882
883   return res;
884 }
885
886 /**
887  * gst_base_sink_set_ts_offset:
888  * @sink: the sink
889  * @offset: the new offset
890  *
891  * Adjust the synchronisation of @sink with @offset. A negative value will
892  * render buffers earlier than their timestamp. A positive value will delay
893  * rendering. This function can be used to fix playback of badly timestamped
894  * buffers.
895  *
896  * Since: 0.10.15
897  */
898 void
899 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
900 {
901   g_return_if_fail (GST_IS_BASE_SINK (sink));
902
903   GST_OBJECT_LOCK (sink);
904   sink->priv->ts_offset = offset;
905   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
906   GST_OBJECT_UNLOCK (sink);
907 }
908
909 /**
910  * gst_base_sink_get_ts_offset:
911  * @sink: the sink
912  *
913  * Get the synchronisation offset of @sink.
914  *
915  * Returns: The synchronisation offset.
916  *
917  * Since: 0.10.15
918  */
919 GstClockTimeDiff
920 gst_base_sink_get_ts_offset (GstBaseSink * sink)
921 {
922   GstClockTimeDiff res;
923
924   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
925
926   GST_OBJECT_LOCK (sink);
927   res = sink->priv->ts_offset;
928   GST_OBJECT_UNLOCK (sink);
929
930   return res;
931 }
932
933 /**
934  * gst_base_sink_get_last_buffer:
935  * @sink: the sink
936  *
937  * Get the last buffer that arrived in the sink and was used for preroll or for
938  * rendering. This property can be used to generate thumbnails.
939  *
940  * The #GstCaps on the buffer can be used to determine the type of the buffer.
941  *
942  * Free-function: gst_buffer_unref
943  *
944  * Returns: (transfer full): a #GstBuffer. gst_buffer_unref() after usage.
945  *     This function returns NULL when no buffer has arrived in the sink yet
946  *     or when the sink is not in PAUSED or PLAYING.
947  *
948  * Since: 0.10.15
949  */
950 GstBuffer *
951 gst_base_sink_get_last_buffer (GstBaseSink * sink)
952 {
953   GstBuffer *res;
954
955   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
956
957   GST_OBJECT_LOCK (sink);
958   if ((res = sink->priv->last_buffer))
959     gst_buffer_ref (res);
960   GST_OBJECT_UNLOCK (sink);
961
962   return res;
963 }
964
965 /* with OBJECT_LOCK */
966 static void
967 gst_base_sink_set_last_buffer_unlocked (GstBaseSink * sink, GstBuffer * buffer)
968 {
969   GstBuffer *old;
970
971   old = sink->priv->last_buffer;
972   if (G_LIKELY (old != buffer)) {
973     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
974     if (G_LIKELY (buffer))
975       gst_buffer_ref (buffer);
976     sink->priv->last_buffer = buffer;
977   } else {
978     old = NULL;
979   }
980   /* avoid unreffing with the lock because cleanup code might want to take the
981    * lock too */
982   if (G_LIKELY (old)) {
983     GST_OBJECT_UNLOCK (sink);
984     gst_buffer_unref (old);
985     GST_OBJECT_LOCK (sink);
986   }
987 }
988
989 static void
990 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
991 {
992   if (!g_atomic_int_get (&sink->priv->enable_last_buffer))
993     return;
994
995   GST_OBJECT_LOCK (sink);
996   gst_base_sink_set_last_buffer_unlocked (sink, buffer);
997   GST_OBJECT_UNLOCK (sink);
998 }
999
1000 /**
1001  * gst_base_sink_set_last_buffer_enabled:
1002  * @sink: the sink
1003  * @enabled: the new enable-last-buffer value.
1004  *
1005  * Configures @sink to store the last received buffer in the last-buffer
1006  * property.
1007  *
1008  * Since: 0.10.30
1009  */
1010 void
1011 gst_base_sink_set_last_buffer_enabled (GstBaseSink * sink, gboolean enabled)
1012 {
1013   g_return_if_fail (GST_IS_BASE_SINK (sink));
1014
1015   /* Only take lock if we change the value */
1016   if (g_atomic_int_compare_and_exchange (&sink->priv->enable_last_buffer,
1017           !enabled, enabled) && !enabled) {
1018     GST_OBJECT_LOCK (sink);
1019     gst_base_sink_set_last_buffer_unlocked (sink, NULL);
1020     GST_OBJECT_UNLOCK (sink);
1021   }
1022 }
1023
1024 /**
1025  * gst_base_sink_is_last_buffer_enabled:
1026  * @sink: the sink
1027  *
1028  * Checks if @sink is currently configured to store the last received buffer in
1029  * the last-buffer property.
1030  *
1031  * Returns: TRUE if the sink is configured to store the last received buffer.
1032  *
1033  * Since: 0.10.30
1034  */
1035 gboolean
1036 gst_base_sink_is_last_buffer_enabled (GstBaseSink * sink)
1037 {
1038   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
1039
1040   return g_atomic_int_get (&sink->priv->enable_last_buffer);
1041 }
1042
1043 /**
1044  * gst_base_sink_get_latency:
1045  * @sink: the sink
1046  *
1047  * Get the currently configured latency.
1048  *
1049  * Returns: The configured latency.
1050  *
1051  * Since: 0.10.12
1052  */
1053 GstClockTime
1054 gst_base_sink_get_latency (GstBaseSink * sink)
1055 {
1056   GstClockTime res;
1057
1058   GST_OBJECT_LOCK (sink);
1059   res = sink->priv->latency;
1060   GST_OBJECT_UNLOCK (sink);
1061
1062   return res;
1063 }
1064
1065 /**
1066  * gst_base_sink_query_latency:
1067  * @sink: the sink
1068  * @live: (out) (allow-none): if the sink is live
1069  * @upstream_live: (out) (allow-none): if an upstream element is live
1070  * @min_latency: (out) (allow-none): the min latency of the upstream elements
1071  * @max_latency: (out) (allow-none): the max latency of the upstream elements
1072  *
1073  * Query the sink for the latency parameters. The latency will be queried from
1074  * the upstream elements. @live will be TRUE if @sink is configured to
1075  * synchronize against the clock. @upstream_live will be TRUE if an upstream
1076  * element is live.
1077  *
1078  * If both @live and @upstream_live are TRUE, the sink will want to compensate
1079  * for the latency introduced by the upstream elements by setting the
1080  * @min_latency to a strictly possitive value.
1081  *
1082  * This function is mostly used by subclasses.
1083  *
1084  * Returns: TRUE if the query succeeded.
1085  *
1086  * Since: 0.10.12
1087  */
1088 gboolean
1089 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
1090     gboolean * upstream_live, GstClockTime * min_latency,
1091     GstClockTime * max_latency)
1092 {
1093   gboolean l, us_live, res, have_latency;
1094   GstClockTime min, max, render_delay;
1095   GstQuery *query;
1096   GstClockTime us_min, us_max;
1097
1098   /* we are live when we sync to the clock */
1099   GST_OBJECT_LOCK (sink);
1100   l = sink->sync;
1101   have_latency = sink->priv->have_latency;
1102   render_delay = sink->priv->render_delay;
1103   GST_OBJECT_UNLOCK (sink);
1104
1105   /* assume no latency */
1106   min = 0;
1107   max = -1;
1108   us_live = FALSE;
1109
1110   if (have_latency) {
1111     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1112     /* we are ready for a latency query this is when we preroll or when we are
1113      * not async. */
1114     query = gst_query_new_latency ();
1115
1116     /* ask the peer for the latency */
1117     if ((res = gst_pad_peer_query (sink->sinkpad, query))) {
1118       /* get upstream min and max latency */
1119       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1120
1121       if (us_live) {
1122         /* upstream live, use its latency, subclasses should use these
1123          * values to create the complete latency. */
1124         min = us_min;
1125         max = us_max;
1126       }
1127       if (l) {
1128         /* we need to add the render delay if we are live */
1129         if (min != -1)
1130           min += render_delay;
1131         if (max != -1)
1132           max += render_delay;
1133       }
1134     }
1135     gst_query_unref (query);
1136   } else {
1137     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1138     res = FALSE;
1139   }
1140
1141   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1142   if (!res) {
1143     if (!l) {
1144       res = TRUE;
1145       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1146     } else {
1147       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1148     }
1149   }
1150
1151   if (res) {
1152     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1153         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1154         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1155
1156     if (live)
1157       *live = l;
1158     if (upstream_live)
1159       *upstream_live = us_live;
1160     if (min_latency)
1161       *min_latency = min;
1162     if (max_latency)
1163       *max_latency = max;
1164   }
1165   return res;
1166 }
1167
1168 /**
1169  * gst_base_sink_set_render_delay:
1170  * @sink: a #GstBaseSink
1171  * @delay: the new delay
1172  *
1173  * Set the render delay in @sink to @delay. The render delay is the time
1174  * between actual rendering of a buffer and its synchronisation time. Some
1175  * devices might delay media rendering which can be compensated for with this
1176  * function.
1177  *
1178  * After calling this function, this sink will report additional latency and
1179  * other sinks will adjust their latency to delay the rendering of their media.
1180  *
1181  * This function is usually called by subclasses.
1182  *
1183  * Since: 0.10.21
1184  */
1185 void
1186 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1187 {
1188   GstClockTime old_render_delay;
1189
1190   g_return_if_fail (GST_IS_BASE_SINK (sink));
1191
1192   GST_OBJECT_LOCK (sink);
1193   old_render_delay = sink->priv->render_delay;
1194   sink->priv->render_delay = delay;
1195   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1196       GST_TIME_ARGS (delay));
1197   GST_OBJECT_UNLOCK (sink);
1198
1199   if (delay != old_render_delay) {
1200     GST_DEBUG_OBJECT (sink, "posting latency changed");
1201     gst_element_post_message (GST_ELEMENT_CAST (sink),
1202         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1203   }
1204 }
1205
1206 /**
1207  * gst_base_sink_get_render_delay:
1208  * @sink: a #GstBaseSink
1209  *
1210  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1211  * information about the render delay.
1212  *
1213  * Returns: the render delay of @sink.
1214  *
1215  * Since: 0.10.21
1216  */
1217 GstClockTime
1218 gst_base_sink_get_render_delay (GstBaseSink * sink)
1219 {
1220   GstClockTimeDiff res;
1221
1222   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1223
1224   GST_OBJECT_LOCK (sink);
1225   res = sink->priv->render_delay;
1226   GST_OBJECT_UNLOCK (sink);
1227
1228   return res;
1229 }
1230
1231 /**
1232  * gst_base_sink_set_blocksize:
1233  * @sink: a #GstBaseSink
1234  * @blocksize: the blocksize in bytes
1235  *
1236  * Set the number of bytes that the sink will pull when it is operating in pull
1237  * mode.
1238  *
1239  * Since: 0.10.22
1240  */
1241 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1242 void
1243 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1244 {
1245   g_return_if_fail (GST_IS_BASE_SINK (sink));
1246
1247   GST_OBJECT_LOCK (sink);
1248   sink->priv->blocksize = blocksize;
1249   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1250   GST_OBJECT_UNLOCK (sink);
1251 }
1252
1253 /**
1254  * gst_base_sink_get_blocksize:
1255  * @sink: a #GstBaseSink
1256  *
1257  * Get the number of bytes that the sink will pull when it is operating in pull
1258  * mode.
1259  *
1260  * Returns: the number of bytes @sink will pull in pull mode.
1261  *
1262  * Since: 0.10.22
1263  */
1264 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1265 guint
1266 gst_base_sink_get_blocksize (GstBaseSink * sink)
1267 {
1268   guint res;
1269
1270   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1271
1272   GST_OBJECT_LOCK (sink);
1273   res = sink->priv->blocksize;
1274   GST_OBJECT_UNLOCK (sink);
1275
1276   return res;
1277 }
1278
1279 /**
1280  * gst_base_sink_set_throttle_time:
1281  * @sink: a #GstBaseSink
1282  * @throttle: the throttle time in nanoseconds
1283  *
1284  * Set the time that will be inserted between rendered buffers. This
1285  * can be used to control the maximum buffers per second that the sink
1286  * will render. 
1287  *
1288  * Since: 0.10.33
1289  */
1290 void
1291 gst_base_sink_set_throttle_time (GstBaseSink * sink, guint64 throttle)
1292 {
1293   g_return_if_fail (GST_IS_BASE_SINK (sink));
1294
1295   GST_OBJECT_LOCK (sink);
1296   sink->priv->throttle_time = throttle;
1297   GST_LOG_OBJECT (sink, "set throttle_time to %" G_GUINT64_FORMAT, throttle);
1298   GST_OBJECT_UNLOCK (sink);
1299 }
1300
1301 /**
1302  * gst_base_sink_get_throttle_time:
1303  * @sink: a #GstBaseSink
1304  *
1305  * Get the time that will be inserted between frames to control the 
1306  * maximum buffers per second.
1307  *
1308  * Returns: the number of nanoseconds @sink will put between frames.
1309  *
1310  * Since: 0.10.33
1311  */
1312 guint64
1313 gst_base_sink_get_throttle_time (GstBaseSink * sink)
1314 {
1315   guint64 res;
1316
1317   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1318
1319   GST_OBJECT_LOCK (sink);
1320   res = sink->priv->throttle_time;
1321   GST_OBJECT_UNLOCK (sink);
1322
1323   return res;
1324 }
1325
1326 static void
1327 gst_base_sink_set_property (GObject * object, guint prop_id,
1328     const GValue * value, GParamSpec * pspec)
1329 {
1330   GstBaseSink *sink = GST_BASE_SINK (object);
1331
1332   switch (prop_id) {
1333     case PROP_PREROLL_QUEUE_LEN:
1334       /* preroll lock necessary to serialize with finish_preroll */
1335       GST_BASE_SINK_PREROLL_LOCK (sink);
1336       g_atomic_int_set (&sink->preroll_queue_max_len, g_value_get_uint (value));
1337       GST_BASE_SINK_PREROLL_UNLOCK (sink);
1338       break;
1339     case PROP_SYNC:
1340       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1341       break;
1342     case PROP_MAX_LATENESS:
1343       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1344       break;
1345     case PROP_QOS:
1346       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1347       break;
1348     case PROP_ASYNC:
1349       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1350       break;
1351     case PROP_TS_OFFSET:
1352       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1353       break;
1354     case PROP_BLOCKSIZE:
1355       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1356       break;
1357     case PROP_RENDER_DELAY:
1358       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1359       break;
1360     case PROP_ENABLE_LAST_BUFFER:
1361       gst_base_sink_set_last_buffer_enabled (sink, g_value_get_boolean (value));
1362       break;
1363     case PROP_THROTTLE_TIME:
1364       gst_base_sink_set_throttle_time (sink, g_value_get_uint64 (value));
1365       break;
1366     default:
1367       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1368       break;
1369   }
1370 }
1371
1372 static void
1373 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1374     GParamSpec * pspec)
1375 {
1376   GstBaseSink *sink = GST_BASE_SINK (object);
1377
1378   switch (prop_id) {
1379     case PROP_PREROLL_QUEUE_LEN:
1380       g_value_set_uint (value, g_atomic_int_get (&sink->preroll_queue_max_len));
1381       break;
1382     case PROP_SYNC:
1383       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1384       break;
1385     case PROP_MAX_LATENESS:
1386       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1387       break;
1388     case PROP_QOS:
1389       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1390       break;
1391     case PROP_ASYNC:
1392       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1393       break;
1394     case PROP_TS_OFFSET:
1395       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1396       break;
1397     case PROP_LAST_BUFFER:
1398       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1399       break;
1400     case PROP_ENABLE_LAST_BUFFER:
1401       g_value_set_boolean (value, gst_base_sink_is_last_buffer_enabled (sink));
1402       break;
1403     case PROP_BLOCKSIZE:
1404       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1405       break;
1406     case PROP_RENDER_DELAY:
1407       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1408       break;
1409     case PROP_THROTTLE_TIME:
1410       g_value_set_uint64 (value, gst_base_sink_get_throttle_time (sink));
1411       break;
1412     default:
1413       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1414       break;
1415   }
1416 }
1417
1418
1419 static GstCaps *
1420 gst_base_sink_get_caps (GstBaseSink * sink)
1421 {
1422   return NULL;
1423 }
1424
1425 static gboolean
1426 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1427 {
1428   return TRUE;
1429 }
1430
1431 /* with PREROLL_LOCK, STREAM_LOCK */
1432 static void
1433 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1434 {
1435   GstMiniObject *obj;
1436
1437   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1438   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1439     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1440     gst_mini_object_unref (obj);
1441   }
1442   /* we can't have EOS anymore now */
1443   basesink->eos = FALSE;
1444   basesink->priv->received_eos = FALSE;
1445   basesink->have_preroll = FALSE;
1446   basesink->priv->step_unlock = FALSE;
1447   basesink->eos_queued = FALSE;
1448   basesink->preroll_queued = 0;
1449   basesink->buffers_queued = 0;
1450   basesink->events_queued = 0;
1451   /* can't report latency anymore until we preroll again */
1452   if (basesink->priv->async_enabled) {
1453     GST_OBJECT_LOCK (basesink);
1454     basesink->priv->have_latency = FALSE;
1455     GST_OBJECT_UNLOCK (basesink);
1456   }
1457   /* and signal any waiters now */
1458   GST_BASE_SINK_PREROLL_SIGNAL (basesink);
1459 }
1460
1461 /* with STREAM_LOCK, configures given segment with the event information. */
1462 static void
1463 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1464     GstEvent * event, GstSegment * segment)
1465 {
1466   gboolean update;
1467   gdouble rate, arate;
1468   GstFormat format;
1469   gint64 start;
1470   gint64 stop;
1471   gint64 time;
1472
1473   /* the newsegment event is needed to bring the buffer timestamps to the
1474    * stream time and to drop samples outside of the playback segment. */
1475   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1476       &start, &stop, &time);
1477
1478   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1479    * We protect with the OBJECT_LOCK so that we can use the values to
1480    * safely answer a POSITION query. */
1481   GST_OBJECT_LOCK (basesink);
1482   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1483       stop, time);
1484
1485   if (format == GST_FORMAT_TIME) {
1486     GST_DEBUG_OBJECT (basesink,
1487         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1488         "format GST_FORMAT_TIME, "
1489         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1490         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1491         update, rate, arate, GST_TIME_ARGS (segment->start),
1492         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1493         GST_TIME_ARGS (segment->accum));
1494   } else {
1495     GST_DEBUG_OBJECT (basesink,
1496         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1497         "format %d, "
1498         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1499         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1500         segment->format, segment->start, segment->stop, segment->time,
1501         segment->accum);
1502   }
1503   GST_OBJECT_UNLOCK (basesink);
1504 }
1505
1506 /* with PREROLL_LOCK, STREAM_LOCK */
1507 static gboolean
1508 gst_base_sink_commit_state (GstBaseSink * basesink)
1509 {
1510   /* commit state and proceed to next pending state */
1511   GstState current, next, pending, post_pending;
1512   gboolean post_paused = FALSE;
1513   gboolean post_async_done = FALSE;
1514   gboolean post_playing = FALSE;
1515
1516   /* we are certainly not playing async anymore now */
1517   basesink->playing_async = FALSE;
1518
1519   GST_OBJECT_LOCK (basesink);
1520   current = GST_STATE (basesink);
1521   next = GST_STATE_NEXT (basesink);
1522   pending = GST_STATE_PENDING (basesink);
1523   post_pending = pending;
1524
1525   switch (pending) {
1526     case GST_STATE_PLAYING:
1527     {
1528       GstBaseSinkClass *bclass;
1529
1530       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1531
1532       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1533
1534       basesink->need_preroll = FALSE;
1535       post_async_done = TRUE;
1536       basesink->priv->commited = TRUE;
1537       post_playing = TRUE;
1538       /* post PAUSED too when we were READY */
1539       if (current == GST_STATE_READY) {
1540         post_paused = TRUE;
1541       }
1542       break;
1543     }
1544     case GST_STATE_PAUSED:
1545       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1546       post_paused = TRUE;
1547       post_async_done = TRUE;
1548       basesink->priv->commited = TRUE;
1549       post_pending = GST_STATE_VOID_PENDING;
1550       break;
1551     case GST_STATE_READY:
1552     case GST_STATE_NULL:
1553       goto stopping;
1554     case GST_STATE_VOID_PENDING:
1555       goto nothing_pending;
1556     default:
1557       break;
1558   }
1559
1560   /* we can report latency queries now */
1561   basesink->priv->have_latency = TRUE;
1562
1563   GST_STATE (basesink) = pending;
1564   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1565   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1566   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1567   GST_OBJECT_UNLOCK (basesink);
1568
1569   if (post_paused) {
1570     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1571     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1572         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1573             current, next, post_pending));
1574   }
1575   if (post_async_done) {
1576     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1577     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1578         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1579   }
1580   if (post_playing) {
1581     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1582     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1583         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1584             next, pending, GST_STATE_VOID_PENDING));
1585   }
1586
1587   GST_STATE_BROADCAST (basesink);
1588
1589   return TRUE;
1590
1591 nothing_pending:
1592   {
1593     /* Depending on the state, set our vars. We get in this situation when the
1594      * state change function got a change to update the state vars before the
1595      * streaming thread did. This is fine but we need to make sure that we
1596      * update the need_preroll var since it was TRUE when we got here and might
1597      * become FALSE if we got to PLAYING. */
1598     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1599         gst_element_state_get_name (current));
1600     switch (current) {
1601       case GST_STATE_PLAYING:
1602         basesink->need_preroll = FALSE;
1603         break;
1604       case GST_STATE_PAUSED:
1605         basesink->need_preroll = TRUE;
1606         break;
1607       default:
1608         basesink->need_preroll = FALSE;
1609         basesink->flushing = TRUE;
1610         break;
1611     }
1612     /* we can report latency queries now */
1613     basesink->priv->have_latency = TRUE;
1614     GST_OBJECT_UNLOCK (basesink);
1615     return TRUE;
1616   }
1617 stopping:
1618   {
1619     /* app is going to READY */
1620     GST_DEBUG_OBJECT (basesink, "stopping");
1621     basesink->need_preroll = FALSE;
1622     basesink->flushing = TRUE;
1623     GST_OBJECT_UNLOCK (basesink);
1624     return FALSE;
1625   }
1626 }
1627
1628 static void
1629 start_stepping (GstBaseSink * sink, GstSegment * segment,
1630     GstStepInfo * pending, GstStepInfo * current)
1631 {
1632   gint64 end;
1633   GstMessage *message;
1634
1635   GST_DEBUG_OBJECT (sink, "update pending step");
1636
1637   GST_OBJECT_LOCK (sink);
1638   memcpy (current, pending, sizeof (GstStepInfo));
1639   pending->valid = FALSE;
1640   GST_OBJECT_UNLOCK (sink);
1641
1642   /* post message first */
1643   message =
1644       gst_message_new_step_start (GST_OBJECT (sink), TRUE, current->format,
1645       current->amount, current->rate, current->flush, current->intermediate);
1646   gst_message_set_seqnum (message, current->seqnum);
1647   gst_element_post_message (GST_ELEMENT (sink), message);
1648
1649   /* get the running time of where we paused and remember it */
1650   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1651   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1652
1653   /* set the new rate for the remainder of the segment */
1654   current->start_rate = segment->rate;
1655   segment->rate *= current->rate;
1656   segment->abs_rate = ABS (segment->rate);
1657
1658   /* save values */
1659   if (segment->rate > 0.0)
1660     current->start_stop = segment->stop;
1661   else
1662     current->start_start = segment->start;
1663
1664   if (current->format == GST_FORMAT_TIME) {
1665     end = current->start + current->amount;
1666     if (!current->flush) {
1667       /* update the segment clipping regions for non-flushing seeks */
1668       if (segment->rate > 0.0) {
1669         segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1670         segment->last_stop = segment->stop;
1671       } else {
1672         gint64 position;
1673
1674         position = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1675         segment->time = position;
1676         segment->start = position;
1677         segment->last_stop = position;
1678       }
1679     }
1680   }
1681
1682   GST_DEBUG_OBJECT (sink,
1683       "segment now rate %lf, applied rate %lf, "
1684       "format GST_FORMAT_TIME, "
1685       "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1686       ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1687       segment->rate, segment->applied_rate, GST_TIME_ARGS (segment->start),
1688       GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1689       GST_TIME_ARGS (segment->accum));
1690
1691   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1692       GST_TIME_ARGS (current->start));
1693
1694   if (current->amount == -1) {
1695     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1696     current->valid = FALSE;
1697   } else {
1698     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1699         "rate: %f", current->amount, gst_format_get_name (current->format),
1700         current->rate);
1701   }
1702 }
1703
1704 static void
1705 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1706     GstStepInfo * current, gint64 rstart, gint64 rstop, gboolean eos)
1707 {
1708   gint64 stop, position;
1709   GstMessage *message;
1710
1711   GST_DEBUG_OBJECT (sink, "step complete");
1712
1713   if (segment->rate > 0.0)
1714     stop = rstart;
1715   else
1716     stop = rstop;
1717
1718   GST_DEBUG_OBJECT (sink,
1719       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1720
1721   if (stop == -1)
1722     current->duration = current->position;
1723   else
1724     current->duration = stop - current->start;
1725
1726   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1727       GST_TIME_ARGS (current->duration));
1728
1729   position = current->start + current->duration;
1730
1731   /* now move the segment to the new running time */
1732   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1733
1734   if (current->flush) {
1735     /* and remove the accumulated time we flushed, start time did not change */
1736     segment->accum = current->start;
1737   } else {
1738     /* start time is now the stepped position */
1739     gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1740   }
1741
1742   /* restore the previous rate */
1743   segment->rate = current->start_rate;
1744   segment->abs_rate = ABS (segment->rate);
1745
1746   if (segment->rate > 0.0)
1747     segment->stop = current->start_stop;
1748   else
1749     segment->start = current->start_start;
1750
1751   /* the clip segment is used for position report in paused... */
1752   memcpy (sink->clip_segment, segment, sizeof (GstSegment));
1753
1754   /* post the step done when we know the stepped duration in TIME */
1755   message =
1756       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1757       current->amount, current->rate, current->flush, current->intermediate,
1758       current->duration, eos);
1759   gst_message_set_seqnum (message, current->seqnum);
1760   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1761
1762   if (!current->intermediate)
1763     sink->need_preroll = current->need_preroll;
1764
1765   /* and the current step info finished and becomes invalid */
1766   current->valid = FALSE;
1767 }
1768
1769 static gboolean
1770 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1771     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1772     gint64 * rstop)
1773 {
1774   gboolean step_end = FALSE;
1775
1776   /* see if we need to skip this buffer because of stepping */
1777   switch (current->format) {
1778     case GST_FORMAT_TIME:
1779     {
1780       guint64 end;
1781       gint64 first, last;
1782
1783       if (segment->rate > 0.0) {
1784         if (segment->stop == *cstop)
1785           *rstop = *rstart + current->amount;
1786
1787         first = *rstart;
1788         last = *rstop;
1789       } else {
1790         if (segment->start == *cstart)
1791           *rstart = *rstop + current->amount;
1792
1793         first = *rstop;
1794         last = *rstart;
1795       }
1796
1797       end = current->start + current->amount;
1798       current->position = first - current->start;
1799
1800       if (G_UNLIKELY (segment->abs_rate != 1.0))
1801         current->position /= segment->abs_rate;
1802
1803       GST_DEBUG_OBJECT (sink,
1804           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1805           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1806       GST_DEBUG_OBJECT (sink,
1807           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1808           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1809           GST_TIME_ARGS (last - current->start),
1810           GST_TIME_ARGS (current->amount));
1811
1812       if ((current->flush && current->position >= current->amount)
1813           || last >= end) {
1814         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1815         step_end = TRUE;
1816         if (segment->rate > 0.0) {
1817           *rstart = end;
1818           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1819         } else {
1820           *rstop = end;
1821           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1822         }
1823       }
1824       GST_DEBUG_OBJECT (sink,
1825           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1826           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1827       GST_DEBUG_OBJECT (sink,
1828           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1829           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1830       break;
1831     }
1832     case GST_FORMAT_BUFFERS:
1833       GST_DEBUG_OBJECT (sink,
1834           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1835           current->position, current->amount);
1836
1837       if (current->position < current->amount) {
1838         current->position++;
1839       } else {
1840         step_end = TRUE;
1841       }
1842       break;
1843     case GST_FORMAT_DEFAULT:
1844     default:
1845       GST_DEBUG_OBJECT (sink,
1846           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1847           current->position, current->amount);
1848       break;
1849   }
1850   return step_end;
1851 }
1852
1853 /* with STREAM_LOCK, PREROLL_LOCK
1854  *
1855  * Returns TRUE if the object needs synchronisation and takes therefore
1856  * part in prerolling.
1857  *
1858  * rsstart/rsstop contain the start/stop in stream time.
1859  * rrstart/rrstop contain the start/stop in running time.
1860  */
1861 static gboolean
1862 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1863     GstClockTime * rsstart, GstClockTime * rsstop,
1864     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1865     gboolean * stepped, GstSegment * segment, GstStepInfo * step,
1866     gboolean * step_end, guint8 obj_type)
1867 {
1868   GstBaseSinkClass *bclass;
1869   GstBuffer *buffer;
1870   GstClockTime start, stop;     /* raw start/stop timestamps */
1871   gint64 cstart, cstop;         /* clipped raw timestamps */
1872   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1873   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1874   GstFormat format;
1875   GstBaseSinkPrivate *priv;
1876   gboolean eos;
1877
1878   priv = basesink->priv;
1879
1880   /* start with nothing */
1881   start = stop = GST_CLOCK_TIME_NONE;
1882
1883   if (G_UNLIKELY (OBJ_IS_EVENT (obj_type))) {
1884     GstEvent *event = GST_EVENT_CAST (obj);
1885
1886     switch (GST_EVENT_TYPE (event)) {
1887         /* EOS event needs syncing */
1888       case GST_EVENT_EOS:
1889       {
1890         if (basesink->segment.rate >= 0.0) {
1891           sstart = sstop = priv->current_sstop;
1892           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1893             /* we have not seen a buffer yet, use the segment values */
1894             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1895                 basesink->segment.format, basesink->segment.stop);
1896           }
1897         } else {
1898           sstart = sstop = priv->current_sstart;
1899           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1900             /* we have not seen a buffer yet, use the segment values */
1901             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1902                 basesink->segment.format, basesink->segment.start);
1903           }
1904         }
1905
1906         rstart = rstop = priv->eos_rtime;
1907         *do_sync = rstart != -1;
1908         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1909             GST_TIME_ARGS (rstart));
1910         /* if we are stepping, we end now */
1911         *step_end = step->valid;
1912         eos = TRUE;
1913         goto eos_done;
1914       }
1915       default:
1916         /* other events do not need syncing */
1917         /* FIXME, maybe NEWSEGMENT might need synchronisation
1918          * since the POSITION query depends on accumulated times and
1919          * we cannot accumulate the current segment before the previous
1920          * one completed.
1921          */
1922         return FALSE;
1923     }
1924   }
1925
1926   eos = FALSE;
1927
1928 again:
1929   /* else do buffer sync code */
1930   buffer = GST_BUFFER_CAST (obj);
1931
1932   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1933
1934   /* just get the times to see if we need syncing, if the start returns -1 we
1935    * don't sync. */
1936   if (bclass->get_times)
1937     bclass->get_times (basesink, buffer, &start, &stop);
1938
1939   if (!GST_CLOCK_TIME_IS_VALID (start)) {
1940     /* we don't need to sync but we still want to get the timestamps for
1941      * tracking the position */
1942     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1943     *do_sync = FALSE;
1944   } else {
1945     *do_sync = TRUE;
1946   }
1947
1948   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1949       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1950       GST_TIME_ARGS (stop), *do_sync);
1951
1952   /* collect segment and format for code clarity */
1953   format = segment->format;
1954
1955   /* no timestamp clipping if we did not get a TIME segment format */
1956   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
1957     cstart = start;
1958     cstop = stop;
1959     /* do running and stream time in TIME format */
1960     format = GST_FORMAT_TIME;
1961     GST_LOG_OBJECT (basesink, "not time format, don't clip");
1962     goto do_times;
1963   }
1964
1965   /* clip, only when we know about time */
1966   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
1967               (gint64) start, (gint64) stop, &cstart, &cstop))) {
1968     if (step->valid) {
1969       GST_DEBUG_OBJECT (basesink, "step out of segment");
1970       /* when we are stepping, pretend we're at the end of the segment */
1971       if (segment->rate > 0.0) {
1972         cstart = segment->stop;
1973         cstop = segment->stop;
1974       } else {
1975         cstart = segment->start;
1976         cstop = segment->start;
1977       }
1978       goto do_times;
1979     }
1980     goto out_of_segment;
1981   }
1982
1983   if (G_UNLIKELY (start != cstart || stop != cstop)) {
1984     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
1985         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
1986         GST_TIME_ARGS (cstop));
1987   }
1988
1989   /* set last stop position */
1990   if (G_LIKELY (stop != GST_CLOCK_TIME_NONE && cstop != GST_CLOCK_TIME_NONE))
1991     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
1992   else
1993     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
1994
1995 do_times:
1996   rstart = gst_segment_to_running_time (segment, format, cstart);
1997   rstop = gst_segment_to_running_time (segment, format, cstop);
1998
1999   if (G_UNLIKELY (step->valid)) {
2000     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
2001                 &rstart, &rstop))) {
2002       /* step is still busy, we discard data when we are flushing */
2003       *stepped = step->flush;
2004       GST_DEBUG_OBJECT (basesink, "stepping busy");
2005     }
2006   }
2007   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
2008    * upstream is behaving very badly */
2009   sstart = gst_segment_to_stream_time (segment, format, cstart);
2010   sstop = gst_segment_to_stream_time (segment, format, cstop);
2011
2012 eos_done:
2013   /* eos_done label only called when doing EOS, we also stop stepping then */
2014   if (*step_end && step->flush) {
2015     GST_DEBUG_OBJECT (basesink, "flushing step ended");
2016     stop_stepping (basesink, segment, step, rstart, rstop, eos);
2017     *step_end = FALSE;
2018     /* re-determine running start times for adjusted segment
2019      * (which has a flushed amount of running/accumulated time removed) */
2020     if (!GST_IS_EVENT (obj)) {
2021       GST_DEBUG_OBJECT (basesink, "refresh sync times");
2022       goto again;
2023     }
2024   }
2025
2026   /* save times */
2027   *rsstart = sstart;
2028   *rsstop = sstop;
2029   *rrstart = rstart;
2030   *rrstop = rstop;
2031
2032   /* buffers and EOS always need syncing and preroll */
2033   return TRUE;
2034
2035   /* special cases */
2036 out_of_segment:
2037   {
2038     /* we usually clip in the chain function already but stepping could cause
2039      * the segment to be updated later. we return FALSE so that we don't try
2040      * to sync on it. */
2041     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
2042     return FALSE;
2043   }
2044 }
2045
2046 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
2047  * adjust a timestamp with the latency and timestamp offset. This function does
2048  * not adjust for the render delay. */
2049 static GstClockTime
2050 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
2051 {
2052   GstClockTimeDiff ts_offset;
2053
2054   /* don't do anything funny with invalid timestamps */
2055   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2056     return time;
2057
2058   time += basesink->priv->latency;
2059
2060   /* apply offset, be carefull for underflows */
2061   ts_offset = basesink->priv->ts_offset;
2062   if (ts_offset < 0) {
2063     ts_offset = -ts_offset;
2064     if (ts_offset < time)
2065       time -= ts_offset;
2066     else
2067       time = 0;
2068   } else
2069     time += ts_offset;
2070
2071   /* subtract the render delay again, which was included in the latency */
2072   if (time > basesink->priv->render_delay)
2073     time -= basesink->priv->render_delay;
2074   else
2075     time = 0;
2076
2077   return time;
2078 }
2079
2080 /**
2081  * gst_base_sink_wait_clock:
2082  * @sink: the sink
2083  * @time: the running_time to be reached
2084  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2085  *
2086  * This function will block until @time is reached. It is usually called by
2087  * subclasses that use their own internal synchronisation.
2088  *
2089  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
2090  * returned. Likewise, if synchronisation is disabled in the element or there
2091  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
2092  *
2093  * This function should only be called with the PREROLL_LOCK held, like when
2094  * receiving an EOS event in the #GstBaseSinkClass.event() vmethod or when
2095  * receiving a buffer in
2096  * the #GstBaseSinkClass.render() vmethod.
2097  *
2098  * The @time argument should be the running_time of when this method should
2099  * return and is not adjusted with any latency or offset configured in the
2100  * sink.
2101  *
2102  * Since: 0.10.20
2103  *
2104  * Returns: #GstClockReturn
2105  */
2106 GstClockReturn
2107 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
2108     GstClockTimeDiff * jitter)
2109 {
2110   GstClockReturn ret;
2111   GstClock *clock;
2112   GstClockTime base_time;
2113
2114   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2115     goto invalid_time;
2116
2117   GST_OBJECT_LOCK (sink);
2118   if (G_UNLIKELY (!sink->sync))
2119     goto no_sync;
2120
2121   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
2122     goto no_clock;
2123
2124   base_time = GST_ELEMENT_CAST (sink)->base_time;
2125   GST_LOG_OBJECT (sink,
2126       "time %" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT,
2127       GST_TIME_ARGS (time), GST_TIME_ARGS (base_time));
2128
2129   /* add base_time to running_time to get the time against the clock */
2130   time += base_time;
2131
2132   /* Re-use existing clockid if available */
2133   if (G_LIKELY (sink->priv->cached_clock_id != NULL)) {
2134     if (!gst_clock_single_shot_id_reinit (clock, sink->priv->cached_clock_id,
2135             time)) {
2136       gst_clock_id_unref (sink->priv->cached_clock_id);
2137       sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2138     }
2139   } else
2140     sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2141   GST_OBJECT_UNLOCK (sink);
2142
2143   /* A blocking wait is performed on the clock. We save the ClockID
2144    * so we can unlock the entry at any time. While we are blocking, we
2145    * release the PREROLL_LOCK so that other threads can interrupt the
2146    * entry. */
2147   sink->clock_id = sink->priv->cached_clock_id;
2148   /* release the preroll lock while waiting */
2149   GST_BASE_SINK_PREROLL_UNLOCK (sink);
2150
2151   ret = gst_clock_id_wait (sink->priv->cached_clock_id, jitter);
2152
2153   GST_BASE_SINK_PREROLL_LOCK (sink);
2154   sink->clock_id = NULL;
2155
2156   return ret;
2157
2158   /* no syncing needed */
2159 invalid_time:
2160   {
2161     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
2162     return GST_CLOCK_BADTIME;
2163   }
2164 no_sync:
2165   {
2166     GST_DEBUG_OBJECT (sink, "sync disabled");
2167     GST_OBJECT_UNLOCK (sink);
2168     return GST_CLOCK_BADTIME;
2169   }
2170 no_clock:
2171   {
2172     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
2173     GST_OBJECT_UNLOCK (sink);
2174     return GST_CLOCK_BADTIME;
2175   }
2176 }
2177
2178 /**
2179  * gst_base_sink_wait_preroll:
2180  * @sink: the sink
2181  *
2182  * If the #GstBaseSinkClass.render() method performs its own synchronisation
2183  * against the clock it must unblock when going from PLAYING to the PAUSED state
2184  * and call this method before continuing to render the remaining data.
2185  *
2186  * This function will block until a state change to PLAYING happens (in which
2187  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
2188  * to a state change to READY or a FLUSH event (in which case this function
2189  * returns #GST_FLOW_WRONG_STATE).
2190  *
2191  * This function should only be called with the PREROLL_LOCK held, like in the
2192  * render function.
2193  *
2194  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2195  * continue. Any other return value should be returned from the render vmethod.
2196  *
2197  * Since: 0.10.11
2198  */
2199 GstFlowReturn
2200 gst_base_sink_wait_preroll (GstBaseSink * sink)
2201 {
2202   sink->have_preroll = TRUE;
2203   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
2204   /* block until the state changes, or we get a flush, or something */
2205   GST_BASE_SINK_PREROLL_WAIT (sink);
2206   sink->have_preroll = FALSE;
2207   if (G_UNLIKELY (sink->flushing))
2208     goto stopping;
2209   if (G_UNLIKELY (sink->priv->step_unlock))
2210     goto step_unlocked;
2211   GST_DEBUG_OBJECT (sink, "continue after preroll");
2212
2213   return GST_FLOW_OK;
2214
2215   /* ERRORS */
2216 stopping:
2217   {
2218     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
2219     return GST_FLOW_WRONG_STATE;
2220   }
2221 step_unlocked:
2222   {
2223     sink->priv->step_unlock = FALSE;
2224     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
2225     return GST_FLOW_STEP;
2226   }
2227 }
2228
2229 static inline guint8
2230 get_object_type (GstMiniObject * obj)
2231 {
2232   guint8 obj_type;
2233
2234   if (G_LIKELY (GST_IS_BUFFER (obj)))
2235     obj_type = _PR_IS_BUFFER;
2236   else if (GST_IS_EVENT (obj))
2237     obj_type = _PR_IS_EVENT;
2238   else if (GST_IS_BUFFER_LIST (obj))
2239     obj_type = _PR_IS_BUFFERLIST;
2240   else
2241     obj_type = _PR_IS_NOTHING;
2242
2243   return obj_type;
2244 }
2245
2246 /**
2247  * gst_base_sink_do_preroll:
2248  * @sink: the sink
2249  * @obj: (transfer none): the mini object that caused the preroll
2250  *
2251  * If the @sink spawns its own thread for pulling buffers from upstream it
2252  * should call this method after it has pulled a buffer. If the element needed
2253  * to preroll, this function will perform the preroll and will then block
2254  * until the element state is changed.
2255  *
2256  * This function should be called with the PREROLL_LOCK held.
2257  *
2258  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2259  * continue. Any other return value should be returned from the render vmethod.
2260  *
2261  * Since: 0.10.22
2262  */
2263 GstFlowReturn
2264 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2265 {
2266   GstFlowReturn ret;
2267
2268   while (G_UNLIKELY (sink->need_preroll)) {
2269     guint8 obj_type;
2270     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2271
2272     obj_type = get_object_type (obj);
2273
2274     ret = gst_base_sink_preroll_object (sink, obj_type, obj);
2275     if (ret != GST_FLOW_OK)
2276       goto preroll_failed;
2277
2278     /* need to recheck here because the commit state could have
2279      * made us not need the preroll anymore */
2280     if (G_LIKELY (sink->need_preroll)) {
2281       /* block until the state changes, or we get a flush, or something */
2282       ret = gst_base_sink_wait_preroll (sink);
2283       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2284         goto preroll_failed;
2285     }
2286   }
2287   return GST_FLOW_OK;
2288
2289   /* ERRORS */
2290 preroll_failed:
2291   {
2292     GST_DEBUG_OBJECT (sink, "preroll failed: %s", gst_flow_get_name (ret));
2293     return ret;
2294   }
2295 }
2296
2297 /**
2298  * gst_base_sink_wait_eos:
2299  * @sink: the sink
2300  * @time: the running_time to be reached
2301  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2302  *
2303  * This function will block until @time is reached. It is usually called by
2304  * subclasses that use their own internal synchronisation but want to let the
2305  * EOS be handled by the base class.
2306  *
2307  * This function should only be called with the PREROLL_LOCK held, like when
2308  * receiving an EOS event in the ::event vmethod.
2309  *
2310  * The @time argument should be the running_time of when the EOS should happen
2311  * and will be adjusted with any latency and offset configured in the sink.
2312  *
2313  * Returns: #GstFlowReturn
2314  *
2315  * Since: 0.10.15
2316  */
2317 GstFlowReturn
2318 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2319     GstClockTimeDiff * jitter)
2320 {
2321   GstClockReturn status;
2322   GstFlowReturn ret;
2323
2324   do {
2325     GstClockTime stime;
2326
2327     GST_DEBUG_OBJECT (sink, "checking preroll");
2328
2329     /* first wait for the playing state before we can continue */
2330     while (G_UNLIKELY (sink->need_preroll)) {
2331       ret = gst_base_sink_wait_preroll (sink);
2332       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2333         goto flushing;
2334     }
2335
2336     /* preroll done, we can sync since we are in PLAYING now. */
2337     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2338         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2339
2340     /* compensate for latency and ts_offset. We don't adjust for render delay
2341      * because we don't interact with the device on EOS normally. */
2342     stime = gst_base_sink_adjust_time (sink, time);
2343
2344     /* wait for the clock, this can be interrupted because we got shut down or
2345      * we PAUSED. */
2346     status = gst_base_sink_wait_clock (sink, stime, jitter);
2347
2348     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2349
2350     /* invalid time, no clock or sync disabled, just continue then */
2351     if (status == GST_CLOCK_BADTIME)
2352       break;
2353
2354     /* waiting could have been interrupted and we can be flushing now */
2355     if (G_UNLIKELY (sink->flushing))
2356       goto flushing;
2357
2358     /* retry if we got unscheduled, which means we did not reach the timeout
2359      * yet. if some other error occures, we continue. */
2360   } while (status == GST_CLOCK_UNSCHEDULED);
2361
2362   GST_DEBUG_OBJECT (sink, "end of stream");
2363
2364   return GST_FLOW_OK;
2365
2366   /* ERRORS */
2367 flushing:
2368   {
2369     GST_DEBUG_OBJECT (sink, "we are flushing");
2370     return GST_FLOW_WRONG_STATE;
2371   }
2372 }
2373
2374 /* with STREAM_LOCK, PREROLL_LOCK
2375  *
2376  * Make sure we are in PLAYING and synchronize an object to the clock.
2377  *
2378  * If we need preroll, we are not in PLAYING. We try to commit the state
2379  * if needed and then block if we still are not PLAYING.
2380  *
2381  * We start waiting on the clock in PLAYING. If we got interrupted, we
2382  * immediatly try to re-preroll.
2383  *
2384  * Some objects do not need synchronisation (most events) and so this function
2385  * immediatly returns GST_FLOW_OK.
2386  *
2387  * for objects that arrive later than max-lateness to be synchronized to the
2388  * clock have the @late boolean set to TRUE.
2389  *
2390  * This function keeps a running average of the jitter (the diff between the
2391  * clock time and the requested sync time). The jitter is negative for
2392  * objects that arrive in time and positive for late buffers.
2393  *
2394  * does not take ownership of obj.
2395  */
2396 static GstFlowReturn
2397 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2398     GstMiniObject * obj, gboolean * late, gboolean * step_end, guint8 obj_type)
2399 {
2400   GstClockTimeDiff jitter = 0;
2401   gboolean syncable;
2402   GstClockReturn status = GST_CLOCK_OK;
2403   GstClockTime rstart, rstop, sstart, sstop, stime;
2404   gboolean do_sync;
2405   GstBaseSinkPrivate *priv;
2406   GstFlowReturn ret;
2407   GstStepInfo *current, *pending;
2408   gboolean stepped;
2409
2410   priv = basesink->priv;
2411
2412 do_step:
2413   sstart = sstop = rstart = rstop = GST_CLOCK_TIME_NONE;
2414   do_sync = TRUE;
2415   stepped = FALSE;
2416
2417   priv->current_rstart = GST_CLOCK_TIME_NONE;
2418
2419   /* get stepping info */
2420   current = &priv->current_step;
2421   pending = &priv->pending_step;
2422
2423   /* get timing information for this object against the render segment */
2424   syncable = gst_base_sink_get_sync_times (basesink, obj,
2425       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2426       current, step_end, obj_type);
2427
2428   if (G_UNLIKELY (stepped))
2429     goto step_skipped;
2430
2431   /* a syncable object needs to participate in preroll and
2432    * clocking. All buffers and EOS are syncable. */
2433   if (G_UNLIKELY (!syncable))
2434     goto not_syncable;
2435
2436   /* store timing info for current object */
2437   priv->current_rstart = rstart;
2438   priv->current_rstop = (GST_CLOCK_TIME_IS_VALID (rstop) ? rstop : rstart);
2439
2440   /* save sync time for eos when the previous object needed sync */
2441   priv->eos_rtime = (do_sync ? priv->current_rstop : GST_CLOCK_TIME_NONE);
2442
2443   /* calculate inter frame spacing */
2444   if (G_UNLIKELY (priv->prev_rstart != -1 && priv->prev_rstart < rstart)) {
2445     GstClockTime in_diff;
2446
2447     in_diff = rstart - priv->prev_rstart;
2448
2449     if (priv->avg_in_diff == -1)
2450       priv->avg_in_diff = in_diff;
2451     else
2452       priv->avg_in_diff = UPDATE_RUNNING_AVG (priv->avg_in_diff, in_diff);
2453
2454     GST_LOG_OBJECT (basesink, "avg frame diff %" GST_TIME_FORMAT,
2455         GST_TIME_ARGS (priv->avg_in_diff));
2456
2457   }
2458   priv->prev_rstart = rstart;
2459
2460   if (G_UNLIKELY (priv->earliest_in_time != -1
2461           && rstart < priv->earliest_in_time))
2462     goto qos_dropped;
2463
2464 again:
2465   /* first do preroll, this makes sure we commit our state
2466    * to PAUSED and can continue to PLAYING. We cannot perform
2467    * any clock sync in PAUSED because there is no clock. */
2468   ret = gst_base_sink_do_preroll (basesink, obj);
2469   if (G_UNLIKELY (ret != GST_FLOW_OK))
2470     goto preroll_failed;
2471
2472   /* update the segment with a pending step if the current one is invalid and we
2473    * have a new pending one. We only accept new step updates after a preroll */
2474   if (G_UNLIKELY (pending->valid && !current->valid)) {
2475     start_stepping (basesink, &basesink->segment, pending, current);
2476     goto do_step;
2477   }
2478
2479   /* After rendering we store the position of the last buffer so that we can use
2480    * it to report the position. We need to take the lock here. */
2481   GST_OBJECT_LOCK (basesink);
2482   priv->current_sstart = sstart;
2483   priv->current_sstop = (GST_CLOCK_TIME_IS_VALID (sstop) ? sstop : sstart);
2484   GST_OBJECT_UNLOCK (basesink);
2485
2486   if (!do_sync)
2487     goto done;
2488
2489   /* adjust for latency */
2490   stime = gst_base_sink_adjust_time (basesink, rstart);
2491
2492   /* adjust for render-delay, avoid underflows */
2493   if (GST_CLOCK_TIME_IS_VALID (stime)) {
2494     if (stime > priv->render_delay)
2495       stime -= priv->render_delay;
2496     else
2497       stime = 0;
2498   }
2499
2500   /* preroll done, we can sync since we are in PLAYING now. */
2501   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2502       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2503       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2504
2505   /* This function will return immediatly if start == -1, no clock
2506    * or sync is disabled with GST_CLOCK_BADTIME. */
2507   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2508
2509   GST_DEBUG_OBJECT (basesink, "clock returned %d, jitter %c%" GST_TIME_FORMAT,
2510       status, (jitter < 0 ? '-' : ' '), GST_TIME_ARGS (ABS (jitter)));
2511
2512   /* invalid time, no clock or sync disabled, just render */
2513   if (status == GST_CLOCK_BADTIME)
2514     goto done;
2515
2516   /* waiting could have been interrupted and we can be flushing now */
2517   if (G_UNLIKELY (basesink->flushing))
2518     goto flushing;
2519
2520   /* check for unlocked by a state change, we are not flushing so
2521    * we can try to preroll on the current buffer. */
2522   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2523     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2524     priv->call_preroll = TRUE;
2525     goto again;
2526   }
2527
2528   /* successful syncing done, record observation */
2529   priv->current_jitter = jitter;
2530
2531   /* check if the object should be dropped */
2532   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2533       status, jitter);
2534
2535 done:
2536   return GST_FLOW_OK;
2537
2538   /* ERRORS */
2539 step_skipped:
2540   {
2541     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2542     *late = TRUE;
2543     return GST_FLOW_OK;
2544   }
2545 not_syncable:
2546   {
2547     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2548     return GST_FLOW_OK;
2549   }
2550 qos_dropped:
2551   {
2552     GST_DEBUG_OBJECT (basesink, "dropped because of QoS %p", obj);
2553     *late = TRUE;
2554     return GST_FLOW_OK;
2555   }
2556 flushing:
2557   {
2558     GST_DEBUG_OBJECT (basesink, "we are flushing");
2559     return GST_FLOW_WRONG_STATE;
2560   }
2561 preroll_failed:
2562   {
2563     GST_DEBUG_OBJECT (basesink, "preroll failed");
2564     *step_end = FALSE;
2565     return ret;
2566   }
2567 }
2568
2569 static gboolean
2570 gst_base_sink_send_qos (GstBaseSink * basesink, GstQOSType type,
2571     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2572 {
2573   GstEvent *event;
2574   gboolean res;
2575
2576   /* generate Quality-of-Service event */
2577   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2578       "qos: type %d, proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2579       GST_TIME_FORMAT, type, proportion, diff, GST_TIME_ARGS (time));
2580
2581   event = gst_event_new_qos_full (type, proportion, diff, time);
2582
2583   /* send upstream */
2584   res = gst_pad_push_event (basesink->sinkpad, event);
2585
2586   return res;
2587 }
2588
2589 static void
2590 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2591 {
2592   GstBaseSinkPrivate *priv;
2593   GstClockTime start, stop;
2594   GstClockTimeDiff jitter;
2595   GstClockTime pt, entered, left;
2596   GstClockTime duration;
2597   gdouble rate;
2598
2599   priv = sink->priv;
2600
2601   start = priv->current_rstart;
2602
2603   if (priv->current_step.valid)
2604     return;
2605
2606   /* if Quality-of-Service disabled, do nothing */
2607   if (!g_atomic_int_get (&priv->qos_enabled) ||
2608       !GST_CLOCK_TIME_IS_VALID (start))
2609     return;
2610
2611   stop = priv->current_rstop;
2612   jitter = priv->current_jitter;
2613
2614   if (jitter < 0) {
2615     /* this is the time the buffer entered the sink */
2616     if (start < -jitter)
2617       entered = 0;
2618     else
2619       entered = start + jitter;
2620     left = start;
2621   } else {
2622     /* this is the time the buffer entered the sink */
2623     entered = start + jitter;
2624     /* this is the time the buffer left the sink */
2625     left = start + jitter;
2626   }
2627
2628   /* calculate duration of the buffer */
2629   if (GST_CLOCK_TIME_IS_VALID (stop) && stop != start)
2630     duration = stop - start;
2631   else
2632     duration = priv->avg_in_diff;
2633
2634   /* if we have the time when the last buffer left us, calculate
2635    * processing time */
2636   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2637     if (entered > priv->last_left) {
2638       pt = entered - priv->last_left;
2639     } else {
2640       pt = 0;
2641     }
2642   } else {
2643     pt = priv->avg_pt;
2644   }
2645
2646   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2647       ", stop %" GST_TIME_FORMAT ", entered %" GST_TIME_FORMAT ", left %"
2648       GST_TIME_FORMAT ", pt: %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
2649       ",jitter %" G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
2650       GST_TIME_ARGS (entered), GST_TIME_ARGS (left), GST_TIME_ARGS (pt),
2651       GST_TIME_ARGS (duration), jitter);
2652
2653   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2654       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2655       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2656       priv->avg_rate);
2657
2658   /* collect running averages. for first observations, we copy the
2659    * values */
2660   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_duration))
2661     priv->avg_duration = duration;
2662   else
2663     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2664
2665   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_pt))
2666     priv->avg_pt = pt;
2667   else
2668     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2669
2670   if (priv->avg_duration != 0)
2671     rate =
2672         gst_guint64_to_gdouble (priv->avg_pt) /
2673         gst_guint64_to_gdouble (priv->avg_duration);
2674   else
2675     rate = 1.0;
2676
2677   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2678     if (dropped || priv->avg_rate < 0.0) {
2679       priv->avg_rate = rate;
2680     } else {
2681       if (rate > 1.0)
2682         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2683       else
2684         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2685     }
2686   }
2687
2688   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2689       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2690       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2691       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2692
2693
2694   if (priv->avg_rate >= 0.0) {
2695     GstQOSType type;
2696     GstClockTimeDiff diff;
2697
2698     /* if we have a valid rate, start sending QoS messages */
2699     if (priv->current_jitter < 0) {
2700       /* make sure we never go below 0 when adding the jitter to the
2701        * timestamp. */
2702       if (priv->current_rstart < -priv->current_jitter)
2703         priv->current_jitter = -priv->current_rstart;
2704     }
2705
2706     if (priv->throttle_time > 0) {
2707       diff = priv->throttle_time;
2708       type = GST_QOS_TYPE_THROTTLE;
2709     } else {
2710       diff = priv->current_jitter;
2711       if (diff <= 0)
2712         type = GST_QOS_TYPE_OVERFLOW;
2713       else
2714         type = GST_QOS_TYPE_UNDERFLOW;
2715     }
2716
2717     gst_base_sink_send_qos (sink, type, priv->avg_rate, priv->current_rstart,
2718         diff);
2719   }
2720
2721   /* record when this buffer will leave us */
2722   priv->last_left = left;
2723 }
2724
2725 /* reset all qos measuring */
2726 static void
2727 gst_base_sink_reset_qos (GstBaseSink * sink)
2728 {
2729   GstBaseSinkPrivate *priv;
2730
2731   priv = sink->priv;
2732
2733   priv->last_render_time = GST_CLOCK_TIME_NONE;
2734   priv->prev_rstart = GST_CLOCK_TIME_NONE;
2735   priv->earliest_in_time = GST_CLOCK_TIME_NONE;
2736   priv->last_left = GST_CLOCK_TIME_NONE;
2737   priv->avg_duration = GST_CLOCK_TIME_NONE;
2738   priv->avg_pt = GST_CLOCK_TIME_NONE;
2739   priv->avg_rate = -1.0;
2740   priv->avg_render = GST_CLOCK_TIME_NONE;
2741   priv->avg_in_diff = GST_CLOCK_TIME_NONE;
2742   priv->rendered = 0;
2743   priv->dropped = 0;
2744
2745 }
2746
2747 /* Checks if the object was scheduled too late.
2748  *
2749  * rstart/rstop contain the running_time start and stop values
2750  * of the object.
2751  *
2752  * status and jitter contain the return values from the clock wait.
2753  *
2754  * returns TRUE if the buffer was too late.
2755  */
2756 static gboolean
2757 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2758     GstClockTime rstart, GstClockTime rstop,
2759     GstClockReturn status, GstClockTimeDiff jitter)
2760 {
2761   gboolean late;
2762   gint64 max_lateness;
2763   GstBaseSinkPrivate *priv;
2764
2765   priv = basesink->priv;
2766
2767   late = FALSE;
2768
2769   /* only for objects that were too late */
2770   if (G_LIKELY (status != GST_CLOCK_EARLY))
2771     goto in_time;
2772
2773   max_lateness = basesink->max_lateness;
2774
2775   /* check if frame dropping is enabled */
2776   if (max_lateness == -1)
2777     goto no_drop;
2778
2779   /* only check for buffers */
2780   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2781     goto not_buffer;
2782
2783   /* can't do check if we don't have a timestamp */
2784   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (rstart)))
2785     goto no_timestamp;
2786
2787   /* we can add a valid stop time */
2788   if (GST_CLOCK_TIME_IS_VALID (rstop))
2789     max_lateness += rstop;
2790   else {
2791     max_lateness += rstart;
2792     /* no stop time, use avg frame diff */
2793     if (priv->avg_in_diff != -1)
2794       max_lateness += priv->avg_in_diff;
2795   }
2796
2797   /* if the jitter bigger than duration and lateness we are too late */
2798   if ((late = rstart + jitter > max_lateness)) {
2799     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2800         "buffer is too late %" GST_TIME_FORMAT
2801         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (rstart + jitter),
2802         GST_TIME_ARGS (max_lateness));
2803     /* !!emergency!!, if we did not receive anything valid for more than a
2804      * second, render it anyway so the user sees something */
2805     if (GST_CLOCK_TIME_IS_VALID (priv->last_render_time) &&
2806         rstart - priv->last_render_time > GST_SECOND) {
2807       late = FALSE;
2808       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2809           (_("A lot of buffers are being dropped.")),
2810           ("There may be a timestamping problem, or this computer is too slow."));
2811       GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2812           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2813           GST_TIME_ARGS (priv->last_render_time));
2814     }
2815   }
2816
2817 done:
2818   if (!late || !GST_CLOCK_TIME_IS_VALID (priv->last_render_time)) {
2819     priv->last_render_time = rstart;
2820     /* the next allowed input timestamp */
2821     if (priv->throttle_time > 0)
2822       priv->earliest_in_time = rstart + priv->throttle_time;
2823   }
2824   return late;
2825
2826   /* all is fine */
2827 in_time:
2828   {
2829     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2830     goto done;
2831   }
2832 no_drop:
2833   {
2834     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2835     goto done;
2836   }
2837 not_buffer:
2838   {
2839     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2840     return FALSE;
2841   }
2842 no_timestamp:
2843   {
2844     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2845     return FALSE;
2846   }
2847 }
2848
2849 /* called before and after calling the render vmethod. It keeps track of how
2850  * much time was spent in the render method and is used to check if we are
2851  * flooded */
2852 static void
2853 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2854 {
2855   GstBaseSinkPrivate *priv;
2856
2857   priv = basesink->priv;
2858
2859   if (start) {
2860     priv->start = gst_util_get_timestamp ();
2861   } else {
2862     GstClockTime elapsed;
2863
2864     priv->stop = gst_util_get_timestamp ();
2865
2866     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2867
2868     if (!GST_CLOCK_TIME_IS_VALID (priv->avg_render))
2869       priv->avg_render = elapsed;
2870     else
2871       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2872
2873     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2874         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2875   }
2876 }
2877
2878 /* with STREAM_LOCK, PREROLL_LOCK,
2879  *
2880  * Synchronize the object on the clock and then render it.
2881  *
2882  * takes ownership of obj.
2883  */
2884 static GstFlowReturn
2885 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2886     guint8 obj_type, gpointer obj)
2887 {
2888   GstFlowReturn ret;
2889   GstBaseSinkClass *bclass;
2890   gboolean late, step_end;
2891   gpointer sync_obj;
2892   GstBaseSinkPrivate *priv;
2893
2894   priv = basesink->priv;
2895
2896   if (OBJ_IS_BUFFERLIST (obj_type)) {
2897     /*
2898      * If buffer list, use the first group buffer within the list
2899      * for syncing
2900      */
2901     sync_obj = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0);
2902     g_assert (NULL != sync_obj);
2903   } else {
2904     sync_obj = obj;
2905   }
2906
2907 again:
2908   late = FALSE;
2909   step_end = FALSE;
2910
2911   /* synchronize this object, non syncable objects return OK
2912    * immediatly. */
2913   ret =
2914       gst_base_sink_do_sync (basesink, pad, sync_obj, &late, &step_end,
2915       obj_type);
2916   if (G_UNLIKELY (ret != GST_FLOW_OK))
2917     goto sync_failed;
2918
2919   /* and now render, event or buffer/buffer list. */
2920   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type))) {
2921     /* drop late buffers unconditionally, let's hope it's unlikely */
2922     if (G_UNLIKELY (late))
2923       goto dropped;
2924
2925     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2926
2927     if (G_LIKELY ((OBJ_IS_BUFFERLIST (obj_type) && bclass->render_list) ||
2928             (!OBJ_IS_BUFFERLIST (obj_type) && bclass->render))) {
2929       gint do_qos;
2930
2931       /* read once, to get same value before and after */
2932       do_qos = g_atomic_int_get (&priv->qos_enabled);
2933
2934       GST_DEBUG_OBJECT (basesink, "rendering object %p", obj);
2935
2936       /* record rendering time for QoS and stats */
2937       if (do_qos)
2938         gst_base_sink_do_render_stats (basesink, TRUE);
2939
2940       if (!OBJ_IS_BUFFERLIST (obj_type)) {
2941         GstBuffer *buf;
2942
2943         /* For buffer lists do not set last buffer. Creating buffer
2944          * with meaningful data can be done only with memcpy which will
2945          * significantly affect performance */
2946         buf = GST_BUFFER_CAST (obj);
2947         gst_base_sink_set_last_buffer (basesink, buf);
2948
2949         ret = bclass->render (basesink, buf);
2950       } else {
2951         GstBufferList *buflist;
2952
2953         buflist = GST_BUFFER_LIST_CAST (obj);
2954
2955         ret = bclass->render_list (basesink, buflist);
2956       }
2957
2958       if (do_qos)
2959         gst_base_sink_do_render_stats (basesink, FALSE);
2960
2961       if (ret == GST_FLOW_STEP)
2962         goto again;
2963
2964       if (G_UNLIKELY (basesink->flushing))
2965         goto flushing;
2966
2967       priv->rendered++;
2968     }
2969   } else if (G_LIKELY (OBJ_IS_EVENT (obj_type))) {
2970     GstEvent *event = GST_EVENT_CAST (obj);
2971     gboolean event_res = TRUE;
2972     GstEventType type;
2973
2974     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2975
2976     type = GST_EVENT_TYPE (event);
2977
2978     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
2979         gst_event_type_get_name (type));
2980
2981     if (bclass->event)
2982       event_res = bclass->event (basesink, event);
2983
2984     /* when we get here we could be flushing again when the event handler calls
2985      * _wait_eos(). We have to ignore this object in that case. */
2986     if (G_UNLIKELY (basesink->flushing))
2987       goto flushing;
2988
2989     if (G_LIKELY (event_res)) {
2990       guint32 seqnum;
2991
2992       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
2993       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
2994
2995       switch (type) {
2996         case GST_EVENT_EOS:
2997         {
2998           GstMessage *message;
2999
3000           /* the EOS event is completely handled so we mark
3001            * ourselves as being in the EOS state. eos is also
3002            * protected by the object lock so we can read it when
3003            * answering the POSITION query. */
3004           GST_OBJECT_LOCK (basesink);
3005           basesink->eos = TRUE;
3006           GST_OBJECT_UNLOCK (basesink);
3007
3008           /* ok, now we can post the message */
3009           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
3010
3011           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
3012           gst_message_set_seqnum (message, seqnum);
3013           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
3014           break;
3015         }
3016         case GST_EVENT_NEWSEGMENT:
3017           /* configure the segment */
3018           gst_base_sink_configure_segment (basesink, pad, event,
3019               &basesink->segment);
3020           break;
3021         case GST_EVENT_SINK_MESSAGE:{
3022           GstMessage *msg = NULL;
3023
3024           gst_event_parse_sink_message (event, &msg);
3025
3026           if (msg)
3027             gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
3028         }
3029         default:
3030           break;
3031       }
3032     }
3033   } else {
3034     g_return_val_if_reached (GST_FLOW_ERROR);
3035   }
3036
3037 done:
3038   if (step_end) {
3039     /* the step ended, check if we need to activate a new step */
3040     GST_DEBUG_OBJECT (basesink, "step ended");
3041     stop_stepping (basesink, &basesink->segment, &priv->current_step,
3042         priv->current_rstart, priv->current_rstop, basesink->eos);
3043     goto again;
3044   }
3045
3046   gst_base_sink_perform_qos (basesink, late);
3047
3048   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
3049   gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3050   return ret;
3051
3052   /* ERRORS */
3053 sync_failed:
3054   {
3055     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
3056     goto done;
3057   }
3058 dropped:
3059   {
3060     priv->dropped++;
3061     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
3062
3063     if (g_atomic_int_get (&priv->qos_enabled)) {
3064       GstMessage *qos_msg;
3065       GstClockTime timestamp, duration;
3066
3067       timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (sync_obj));
3068       duration = GST_BUFFER_DURATION (GST_BUFFER_CAST (sync_obj));
3069
3070       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3071           "qos: dropped buffer rt %" GST_TIME_FORMAT ", st %" GST_TIME_FORMAT
3072           ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT,
3073           GST_TIME_ARGS (priv->current_rstart),
3074           GST_TIME_ARGS (priv->current_sstart), GST_TIME_ARGS (timestamp),
3075           GST_TIME_ARGS (duration));
3076       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3077           "qos: rendered %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
3078           priv->rendered, priv->dropped);
3079
3080       qos_msg =
3081           gst_message_new_qos (GST_OBJECT_CAST (basesink), basesink->sync,
3082           priv->current_rstart, priv->current_sstart, timestamp, duration);
3083       gst_message_set_qos_values (qos_msg, priv->current_jitter, priv->avg_rate,
3084           1000000);
3085       gst_message_set_qos_stats (qos_msg, GST_FORMAT_BUFFERS, priv->rendered,
3086           priv->dropped);
3087       gst_element_post_message (GST_ELEMENT_CAST (basesink), qos_msg);
3088     }
3089     goto done;
3090   }
3091 flushing:
3092   {
3093     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
3094     gst_mini_object_unref (obj);
3095     return GST_FLOW_WRONG_STATE;
3096   }
3097 }
3098
3099 /* with STREAM_LOCK, PREROLL_LOCK
3100  *
3101  * Perform preroll on the given object. For buffers this means
3102  * calling the preroll subclass method.
3103  * If that succeeds, the state will be commited.
3104  *
3105  * function does not take ownership of obj.
3106  */
3107 static GstFlowReturn
3108 gst_base_sink_preroll_object (GstBaseSink * basesink, guint8 obj_type,
3109     GstMiniObject * obj)
3110 {
3111   GstFlowReturn ret;
3112
3113   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
3114
3115   /* if it's a buffer, we need to call the preroll method */
3116   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type) && basesink->priv->call_preroll)) {
3117     GstBaseSinkClass *bclass;
3118     GstBuffer *buf;
3119     GstClockTime timestamp;
3120
3121     if (OBJ_IS_BUFFERLIST (obj_type)) {
3122       buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0);
3123       g_assert (NULL != buf);
3124     } else {
3125       buf = GST_BUFFER_CAST (obj);
3126     }
3127
3128     timestamp = GST_BUFFER_TIMESTAMP (buf);
3129
3130     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
3131         GST_TIME_ARGS (timestamp));
3132
3133     /*
3134      * For buffer lists do not set last buffer. Creating buffer
3135      * with meaningful data can be done only with memcpy which will
3136      * significantly affect performance
3137      */
3138     if (!OBJ_IS_BUFFERLIST (obj_type)) {
3139       gst_base_sink_set_last_buffer (basesink, buf);
3140     }
3141
3142     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3143     if (bclass->preroll)
3144       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
3145         goto preroll_failed;
3146
3147     basesink->priv->call_preroll = FALSE;
3148   }
3149
3150   /* commit state */
3151   if (G_LIKELY (basesink->playing_async)) {
3152     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
3153       goto stopping;
3154   }
3155
3156   return GST_FLOW_OK;
3157
3158   /* ERRORS */
3159 preroll_failed:
3160   {
3161     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
3162     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
3163     return ret;
3164   }
3165 stopping:
3166   {
3167     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
3168     return GST_FLOW_WRONG_STATE;
3169   }
3170 }
3171
3172 /* with STREAM_LOCK, PREROLL_LOCK
3173  *
3174  * Queue an object for rendering.
3175  * The first prerollable object queued will complete the preroll. If the
3176  * preroll queue if filled, we render all the objects in the queue.
3177  *
3178  * This function takes ownership of the object.
3179  */
3180 static GstFlowReturn
3181 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
3182     guint8 obj_type, gpointer obj, gboolean prerollable)
3183 {
3184   GstFlowReturn ret = GST_FLOW_OK;
3185   gint length;
3186   GQueue *q;
3187
3188   if (G_UNLIKELY (basesink->need_preroll)) {
3189     if (G_LIKELY (prerollable))
3190       basesink->preroll_queued++;
3191
3192     length = basesink->preroll_queued;
3193
3194     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
3195
3196     /* first prerollable item needs to finish the preroll */
3197     if (length == 1) {
3198       ret = gst_base_sink_preroll_object (basesink, obj_type, obj);
3199       if (G_UNLIKELY (ret != GST_FLOW_OK))
3200         goto preroll_failed;
3201     }
3202     /* need to recheck if we need preroll, commmit state during preroll
3203      * could have made us not need more preroll. */
3204     if (G_UNLIKELY (basesink->need_preroll)) {
3205       /* see if we can render now, if we can't add the object to the preroll
3206        * queue. */
3207       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
3208         goto more_preroll;
3209     }
3210   }
3211   /* we can start rendering (or blocking) the queued object
3212    * if any. */
3213   q = basesink->preroll_queue;
3214   while (G_UNLIKELY (!g_queue_is_empty (q))) {
3215     GstMiniObject *o;
3216     guint8 ot;
3217
3218     o = g_queue_pop_head (q);
3219     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
3220
3221     ot = get_object_type (o);
3222
3223     /* do something with the return value */
3224     ret = gst_base_sink_render_object (basesink, pad, ot, o);
3225     if (ret != GST_FLOW_OK)
3226       goto dequeue_failed;
3227   }
3228
3229   /* now render the object */
3230   ret = gst_base_sink_render_object (basesink, pad, obj_type, obj);
3231   basesink->preroll_queued = 0;
3232
3233   return ret;
3234
3235   /* special cases */
3236 preroll_failed:
3237   {
3238     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
3239         gst_flow_get_name (ret));
3240     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3241     return ret;
3242   }
3243 more_preroll:
3244   {
3245     /* add object to the queue and return */
3246     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
3247         length, basesink->preroll_queue_max_len);
3248     g_queue_push_tail (basesink->preroll_queue, obj);
3249     return GST_FLOW_OK;
3250   }
3251 dequeue_failed:
3252   {
3253     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
3254         gst_flow_get_name (ret));
3255     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3256     return ret;
3257   }
3258 }
3259
3260 /* with STREAM_LOCK
3261  *
3262  * This function grabs the PREROLL_LOCK and adds the object to
3263  * the queue.
3264  *
3265  * This function takes ownership of obj.
3266  *
3267  * Note: Only GstEvent seem to be passed to this private method
3268  */
3269 static GstFlowReturn
3270 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
3271     GstMiniObject * obj, gboolean prerollable)
3272 {
3273   GstFlowReturn ret;
3274
3275   GST_BASE_SINK_PREROLL_LOCK (basesink);
3276   if (G_UNLIKELY (basesink->flushing))
3277     goto flushing;
3278
3279   if (G_UNLIKELY (basesink->priv->received_eos))
3280     goto was_eos;
3281
3282   ret =
3283       gst_base_sink_queue_object_unlocked (basesink, pad, _PR_IS_EVENT, obj,
3284       prerollable);
3285   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3286
3287   return ret;
3288
3289   /* ERRORS */
3290 flushing:
3291   {
3292     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3293     GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3294     gst_mini_object_unref (obj);
3295     return GST_FLOW_WRONG_STATE;
3296   }
3297 was_eos:
3298   {
3299     GST_DEBUG_OBJECT (basesink,
3300         "we are EOS, dropping object, return UNEXPECTED");
3301     GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3302     gst_mini_object_unref (obj);
3303     return GST_FLOW_UNEXPECTED;
3304   }
3305 }
3306
3307 static void
3308 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
3309 {
3310   /* make sure we are not blocked on the clock also clear any pending
3311    * eos state. */
3312   gst_base_sink_set_flushing (basesink, pad, TRUE);
3313
3314   /* we grab the stream lock but that is not needed since setting the
3315    * sink to flushing would make sure no state commit is being done
3316    * anymore */
3317   GST_PAD_STREAM_LOCK (pad);
3318   gst_base_sink_reset_qos (basesink);
3319   /* and we need to commit our state again on the next
3320    * prerolled buffer */
3321   basesink->playing_async = TRUE;
3322   if (basesink->priv->async_enabled) {
3323     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
3324   } else {
3325     basesink->priv->have_latency = TRUE;
3326   }
3327   gst_base_sink_set_last_buffer (basesink, NULL);
3328   GST_PAD_STREAM_UNLOCK (pad);
3329 }
3330
3331 static void
3332 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
3333 {
3334   /* unset flushing so we can accept new data, this also flushes out any EOS
3335    * event. */
3336   gst_base_sink_set_flushing (basesink, pad, FALSE);
3337
3338   /* for position reporting */
3339   GST_OBJECT_LOCK (basesink);
3340   basesink->priv->current_sstart = GST_CLOCK_TIME_NONE;
3341   basesink->priv->current_sstop = GST_CLOCK_TIME_NONE;
3342   basesink->priv->eos_rtime = GST_CLOCK_TIME_NONE;
3343   basesink->priv->call_preroll = TRUE;
3344   basesink->priv->current_step.valid = FALSE;
3345   basesink->priv->pending_step.valid = FALSE;
3346   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
3347     /* we need new segment info after the flush. */
3348     basesink->have_newsegment = FALSE;
3349     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3350     gst_segment_init (basesink->clip_segment, GST_FORMAT_UNDEFINED);
3351   }
3352   GST_OBJECT_UNLOCK (basesink);
3353 }
3354
3355 static gboolean
3356 gst_base_sink_event (GstPad * pad, GstEvent * event)
3357 {
3358   GstBaseSink *basesink;
3359   gboolean result = TRUE;
3360   GstBaseSinkClass *bclass;
3361
3362   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3363   if (G_UNLIKELY (basesink == NULL)) {
3364     gst_event_unref (event);
3365     return FALSE;
3366   }
3367
3368   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3369
3370   GST_DEBUG_OBJECT (basesink, "received event %p %" GST_PTR_FORMAT, event,
3371       event);
3372
3373   switch (GST_EVENT_TYPE (event)) {
3374     case GST_EVENT_EOS:
3375     {
3376       GstFlowReturn ret;
3377
3378       GST_BASE_SINK_PREROLL_LOCK (basesink);
3379       if (G_UNLIKELY (basesink->flushing))
3380         goto flushing;
3381
3382       if (G_UNLIKELY (basesink->priv->received_eos)) {
3383         /* we can't accept anything when we are EOS */
3384         result = FALSE;
3385         gst_event_unref (event);
3386       } else {
3387         /* we set the received EOS flag here so that we can use it when testing if
3388          * we are prerolled and to refuse more buffers. */
3389         basesink->priv->received_eos = TRUE;
3390
3391         /* EOS is a prerollable object, we call the unlocked version because it
3392          * does not check the received_eos flag. */
3393         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
3394             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), TRUE);
3395         if (G_UNLIKELY (ret != GST_FLOW_OK))
3396           result = FALSE;
3397       }
3398       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3399       break;
3400     }
3401     case GST_EVENT_NEWSEGMENT:
3402     {
3403       GstFlowReturn ret;
3404       gboolean update;
3405
3406       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
3407
3408       GST_BASE_SINK_PREROLL_LOCK (basesink);
3409       if (G_UNLIKELY (basesink->flushing))
3410         goto flushing;
3411
3412       gst_event_parse_new_segment_full (event, &update, NULL, NULL, NULL, NULL,
3413           NULL, NULL);
3414
3415       if (G_UNLIKELY (basesink->priv->received_eos && !update)) {
3416         /* we can't accept anything when we are EOS */
3417         result = FALSE;
3418         gst_event_unref (event);
3419       } else {
3420         /* the new segment is a non prerollable item and does not block anything,
3421          * we need to configure the current clipping segment and insert the event
3422          * in the queue to serialize it with the buffers for rendering. */
3423         gst_base_sink_configure_segment (basesink, pad, event,
3424             basesink->clip_segment);
3425
3426         ret =
3427             gst_base_sink_queue_object_unlocked (basesink, pad,
3428             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), FALSE);
3429         if (G_UNLIKELY (ret != GST_FLOW_OK))
3430           result = FALSE;
3431         else {
3432           GST_OBJECT_LOCK (basesink);
3433           basesink->have_newsegment = TRUE;
3434           GST_OBJECT_UNLOCK (basesink);
3435         }
3436       }
3437       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3438       break;
3439     }
3440     case GST_EVENT_FLUSH_START:
3441       if (bclass->event)
3442         bclass->event (basesink, event);
3443
3444       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3445
3446       gst_base_sink_flush_start (basesink, pad);
3447
3448       gst_event_unref (event);
3449       break;
3450     case GST_EVENT_FLUSH_STOP:
3451       if (bclass->event)
3452         bclass->event (basesink, event);
3453
3454       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3455
3456       gst_base_sink_flush_stop (basesink, pad);
3457
3458       gst_event_unref (event);
3459       break;
3460     default:
3461       /* other events are sent to queue or subclass depending on if they
3462        * are serialized. */
3463       if (GST_EVENT_IS_SERIALIZED (event)) {
3464         gst_base_sink_queue_object (basesink, pad,
3465             GST_MINI_OBJECT_CAST (event), FALSE);
3466       } else {
3467         if (bclass->event)
3468           bclass->event (basesink, event);
3469         gst_event_unref (event);
3470       }
3471       break;
3472   }
3473 done:
3474   gst_object_unref (basesink);
3475
3476   return result;
3477
3478   /* ERRORS */
3479 flushing:
3480   {
3481     GST_DEBUG_OBJECT (basesink, "we are flushing");
3482     GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3483     result = FALSE;
3484     gst_event_unref (event);
3485     goto done;
3486   }
3487 }
3488
3489 /* default implementation to calculate the start and end
3490  * timestamps on a buffer, subclasses can override
3491  */
3492 static void
3493 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3494     GstClockTime * start, GstClockTime * end)
3495 {
3496   GstClockTime timestamp, duration;
3497
3498   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3499   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3500
3501     /* get duration to calculate end time */
3502     duration = GST_BUFFER_DURATION (buffer);
3503     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3504       *end = timestamp + duration;
3505     }
3506     *start = timestamp;
3507   }
3508 }
3509
3510 /* must be called with PREROLL_LOCK */
3511 static gboolean
3512 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3513 {
3514   gboolean is_prerolled, res;
3515
3516   /* we have 2 cases where the PREROLL_LOCK is released:
3517    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3518    *  2) we are syncing on the clock
3519    */
3520   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3521   res = !is_prerolled;
3522
3523   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3524       basesink->have_preroll, basesink->priv->received_eos, res);
3525
3526   return res;
3527 }
3528
3529 /* with STREAM_LOCK, PREROLL_LOCK
3530  *
3531  * Takes a buffer and compare the timestamps with the last segment.
3532  * If the buffer falls outside of the segment boundaries, drop it.
3533  * Else queue the buffer for preroll and rendering.
3534  *
3535  * This function takes ownership of the buffer.
3536  */
3537 static GstFlowReturn
3538 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3539     guint8 obj_type, gpointer obj)
3540 {
3541   GstBaseSinkClass *bclass;
3542   GstFlowReturn result;
3543   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3544   GstSegment *clip_segment;
3545   GstBuffer *time_buf;
3546
3547   if (G_UNLIKELY (basesink->flushing))
3548     goto flushing;
3549
3550   if (G_UNLIKELY (basesink->priv->received_eos))
3551     goto was_eos;
3552
3553   if (OBJ_IS_BUFFERLIST (obj_type)) {
3554     time_buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0);
3555     g_assert (NULL != time_buf);
3556   } else {
3557     time_buf = GST_BUFFER_CAST (obj);
3558   }
3559
3560   /* for code clarity */
3561   clip_segment = basesink->clip_segment;
3562
3563   if (G_UNLIKELY (!basesink->have_newsegment)) {
3564     gboolean sync;
3565
3566     sync = gst_base_sink_get_sync (basesink);
3567     if (sync) {
3568       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3569           (_("Internal data flow problem.")),
3570           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3571     }
3572
3573     /* this means this sink will assume timestamps start from 0 */
3574     GST_OBJECT_LOCK (basesink);
3575     clip_segment->start = 0;
3576     clip_segment->stop = -1;
3577     basesink->segment.start = 0;
3578     basesink->segment.stop = -1;
3579     basesink->have_newsegment = TRUE;
3580     GST_OBJECT_UNLOCK (basesink);
3581   }
3582
3583   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3584
3585   /* check if the buffer needs to be dropped, we first ask the subclass for the
3586    * start and end */
3587   if (bclass->get_times)
3588     bclass->get_times (basesink, time_buf, &start, &end);
3589
3590   if (!GST_CLOCK_TIME_IS_VALID (start)) {
3591     /* if the subclass does not want sync, we use our own values so that we at
3592      * least clip the buffer to the segment */
3593     gst_base_sink_get_times (basesink, time_buf, &start, &end);
3594   }
3595
3596   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3597       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3598
3599   /* a dropped buffer does not participate in anything */
3600   if (GST_CLOCK_TIME_IS_VALID (start) &&
3601       (clip_segment->format == GST_FORMAT_TIME)) {
3602     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3603                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3604       goto out_of_segment;
3605   }
3606
3607   /* now we can process the buffer in the queue, this function takes ownership
3608    * of the buffer */
3609   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3610       obj_type, obj, TRUE);
3611   return result;
3612
3613   /* ERRORS */
3614 flushing:
3615   {
3616     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3617     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3618     return GST_FLOW_WRONG_STATE;
3619   }
3620 was_eos:
3621   {
3622     GST_DEBUG_OBJECT (basesink,
3623         "we are EOS, dropping object, return UNEXPECTED");
3624     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3625     return GST_FLOW_UNEXPECTED;
3626   }
3627 out_of_segment:
3628   {
3629     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3630     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3631     return GST_FLOW_OK;
3632   }
3633 }
3634
3635 /* with STREAM_LOCK
3636  */
3637 static GstFlowReturn
3638 gst_base_sink_chain_main (GstBaseSink * basesink, GstPad * pad,
3639     guint8 obj_type, gpointer obj)
3640 {
3641   GstFlowReturn result;
3642
3643   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3644     goto wrong_mode;
3645
3646   GST_BASE_SINK_PREROLL_LOCK (basesink);
3647   result = gst_base_sink_chain_unlocked (basesink, pad, obj_type, obj);
3648   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3649
3650 done:
3651   return result;
3652
3653   /* ERRORS */
3654 wrong_mode:
3655   {
3656     GST_OBJECT_LOCK (pad);
3657     GST_WARNING_OBJECT (basesink,
3658         "Push on pad %s:%s, but it was not activated in push mode",
3659         GST_DEBUG_PAD_NAME (pad));
3660     GST_OBJECT_UNLOCK (pad);
3661     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3662     /* we don't post an error message this will signal to the peer
3663      * pushing that EOS is reached. */
3664     result = GST_FLOW_UNEXPECTED;
3665     goto done;
3666   }
3667 }
3668
3669 static GstFlowReturn
3670 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3671 {
3672   GstBaseSink *basesink;
3673
3674   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3675
3676   return gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, buf);
3677 }
3678
3679 static GstFlowReturn
3680 gst_base_sink_chain_list (GstPad * pad, GstBufferList * list)
3681 {
3682   GstBaseSink *basesink;
3683   GstBaseSinkClass *bclass;
3684   GstFlowReturn result;
3685
3686   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3687   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3688
3689   if (G_LIKELY (bclass->render_list)) {
3690     result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFERLIST, list);
3691   } else {
3692     guint i, len;
3693     GstBuffer *buffer;
3694
3695     GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
3696
3697     len = gst_buffer_list_len (list);
3698
3699     result = GST_FLOW_OK;
3700     for (i = 0; i < len; i++) {
3701       buffer = gst_buffer_list_get (list, 0);
3702       result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER,
3703           gst_buffer_ref (buffer));
3704       if (result != GST_FLOW_OK)
3705         break;
3706     }
3707     gst_buffer_list_unref (list);
3708   }
3709   return result;
3710 }
3711
3712
3713 static gboolean
3714 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3715 {
3716   gboolean res = TRUE;
3717
3718   /* update our offset if the start/stop position was updated */
3719   if (segment->format == GST_FORMAT_BYTES) {
3720     segment->time = segment->start;
3721   } else if (segment->start == 0) {
3722     /* seek to start, we can implement a default for this. */
3723     segment->time = 0;
3724   } else {
3725     res = FALSE;
3726     GST_INFO_OBJECT (sink, "Can't do a default seek");
3727   }
3728
3729   return res;
3730 }
3731
3732 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3733
3734 static gboolean
3735 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3736     GstEvent * event, GstSegment * segment)
3737 {
3738   /* By default, we try one of 2 things:
3739    *   - For absolute seek positions, convert the requested position to our
3740    *     configured processing format and place it in the output segment \
3741    *   - For relative seek positions, convert our current (input) values to the
3742    *     seek format, adjust by the relative seek offset and then convert back to
3743    *     the processing format
3744    */
3745   GstSeekType cur_type, stop_type;
3746   gint64 cur, stop;
3747   GstSeekFlags flags;
3748   GstFormat seek_format, dest_format;
3749   gdouble rate;
3750   gboolean update;
3751   gboolean res = TRUE;
3752
3753   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3754       &cur_type, &cur, &stop_type, &stop);
3755   dest_format = segment->format;
3756
3757   if (seek_format == dest_format) {
3758     gst_segment_set_seek (segment, rate, seek_format, flags,
3759         cur_type, cur, stop_type, stop, &update);
3760     return TRUE;
3761   }
3762
3763   if (cur_type != GST_SEEK_TYPE_NONE) {
3764     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3765     res =
3766         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3767         &cur);
3768     cur_type = GST_SEEK_TYPE_SET;
3769   }
3770
3771   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3772     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3773     res =
3774         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3775         &stop);
3776     stop_type = GST_SEEK_TYPE_SET;
3777   }
3778
3779   /* And finally, configure our output segment in the desired format */
3780   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3781       stop_type, stop, &update);
3782
3783   if (!res)
3784     goto no_format;
3785
3786   return res;
3787
3788 no_format:
3789   {
3790     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3791     return FALSE;
3792   }
3793 }
3794
3795 /* perform a seek, only executed in pull mode */
3796 static gboolean
3797 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3798 {
3799   gboolean flush;
3800   gdouble rate;
3801   GstFormat seek_format, dest_format;
3802   GstSeekFlags flags;
3803   GstSeekType cur_type, stop_type;
3804   gboolean seekseg_configured = FALSE;
3805   gint64 cur, stop;
3806   gboolean update, res = TRUE;
3807   GstSegment seeksegment;
3808
3809   dest_format = sink->segment.format;
3810
3811   if (event) {
3812     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3813     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3814         &cur_type, &cur, &stop_type, &stop);
3815
3816     flush = flags & GST_SEEK_FLAG_FLUSH;
3817   } else {
3818     GST_DEBUG_OBJECT (sink, "performing seek without event");
3819     flush = FALSE;
3820   }
3821
3822   if (flush) {
3823     GST_DEBUG_OBJECT (sink, "flushing upstream");
3824     gst_pad_push_event (pad, gst_event_new_flush_start ());
3825     gst_base_sink_flush_start (sink, pad);
3826   } else {
3827     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3828   }
3829
3830   GST_PAD_STREAM_LOCK (pad);
3831
3832   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3833    * copy the current segment info into the temp segment that we can actually
3834    * attempt the seek with. We only update the real segment if the seek suceeds. */
3835   if (!seekseg_configured) {
3836     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3837
3838     /* now configure the final seek segment */
3839     if (event) {
3840       if (sink->segment.format != seek_format) {
3841         /* OK, here's where we give the subclass a chance to convert the relative
3842          * seek into an absolute one in the processing format. We set up any
3843          * absolute seek above, before taking the stream lock. */
3844         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3845                 &seeksegment)) {
3846           GST_DEBUG_OBJECT (sink,
3847               "Preparing the seek failed after flushing. " "Aborting seek");
3848           res = FALSE;
3849         }
3850       } else {
3851         /* The seek format matches our processing format, no need to ask the
3852          * the subclass to configure the segment. */
3853         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3854             cur_type, cur, stop_type, stop, &update);
3855       }
3856     }
3857     /* Else, no seek event passed, so we're just (re)starting the
3858        current segment. */
3859   }
3860
3861   if (res) {
3862     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3863         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3864         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3865
3866     /* do the seek, segment.last_stop contains the new position. */
3867     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3868   }
3869
3870
3871   if (flush) {
3872     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3873     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3874     gst_base_sink_flush_stop (sink, pad);
3875   } else if (res && sink->running) {
3876     /* we are running the current segment and doing a non-flushing seek,
3877      * close the segment first based on the last_stop. */
3878     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3879         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3880   }
3881
3882   /* The subclass must have converted the segment to the processing format
3883    * by now */
3884   if (res && seeksegment.format != dest_format) {
3885     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3886         "in the correct format. Aborting seek.");
3887     res = FALSE;
3888   }
3889
3890   /* if successfull seek, we update our real segment and push
3891    * out the new segment. */
3892   if (res) {
3893     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3894
3895     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3896       gst_element_post_message (GST_ELEMENT (sink),
3897           gst_message_new_segment_start (GST_OBJECT (sink),
3898               sink->segment.format, sink->segment.last_stop));
3899     }
3900   }
3901
3902   sink->priv->discont = TRUE;
3903   sink->running = TRUE;
3904
3905   GST_PAD_STREAM_UNLOCK (pad);
3906
3907   return res;
3908 }
3909
3910 static void
3911 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
3912     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
3913     gboolean flush, gboolean intermediate)
3914 {
3915   GST_OBJECT_LOCK (sink);
3916   pending->seqnum = seqnum;
3917   pending->format = format;
3918   pending->amount = amount;
3919   pending->position = 0;
3920   pending->rate = rate;
3921   pending->flush = flush;
3922   pending->intermediate = intermediate;
3923   pending->valid = TRUE;
3924   /* flush invalidates the current stepping segment */
3925   if (flush)
3926     current->valid = FALSE;
3927   GST_OBJECT_UNLOCK (sink);
3928 }
3929
3930 static gboolean
3931 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3932 {
3933   GstBaseSinkPrivate *priv;
3934   GstBaseSinkClass *bclass;
3935   gboolean flush, intermediate;
3936   gdouble rate;
3937   GstFormat format;
3938   guint64 amount;
3939   guint seqnum;
3940   GstStepInfo *pending, *current;
3941   GstMessage *message;
3942
3943   bclass = GST_BASE_SINK_GET_CLASS (sink);
3944   priv = sink->priv;
3945
3946   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
3947
3948   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
3949   seqnum = gst_event_get_seqnum (event);
3950
3951   pending = &priv->pending_step;
3952   current = &priv->current_step;
3953
3954   /* post message first */
3955   message = gst_message_new_step_start (GST_OBJECT (sink), FALSE, format,
3956       amount, rate, flush, intermediate);
3957   gst_message_set_seqnum (message, seqnum);
3958   gst_element_post_message (GST_ELEMENT (sink), message);
3959
3960   if (flush) {
3961     /* we need to call ::unlock before locking PREROLL_LOCK
3962      * since we lock it before going into ::render */
3963     if (bclass->unlock)
3964       bclass->unlock (sink);
3965
3966     GST_BASE_SINK_PREROLL_LOCK (sink);
3967     /* now that we have the PREROLL lock, clear our unlock request */
3968     if (bclass->unlock_stop)
3969       bclass->unlock_stop (sink);
3970
3971     /* update the stepinfo and make it valid */
3972     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
3973         intermediate);
3974
3975     if (sink->priv->async_enabled) {
3976       /* and we need to commit our state again on the next
3977        * prerolled buffer */
3978       sink->playing_async = TRUE;
3979       priv->pending_step.need_preroll = TRUE;
3980       sink->need_preroll = FALSE;
3981       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
3982     } else {
3983       sink->priv->have_latency = TRUE;
3984       sink->need_preroll = FALSE;
3985     }
3986     priv->current_sstart = GST_CLOCK_TIME_NONE;
3987     priv->current_sstop = GST_CLOCK_TIME_NONE;
3988     priv->eos_rtime = GST_CLOCK_TIME_NONE;
3989     priv->call_preroll = TRUE;
3990     gst_base_sink_set_last_buffer (sink, NULL);
3991     gst_base_sink_reset_qos (sink);
3992
3993     if (sink->clock_id) {
3994       gst_clock_id_unschedule (sink->clock_id);
3995     }
3996
3997     if (sink->have_preroll) {
3998       GST_DEBUG_OBJECT (sink, "signal waiter");
3999       priv->step_unlock = TRUE;
4000       GST_BASE_SINK_PREROLL_SIGNAL (sink);
4001     }
4002     GST_BASE_SINK_PREROLL_UNLOCK (sink);
4003   } else {
4004     /* update the stepinfo and make it valid */
4005     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4006         intermediate);
4007   }
4008
4009   return TRUE;
4010 }
4011
4012 /* with STREAM_LOCK
4013  */
4014 static void
4015 gst_base_sink_loop (GstPad * pad)
4016 {
4017   GstBaseSink *basesink;
4018   GstBuffer *buf = NULL;
4019   GstFlowReturn result;
4020   guint blocksize;
4021   guint64 offset;
4022
4023   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
4024
4025   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
4026
4027   if ((blocksize = basesink->priv->blocksize) == 0)
4028     blocksize = -1;
4029
4030   offset = basesink->segment.last_stop;
4031
4032   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
4033       offset, blocksize);
4034
4035   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
4036   if (G_UNLIKELY (result != GST_FLOW_OK))
4037     goto paused;
4038
4039   if (G_UNLIKELY (buf == NULL))
4040     goto no_buffer;
4041
4042   offset += gst_buffer_get_size (buf);
4043
4044   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
4045
4046   GST_BASE_SINK_PREROLL_LOCK (basesink);
4047   result = gst_base_sink_chain_unlocked (basesink, pad, _PR_IS_BUFFER, buf);
4048   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4049   if (G_UNLIKELY (result != GST_FLOW_OK))
4050     goto paused;
4051
4052   return;
4053
4054   /* ERRORS */
4055 paused:
4056   {
4057     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
4058         gst_flow_get_name (result));
4059     gst_pad_pause_task (pad);
4060     if (result == GST_FLOW_UNEXPECTED) {
4061       /* perform EOS logic */
4062       if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
4063         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4064             gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
4065                 basesink->segment.format, basesink->segment.last_stop));
4066       } else {
4067         gst_base_sink_event (pad, gst_event_new_eos ());
4068       }
4069     } else if (result == GST_FLOW_NOT_LINKED || result <= GST_FLOW_UNEXPECTED) {
4070       /* for fatal errors we post an error message, post the error
4071        * first so the app knows about the error first. 
4072        * wrong-state is not a fatal error because it happens due to
4073        * flushing and posting an error message in that case is the
4074        * wrong thing to do, e.g. when basesrc is doing a flushing
4075        * seek. */
4076       GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4077           (_("Internal data stream error.")),
4078           ("stream stopped, reason %s", gst_flow_get_name (result)));
4079       gst_base_sink_event (pad, gst_event_new_eos ());
4080     }
4081     return;
4082   }
4083 no_buffer:
4084   {
4085     GST_LOG_OBJECT (basesink, "no buffer, pausing");
4086     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4087         (_("Internal data flow error.")), ("element returned NULL buffer"));
4088     result = GST_FLOW_ERROR;
4089     goto paused;
4090   }
4091 }
4092
4093 static gboolean
4094 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
4095     gboolean flushing)
4096 {
4097   GstBaseSinkClass *bclass;
4098
4099   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4100
4101   if (flushing) {
4102     /* unlock any subclasses, we need to do this before grabbing the
4103      * PREROLL_LOCK since we hold this lock before going into ::render. */
4104     if (bclass->unlock)
4105       bclass->unlock (basesink);
4106   }
4107
4108   GST_BASE_SINK_PREROLL_LOCK (basesink);
4109   basesink->flushing = flushing;
4110   if (flushing) {
4111     /* step 1, now that we have the PREROLL lock, clear our unlock request */
4112     if (bclass->unlock_stop)
4113       bclass->unlock_stop (basesink);
4114
4115     /* set need_preroll before we unblock the clock. If the clock is unblocked
4116      * before timing out, we can reuse the buffer for preroll. */
4117     basesink->need_preroll = TRUE;
4118
4119     /* step 2, unblock clock sync (if any) or any other blocking thing */
4120     if (basesink->clock_id) {
4121       gst_clock_id_unschedule (basesink->clock_id);
4122     }
4123
4124     /* flush out the data thread if it's locked in finish_preroll, this will
4125      * also flush out the EOS state */
4126     GST_DEBUG_OBJECT (basesink,
4127         "flushing out data thread, need preroll to TRUE");
4128     gst_base_sink_preroll_queue_flush (basesink, pad);
4129   }
4130   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4131
4132   return TRUE;
4133 }
4134
4135 static gboolean
4136 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
4137 {
4138   gboolean result;
4139
4140   if (active) {
4141     /* start task */
4142     result = gst_pad_start_task (basesink->sinkpad,
4143         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
4144   } else {
4145     /* step 2, make sure streaming finishes */
4146     result = gst_pad_stop_task (basesink->sinkpad);
4147   }
4148
4149   return result;
4150 }
4151
4152 static gboolean
4153 gst_base_sink_pad_activate (GstPad * pad)
4154 {
4155   gboolean result = FALSE;
4156   GstBaseSink *basesink;
4157
4158   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4159
4160   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
4161
4162   gst_base_sink_set_flushing (basesink, pad, FALSE);
4163
4164   /* we need to have the pull mode enabled */
4165   if (!basesink->can_activate_pull) {
4166     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
4167     goto fallback;
4168   }
4169
4170   /* check if downstreams supports pull mode at all */
4171   if (!gst_pad_check_pull_range (pad)) {
4172     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
4173     goto fallback;
4174   }
4175
4176   /* set the pad mode before starting the task so that it's in the
4177    * correct state for the new thread. also the sink set_caps and get_caps
4178    * function checks this */
4179   basesink->pad_mode = GST_ACTIVATE_PULL;
4180
4181   /* we first try to negotiate a format so that when we try to activate
4182    * downstream, it knows about our format */
4183   if (!gst_base_sink_negotiate_pull (basesink)) {
4184     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
4185     goto fallback;
4186   }
4187
4188   /* ok activate now */
4189   if (!gst_pad_activate_pull (pad, TRUE)) {
4190     /* clear any pending caps */
4191     GST_OBJECT_LOCK (basesink);
4192     gst_caps_replace (&basesink->priv->pull_caps, NULL);
4193     GST_OBJECT_UNLOCK (basesink);
4194     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
4195     goto fallback;
4196   }
4197
4198   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
4199   result = TRUE;
4200   goto done;
4201
4202   /* push mode fallback */
4203 fallback:
4204   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
4205   if ((result = gst_pad_activate_push (pad, TRUE))) {
4206     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
4207   }
4208
4209 done:
4210   if (!result) {
4211     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
4212     gst_base_sink_set_flushing (basesink, pad, TRUE);
4213   }
4214
4215   gst_object_unref (basesink);
4216
4217   return result;
4218 }
4219
4220 static gboolean
4221 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
4222 {
4223   gboolean result;
4224   GstBaseSink *basesink;
4225
4226   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4227
4228   if (active) {
4229     if (!basesink->can_activate_push) {
4230       result = FALSE;
4231       basesink->pad_mode = GST_ACTIVATE_NONE;
4232     } else {
4233       result = TRUE;
4234       basesink->pad_mode = GST_ACTIVATE_PUSH;
4235     }
4236   } else {
4237     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
4238       g_warning ("Internal GStreamer activation error!!!");
4239       result = FALSE;
4240     } else {
4241       gst_base_sink_set_flushing (basesink, pad, TRUE);
4242       result = TRUE;
4243       basesink->pad_mode = GST_ACTIVATE_NONE;
4244     }
4245   }
4246
4247   gst_object_unref (basesink);
4248
4249   return result;
4250 }
4251
4252 static gboolean
4253 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
4254 {
4255   GstCaps *caps;
4256   gboolean result;
4257
4258   result = FALSE;
4259
4260   /* this returns the intersection between our caps and the peer caps. If there
4261    * is no peer, it returns NULL and we can't operate in pull mode so we can
4262    * fail the negotiation. */
4263   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
4264   if (caps == NULL || gst_caps_is_empty (caps))
4265     goto no_caps_possible;
4266
4267   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
4268
4269   caps = gst_caps_make_writable (caps);
4270   /* get the first (prefered) format */
4271   gst_caps_truncate (caps);
4272
4273   GST_DEBUG_OBJECT (basesink, "have caps: %" GST_PTR_FORMAT, caps);
4274
4275   if (gst_caps_is_any (caps)) {
4276     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
4277         "allowing pull()");
4278     /* neither side has template caps in this case, so they are prepared for
4279        pull() without setcaps() */
4280     result = TRUE;
4281   } else {
4282     /* try to fixate */
4283     gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
4284     GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
4285
4286     if (gst_caps_is_fixed (caps)) {
4287       if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
4288         goto could_not_set_caps;
4289
4290       GST_OBJECT_LOCK (basesink);
4291       gst_caps_replace (&basesink->priv->pull_caps, caps);
4292       GST_OBJECT_UNLOCK (basesink);
4293
4294       result = TRUE;
4295     }
4296   }
4297
4298   gst_caps_unref (caps);
4299
4300   return result;
4301
4302 no_caps_possible:
4303   {
4304     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
4305     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
4306     if (caps)
4307       gst_caps_unref (caps);
4308     return FALSE;
4309   }
4310 could_not_set_caps:
4311   {
4312     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
4313     gst_caps_unref (caps);
4314     return FALSE;
4315   }
4316 }
4317
4318 /* this won't get called until we implement an activate function */
4319 static gboolean
4320 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
4321 {
4322   gboolean result = FALSE;
4323   GstBaseSink *basesink;
4324   GstBaseSinkClass *bclass;
4325
4326   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4327   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4328
4329   if (active) {
4330     GstFormat format;
4331     gint64 duration;
4332
4333     /* we mark we have a newsegment here because pull based
4334      * mode works just fine without having a newsegment before the
4335      * first buffer */
4336     format = GST_FORMAT_BYTES;
4337
4338     gst_segment_init (&basesink->segment, format);
4339     gst_segment_init (basesink->clip_segment, format);
4340     GST_OBJECT_LOCK (basesink);
4341     basesink->have_newsegment = TRUE;
4342     GST_OBJECT_UNLOCK (basesink);
4343
4344     /* get the peer duration in bytes */
4345     result = gst_pad_query_peer_duration (pad, &format, &duration);
4346     if (result) {
4347       GST_DEBUG_OBJECT (basesink,
4348           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
4349       gst_segment_set_duration (basesink->clip_segment, format, duration);
4350       gst_segment_set_duration (&basesink->segment, format, duration);
4351     } else {
4352       GST_DEBUG_OBJECT (basesink, "unknown duration");
4353     }
4354
4355     if (bclass->activate_pull)
4356       result = bclass->activate_pull (basesink, TRUE);
4357     else
4358       result = FALSE;
4359
4360     if (!result)
4361       goto activate_failed;
4362
4363   } else {
4364     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
4365       g_warning ("Internal GStreamer activation error!!!");
4366       result = FALSE;
4367     } else {
4368       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
4369       if (bclass->activate_pull)
4370         result &= bclass->activate_pull (basesink, FALSE);
4371       basesink->pad_mode = GST_ACTIVATE_NONE;
4372       /* clear any pending caps */
4373       GST_OBJECT_LOCK (basesink);
4374       gst_caps_replace (&basesink->priv->pull_caps, NULL);
4375       GST_OBJECT_UNLOCK (basesink);
4376     }
4377   }
4378   gst_object_unref (basesink);
4379
4380   return result;
4381
4382   /* ERRORS */
4383 activate_failed:
4384   {
4385     /* reset, as starting the thread failed */
4386     basesink->pad_mode = GST_ACTIVATE_NONE;
4387
4388     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
4389     return FALSE;
4390   }
4391 }
4392
4393 /* send an event to our sinkpad peer. */
4394 static gboolean
4395 gst_base_sink_send_event (GstElement * element, GstEvent * event)
4396 {
4397   GstPad *pad;
4398   GstBaseSink *basesink = GST_BASE_SINK (element);
4399   gboolean forward, result = TRUE;
4400   GstActivateMode mode;
4401
4402   GST_OBJECT_LOCK (element);
4403   /* get the pad and the scheduling mode */
4404   pad = gst_object_ref (basesink->sinkpad);
4405   mode = basesink->pad_mode;
4406   GST_OBJECT_UNLOCK (element);
4407
4408   /* only push UPSTREAM events upstream */
4409   forward = GST_EVENT_IS_UPSTREAM (event);
4410
4411   GST_DEBUG_OBJECT (basesink, "handling event %p %" GST_PTR_FORMAT, event,
4412       event);
4413
4414   switch (GST_EVENT_TYPE (event)) {
4415     case GST_EVENT_LATENCY:
4416     {
4417       GstClockTime latency;
4418
4419       gst_event_parse_latency (event, &latency);
4420
4421       /* store the latency. We use this to adjust the running_time before syncing
4422        * it to the clock. */
4423       GST_OBJECT_LOCK (element);
4424       basesink->priv->latency = latency;
4425       if (!basesink->priv->have_latency)
4426         forward = FALSE;
4427       GST_OBJECT_UNLOCK (element);
4428       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
4429           GST_TIME_ARGS (latency));
4430
4431       /* We forward this event so that all elements know about the global pipeline
4432        * latency. This is interesting for an element when it wants to figure out
4433        * when a particular piece of data will be rendered. */
4434       break;
4435     }
4436     case GST_EVENT_SEEK:
4437       /* in pull mode we will execute the seek */
4438       if (mode == GST_ACTIVATE_PULL)
4439         result = gst_base_sink_perform_seek (basesink, pad, event);
4440       break;
4441     case GST_EVENT_STEP:
4442       result = gst_base_sink_perform_step (basesink, pad, event);
4443       forward = FALSE;
4444       break;
4445     default:
4446       break;
4447   }
4448
4449   if (forward) {
4450     result = gst_pad_push_event (pad, event);
4451   } else {
4452     /* not forwarded, unref the event */
4453     gst_event_unref (event);
4454   }
4455
4456   gst_object_unref (pad);
4457   return result;
4458 }
4459
4460 static gboolean
4461 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4462     gint64 * cur, gboolean * upstream)
4463 {
4464   GstClock *clock = NULL;
4465   gboolean res = FALSE;
4466   GstFormat oformat, tformat;
4467   GstSegment *segment;
4468   GstClockTime now, latency;
4469   GstClockTimeDiff base;
4470   gint64 time, accum, duration;
4471   gdouble rate;
4472   gint64 last;
4473   gboolean last_seen, with_clock, in_paused;
4474
4475   GST_OBJECT_LOCK (basesink);
4476   /* we can only get the segment when we are not NULL or READY */
4477   if (!basesink->have_newsegment)
4478     goto wrong_state;
4479
4480   in_paused = FALSE;
4481   /* when not in PLAYING or when we're busy with a state change, we
4482    * cannot read from the clock so we report time based on the
4483    * last seen timestamp. */
4484   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4485       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING) {
4486     in_paused = TRUE;
4487   }
4488
4489   /* we don't use the clip segment in pull mode, when seeking we update the
4490    * main segment directly with the new segment values without it having to be
4491    * activated by the rendering after preroll */
4492   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4493     segment = basesink->clip_segment;
4494   else
4495     segment = &basesink->segment;
4496
4497   /* our intermediate time format */
4498   tformat = GST_FORMAT_TIME;
4499   /* get the format in the segment */
4500   oformat = segment->format;
4501
4502   /* report with last seen position when EOS */
4503   last_seen = basesink->eos;
4504
4505   /* assume we will use the clock for getting the current position */
4506   with_clock = TRUE;
4507   if (basesink->sync == FALSE)
4508     with_clock = FALSE;
4509
4510   /* and we need a clock */
4511   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4512     with_clock = FALSE;
4513   else
4514     gst_object_ref (clock);
4515
4516   /* collect all data we need holding the lock */
4517   if (GST_CLOCK_TIME_IS_VALID (segment->time))
4518     time = segment->time;
4519   else
4520     time = 0;
4521
4522   if (GST_CLOCK_TIME_IS_VALID (segment->stop))
4523     duration = segment->stop - segment->start;
4524   else
4525     duration = 0;
4526
4527   accum = segment->accum;
4528   rate = segment->rate * segment->applied_rate;
4529   latency = basesink->priv->latency;
4530
4531   if (oformat == GST_FORMAT_TIME) {
4532     gint64 start, stop;
4533
4534     start = basesink->priv->current_sstart;
4535     stop = basesink->priv->current_sstop;
4536
4537     if (in_paused) {
4538       /* in paused we use the last position as a lower bound */
4539       if (stop == -1 || segment->rate > 0.0)
4540         last = start;
4541       else
4542         last = stop;
4543     } else {
4544       /* in playing, use last stop time as upper bound */
4545       if (start == -1 || segment->rate > 0.0)
4546         last = stop;
4547       else
4548         last = start;
4549     }
4550   } else {
4551     /* convert last stop to stream time */
4552     last = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4553   }
4554
4555   if (in_paused) {
4556     /* in paused, use start_time */
4557     base = GST_ELEMENT_START_TIME (basesink);
4558     GST_DEBUG_OBJECT (basesink, "in paused, using start time %" GST_TIME_FORMAT,
4559         GST_TIME_ARGS (base));
4560   } else if (with_clock) {
4561     /* else use clock when needed */
4562     base = GST_ELEMENT_CAST (basesink)->base_time;
4563     GST_DEBUG_OBJECT (basesink, "using clock and base time %" GST_TIME_FORMAT,
4564         GST_TIME_ARGS (base));
4565   } else {
4566     /* else, no sync or clock -> no base time */
4567     GST_DEBUG_OBJECT (basesink, "no sync or no clock");
4568     base = -1;
4569   }
4570
4571   /* no base, we can't calculate running_time, use last seem timestamp to report
4572    * time */
4573   if (base == -1)
4574     last_seen = TRUE;
4575
4576   /* need to release the object lock before we can get the time,
4577    * a clock might take the LOCK of the provider, which could be
4578    * a basesink subclass. */
4579   GST_OBJECT_UNLOCK (basesink);
4580
4581   if (last_seen) {
4582     /* in EOS or when no valid stream_time, report the value of last seen
4583      * timestamp */
4584     if (last == -1) {
4585       /* no timestamp, we need to ask upstream */
4586       GST_DEBUG_OBJECT (basesink, "no last seen timestamp, asking upstream");
4587       res = FALSE;
4588       *upstream = TRUE;
4589       goto done;
4590     }
4591     GST_DEBUG_OBJECT (basesink, "using last seen timestamp %" GST_TIME_FORMAT,
4592         GST_TIME_ARGS (last));
4593     *cur = last;
4594   } else {
4595     if (oformat != tformat) {
4596       /* convert accum, time and duration to time */
4597       if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4598               &accum))
4599         goto convert_failed;
4600       if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration,
4601               &tformat, &duration))
4602         goto convert_failed;
4603       if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4604               &time))
4605         goto convert_failed;
4606       if (!gst_pad_query_convert (basesink->sinkpad, oformat, last, &tformat,
4607               &last))
4608         goto convert_failed;
4609
4610       /* assume time format from now on */
4611       oformat = tformat;
4612     }
4613
4614     if (!in_paused && with_clock) {
4615       now = gst_clock_get_time (clock);
4616     } else {
4617       now = base;
4618       base = 0;
4619     }
4620
4621     /* subtract base time and accumulated time from the clock time.
4622      * Make sure we don't go negative. This is the current time in
4623      * the segment which we need to scale with the combined
4624      * rate and applied rate. */
4625     base += accum;
4626     base += latency;
4627     if (GST_CLOCK_DIFF (base, now) < 0)
4628       base = now;
4629
4630     /* for negative rates we need to count back from the segment
4631      * duration. */
4632     if (rate < 0.0)
4633       time += duration;
4634
4635     *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4636
4637     if (in_paused) {
4638       /* never report less than segment values in paused */
4639       if (last != -1)
4640         *cur = MAX (last, *cur);
4641     } else {
4642       /* never report more than last seen position in playing */
4643       if (last != -1)
4644         *cur = MIN (last, *cur);
4645     }
4646
4647     GST_DEBUG_OBJECT (basesink,
4648         "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4649         GST_TIME_FORMAT " + time %" GST_TIME_FORMAT "  last %" GST_TIME_FORMAT,
4650         GST_TIME_ARGS (now), GST_TIME_ARGS (base), GST_TIME_ARGS (accum),
4651         GST_TIME_ARGS (time), GST_TIME_ARGS (last));
4652   }
4653
4654   if (oformat != format) {
4655     /* convert to final format */
4656     if (!gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur))
4657       goto convert_failed;
4658   }
4659
4660   res = TRUE;
4661
4662 done:
4663   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4664       res, GST_TIME_ARGS (*cur));
4665
4666   if (clock)
4667     gst_object_unref (clock);
4668
4669   return res;
4670
4671   /* special cases */
4672 wrong_state:
4673   {
4674     /* in NULL or READY we always return FALSE and -1 */
4675     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4676     res = FALSE;
4677     *cur = -1;
4678     GST_OBJECT_UNLOCK (basesink);
4679     goto done;
4680   }
4681 convert_failed:
4682   {
4683     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4684     *upstream = TRUE;
4685     res = FALSE;
4686     goto done;
4687   }
4688 }
4689
4690 static gboolean
4691 gst_base_sink_get_duration (GstBaseSink * basesink, GstFormat format,
4692     gint64 * dur, gboolean * upstream)
4693 {
4694   gboolean res = FALSE;
4695
4696   if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4697     GstFormat uformat = GST_FORMAT_BYTES;
4698     gint64 uduration;
4699
4700     /* get the duration in bytes, in pull mode that's all we are sure to
4701      * know. We have to explicitly get this value from upstream instead of
4702      * using our cached value because it might change. Duration caching
4703      * should be done at a higher level. */
4704     res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat, &uduration);
4705     if (res) {
4706       gst_segment_set_duration (&basesink->segment, uformat, uduration);
4707       if (format != uformat) {
4708         /* convert to the requested format */
4709         res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4710             &format, dur);
4711       } else {
4712         *dur = uduration;
4713       }
4714     }
4715     *upstream = FALSE;
4716   } else {
4717     *upstream = TRUE;
4718   }
4719
4720   return res;
4721 }
4722
4723 static const GstQueryType *
4724 gst_base_sink_get_query_types (GstElement * element)
4725 {
4726   static const GstQueryType query_types[] = {
4727     GST_QUERY_DURATION,
4728     GST_QUERY_POSITION,
4729     GST_QUERY_SEGMENT,
4730     GST_QUERY_LATENCY,
4731     0
4732   };
4733
4734   return query_types;
4735 }
4736
4737 static gboolean
4738 gst_base_sink_query (GstElement * element, GstQuery * query)
4739 {
4740   gboolean res = FALSE;
4741
4742   GstBaseSink *basesink = GST_BASE_SINK (element);
4743
4744   switch (GST_QUERY_TYPE (query)) {
4745     case GST_QUERY_POSITION:
4746     {
4747       gint64 cur = 0;
4748       GstFormat format;
4749       gboolean upstream = FALSE;
4750
4751       gst_query_parse_position (query, &format, NULL);
4752
4753       GST_DEBUG_OBJECT (basesink, "position query in format %s",
4754           gst_format_get_name (format));
4755
4756       /* first try to get the position based on the clock */
4757       if ((res =
4758               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4759         gst_query_set_position (query, format, cur);
4760       } else if (upstream) {
4761         /* fallback to peer query */
4762         res = gst_pad_peer_query (basesink->sinkpad, query);
4763       }
4764       if (!res) {
4765         /* we can handle a few things if upstream failed */
4766         if (format == GST_FORMAT_PERCENT) {
4767           gint64 dur = 0;
4768           GstFormat uformat = GST_FORMAT_TIME;
4769
4770           res = gst_base_sink_get_position (basesink, GST_FORMAT_TIME, &cur,
4771               &upstream);
4772           if (!res && upstream) {
4773             res = gst_pad_query_peer_position (basesink->sinkpad, &uformat,
4774                 &cur);
4775           }
4776           if (res) {
4777             res = gst_base_sink_get_duration (basesink, GST_FORMAT_TIME, &dur,
4778                 &upstream);
4779             if (!res && upstream) {
4780               res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4781                   &dur);
4782             }
4783           }
4784           if (res) {
4785             gint64 pos;
4786
4787             pos = gst_util_uint64_scale (100 * GST_FORMAT_PERCENT_SCALE, cur,
4788                 dur);
4789             gst_query_set_position (query, GST_FORMAT_PERCENT, pos);
4790           }
4791         }
4792       }
4793       break;
4794     }
4795     case GST_QUERY_DURATION:
4796     {
4797       gint64 dur = 0;
4798       GstFormat format;
4799       gboolean upstream = FALSE;
4800
4801       gst_query_parse_duration (query, &format, NULL);
4802
4803       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4804           gst_format_get_name (format));
4805
4806       if ((res =
4807               gst_base_sink_get_duration (basesink, format, &dur, &upstream))) {
4808         gst_query_set_duration (query, format, dur);
4809       } else if (upstream) {
4810         /* fallback to peer query */
4811         res = gst_pad_peer_query (basesink->sinkpad, query);
4812       }
4813       if (!res) {
4814         /* we can handle a few things if upstream failed */
4815         if (format == GST_FORMAT_PERCENT) {
4816           gst_query_set_duration (query, GST_FORMAT_PERCENT,
4817               GST_FORMAT_PERCENT_MAX);
4818           res = TRUE;
4819         }
4820       }
4821       break;
4822     }
4823     case GST_QUERY_LATENCY:
4824     {
4825       gboolean live, us_live;
4826       GstClockTime min, max;
4827
4828       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4829                   &max))) {
4830         gst_query_set_latency (query, live, min, max);
4831       }
4832       break;
4833     }
4834     case GST_QUERY_JITTER:
4835       break;
4836     case GST_QUERY_RATE:
4837       /* gst_query_set_rate (query, basesink->segment_rate); */
4838       res = TRUE;
4839       break;
4840     case GST_QUERY_SEGMENT:
4841     {
4842       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4843         gst_query_set_segment (query, basesink->segment.rate,
4844             GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4845         res = TRUE;
4846       } else {
4847         res = gst_pad_peer_query (basesink->sinkpad, query);
4848       }
4849       break;
4850     }
4851     case GST_QUERY_SEEKING:
4852     case GST_QUERY_CONVERT:
4853     case GST_QUERY_FORMATS:
4854     default:
4855       res = gst_pad_peer_query (basesink->sinkpad, query);
4856       break;
4857   }
4858   GST_DEBUG_OBJECT (basesink, "query %s returns %d",
4859       GST_QUERY_TYPE_NAME (query), res);
4860   return res;
4861 }
4862
4863 static GstStateChangeReturn
4864 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4865 {
4866   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4867   GstBaseSink *basesink = GST_BASE_SINK (element);
4868   GstBaseSinkClass *bclass;
4869   GstBaseSinkPrivate *priv;
4870
4871   priv = basesink->priv;
4872
4873   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4874
4875   switch (transition) {
4876     case GST_STATE_CHANGE_NULL_TO_READY:
4877       if (bclass->start)
4878         if (!bclass->start (basesink))
4879           goto start_failed;
4880       break;
4881     case GST_STATE_CHANGE_READY_TO_PAUSED:
4882       /* need to complete preroll before this state change completes, there
4883        * is no data flow in READY so we can safely assume we need to preroll. */
4884       GST_BASE_SINK_PREROLL_LOCK (basesink);
4885       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4886       basesink->have_newsegment = FALSE;
4887       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4888       gst_segment_init (basesink->clip_segment, GST_FORMAT_UNDEFINED);
4889       basesink->offset = 0;
4890       basesink->have_preroll = FALSE;
4891       priv->step_unlock = FALSE;
4892       basesink->need_preroll = TRUE;
4893       basesink->playing_async = TRUE;
4894       priv->current_sstart = GST_CLOCK_TIME_NONE;
4895       priv->current_sstop = GST_CLOCK_TIME_NONE;
4896       priv->eos_rtime = GST_CLOCK_TIME_NONE;
4897       priv->latency = 0;
4898       basesink->eos = FALSE;
4899       priv->received_eos = FALSE;
4900       gst_base_sink_reset_qos (basesink);
4901       priv->commited = FALSE;
4902       priv->call_preroll = TRUE;
4903       priv->current_step.valid = FALSE;
4904       priv->pending_step.valid = FALSE;
4905       if (priv->async_enabled) {
4906         GST_DEBUG_OBJECT (basesink, "doing async state change");
4907         /* when async enabled, post async-start message and return ASYNC from
4908          * the state change function */
4909         ret = GST_STATE_CHANGE_ASYNC;
4910         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4911             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4912       } else {
4913         priv->have_latency = TRUE;
4914       }
4915       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4916       break;
4917     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4918       GST_BASE_SINK_PREROLL_LOCK (basesink);
4919       if (!gst_base_sink_needs_preroll (basesink)) {
4920         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4921         /* no preroll needed anymore now. */
4922         basesink->playing_async = FALSE;
4923         basesink->need_preroll = FALSE;
4924         if (basesink->eos) {
4925           GstMessage *message;
4926
4927           /* need to post EOS message here */
4928           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4929           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
4930           gst_message_set_seqnum (message, basesink->priv->seqnum);
4931           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
4932         } else {
4933           GST_DEBUG_OBJECT (basesink, "signal preroll");
4934           GST_BASE_SINK_PREROLL_SIGNAL (basesink);
4935         }
4936       } else {
4937         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
4938         basesink->need_preroll = TRUE;
4939         basesink->playing_async = TRUE;
4940         priv->call_preroll = TRUE;
4941         priv->commited = FALSE;
4942         if (priv->async_enabled) {
4943           GST_DEBUG_OBJECT (basesink, "doing async state change");
4944           ret = GST_STATE_CHANGE_ASYNC;
4945           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4946               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4947         }
4948       }
4949       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4950       break;
4951     default:
4952       break;
4953   }
4954
4955   {
4956     GstStateChangeReturn bret;
4957
4958     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4959     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
4960       goto activate_failed;
4961   }
4962
4963   switch (transition) {
4964     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4965       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
4966       /* FIXME, make sure we cannot enter _render first */
4967
4968       /* we need to call ::unlock before locking PREROLL_LOCK
4969        * since we lock it before going into ::render */
4970       if (bclass->unlock)
4971         bclass->unlock (basesink);
4972
4973       GST_BASE_SINK_PREROLL_LOCK (basesink);
4974       GST_DEBUG_OBJECT (basesink, "got preroll lock");
4975       /* now that we have the PREROLL lock, clear our unlock request */
4976       if (bclass->unlock_stop)
4977         bclass->unlock_stop (basesink);
4978
4979       /* we need preroll again and we set the flag before unlocking the clockid
4980        * because if the clockid is unlocked before a current buffer expired, we
4981        * can use that buffer to preroll with */
4982       basesink->need_preroll = TRUE;
4983
4984       if (basesink->clock_id) {
4985         GST_DEBUG_OBJECT (basesink, "unschedule clock");
4986         gst_clock_id_unschedule (basesink->clock_id);
4987       }
4988
4989       /* if we don't have a preroll buffer we need to wait for a preroll and
4990        * return ASYNC. */
4991       if (!gst_base_sink_needs_preroll (basesink)) {
4992         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
4993         basesink->playing_async = FALSE;
4994       } else {
4995         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
4996           GST_DEBUG_OBJECT (basesink, "element is <= READY");
4997           ret = GST_STATE_CHANGE_SUCCESS;
4998         } else {
4999           GST_DEBUG_OBJECT (basesink,
5000               "PLAYING to PAUSED, we are not prerolled");
5001           basesink->playing_async = TRUE;
5002           priv->commited = FALSE;
5003           priv->call_preroll = TRUE;
5004           if (priv->async_enabled) {
5005             GST_DEBUG_OBJECT (basesink, "doing async state change");
5006             ret = GST_STATE_CHANGE_ASYNC;
5007             gst_element_post_message (GST_ELEMENT_CAST (basesink),
5008                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
5009                     FALSE));
5010           }
5011         }
5012       }
5013       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
5014           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
5015
5016       gst_base_sink_reset_qos (basesink);
5017       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
5018       break;
5019     case GST_STATE_CHANGE_PAUSED_TO_READY:
5020       GST_BASE_SINK_PREROLL_LOCK (basesink);
5021       /* start by reseting our position state with the object lock so that the
5022        * position query gets the right idea. We do this before we post the
5023        * messages so that the message handlers pick this up. */
5024       GST_OBJECT_LOCK (basesink);
5025       basesink->have_newsegment = FALSE;
5026       priv->current_sstart = GST_CLOCK_TIME_NONE;
5027       priv->current_sstop = GST_CLOCK_TIME_NONE;
5028       priv->have_latency = FALSE;
5029       if (priv->cached_clock_id) {
5030         gst_clock_id_unref (priv->cached_clock_id);
5031         priv->cached_clock_id = NULL;
5032       }
5033       GST_OBJECT_UNLOCK (basesink);
5034
5035       gst_base_sink_set_last_buffer (basesink, NULL);
5036       priv->call_preroll = FALSE;
5037
5038       if (!priv->commited) {
5039         if (priv->async_enabled) {
5040           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
5041
5042           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5043               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
5044                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
5045
5046           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5047               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
5048         }
5049         priv->commited = TRUE;
5050       } else {
5051         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
5052       }
5053       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
5054       break;
5055     case GST_STATE_CHANGE_READY_TO_NULL:
5056       if (bclass->stop) {
5057         if (!bclass->stop (basesink)) {
5058           GST_WARNING_OBJECT (basesink, "failed to stop");
5059         }
5060       }
5061       gst_base_sink_set_last_buffer (basesink, NULL);
5062       priv->call_preroll = FALSE;
5063       break;
5064     default:
5065       break;
5066   }
5067
5068   return ret;
5069
5070   /* ERRORS */
5071 start_failed:
5072   {
5073     GST_DEBUG_OBJECT (basesink, "failed to start");
5074     return GST_STATE_CHANGE_FAILURE;
5075   }
5076 activate_failed:
5077   {
5078     GST_DEBUG_OBJECT (basesink,
5079         "element failed to change states -- activation problem?");
5080     return GST_STATE_CHANGE_FAILURE;
5081   }
5082 }