basesink: don't compensate for render-delay twice
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSrc
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * |[
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * ]|
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSinkClass.preroll() vmethod with this preroll buffer and will then
59  * commit the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from #GstBaseSinkClass.get_times(). If this
63  * function returns #GST_CLOCK_TIME_NONE for the start time, no synchronisation
64  * will be done. Synchronisation can be disabled entirely by setting the object
65  * #GstBaseSink:sync property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSinkClass.render() will be
68  * called. Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the
71  * #GstBaseSinkClass.render() method are supported as well. These classes
72  * typically receive a buffer in the render method and can then potentially
73  * block on the clock while rendering. A typical example is an audiosink.
74  * Since 0.10.11 these subclasses can use gst_base_sink_wait_preroll() to
75  * perform the blocking wait.
76  *
77  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
78  * for the clock to reach the time indicated by the stop time of the last
79  * #GstBaseSinkClass.get_times() call before posting an EOS message. When the
80  * element receives EOS in PAUSED, preroll completes, the event is queued and an
81  * EOS message is posted when going to PLAYING.
82  *
83  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
84  * synchronisation and clipping of buffers. Buffers that fall completely outside
85  * of the current segment are dropped. Buffers that fall partially in the
86  * segment are rendered (and prerolled). Subclasses should do any subbuffer
87  * clipping themselves when needed.
88  *
89  * #GstBaseSink will by default report the current playback position in
90  * #GST_FORMAT_TIME based on the current clock time and segment information.
91  * If no clock has been set on the element, the query will be forwarded
92  * upstream.
93  *
94  * The #GstBaseSinkClass.set_caps() function will be called when the subclass
95  * should configure itself to process a specific media type.
96  *
97  * The #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() virtual methods
98  * will be called when resources should be allocated. Any 
99  * #GstBaseSinkClass.preroll(), #GstBaseSinkClass.render() and
100  * #GstBaseSinkClass.set_caps() function will be called between the
101  * #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() calls.
102  *
103  * The #GstBaseSinkClass.event() virtual method will be called when an event is
104  * received by #GstBaseSink. Normally this method should only be overriden by
105  * very specific elements (such as file sinks) which need to handle the
106  * newsegment event specially.
107  *
108  * #GstBaseSink provides an overridable #GstBaseSinkClass.buffer_alloc()
109  * function that can be used by sinks that want to do reverse negotiation or to
110  * provide custom buffers (hardware buffers for example) to upstream elements.
111  *
112  * The #GstBaseSinkClass.unlock() method is called when the elements should
113  * unblock any blocking operations they perform in the
114  * #GstBaseSinkClass.render() method. This is mostly useful when the
115  * #GstBaseSinkClass.render() method performs a blocking write on a file
116  * descriptor, for example.
117  *
118  * The #GstBaseSink:max-lateness property affects how the sink deals with
119  * buffers that arrive too late in the sink. A buffer arrives too late in the
120  * sink when the presentation time (as a combination of the last segment, buffer
121  * timestamp and element base_time) plus the duration is before the current
122  * time of the clock.
123  * If the frame is later than max-lateness, the sink will drop the buffer
124  * without calling the render method.
125  * This feature is disabled if sync is disabled, the
126  * #GstBaseSinkClass.get_times() method does not return a valid start time or
127  * max-lateness is set to -1 (the default).
128  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
129  * max-lateness value.
130  *
131  * The #GstBaseSink:qos property will enable the quality-of-service features of
132  * the basesink which gather statistics about the real-time performance of the
133  * clock synchronisation. For each buffer received in the sink, statistics are
134  * gathered and a QOS event is sent upstream with these numbers. This
135  * information can then be used by upstream elements to reduce their processing
136  * rate, for example.
137  *
138  * Since 0.10.15 the #GstBaseSink:async property can be used to instruct the
139  * sink to never perform an ASYNC state change. This feature is mostly usable
140  * when dealing with non-synchronized streams or sparse streams.
141  *
142  * Last reviewed on 2007-08-29 (0.10.15)
143  */
144
145 #ifdef HAVE_CONFIG_H
146 #  include "config.h"
147 #endif
148
149 /* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
150  * with newer GLib versions (>= 2.31.0) */
151 #define GLIB_DISABLE_DEPRECATION_WARNINGS
152 #include <gst/gst_private.h>
153
154 #include "gstbasesink.h"
155 #include <gst/gstmarshal.h>
156 #include <gst/gst-i18n-lib.h>
157
158 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
159 #define GST_CAT_DEFAULT gst_base_sink_debug
160
161 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
162    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
163
164 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
165
166 typedef struct
167 {
168   gboolean valid;               /* if this info is valid */
169   guint32 seqnum;               /* the seqnum of the STEP event */
170   GstFormat format;             /* the format of the amount */
171   guint64 amount;               /* the total amount of data to skip */
172   guint64 position;             /* the position in the stepped data */
173   guint64 duration;             /* the duration in time of the skipped data */
174   guint64 start;                /* running_time of the start */
175   gdouble rate;                 /* rate of skipping */
176   gdouble start_rate;           /* rate before skipping */
177   guint64 start_start;          /* start position skipping */
178   guint64 start_stop;           /* stop position skipping */
179   gboolean flush;               /* if this was a flushing step */
180   gboolean intermediate;        /* if this is an intermediate step */
181   gboolean need_preroll;        /* if we need preroll after this step */
182 } GstStepInfo;
183
184 /* FIXME, some stuff in ABI.data and other in Private...
185  * Make up your mind please.
186  */
187 struct _GstBaseSinkPrivate
188 {
189   gint qos_enabled;             /* ATOMIC */
190   gboolean async_enabled;
191   GstClockTimeDiff ts_offset;
192   GstClockTime render_delay;
193
194   /* start, stop of current buffer, stream time, used to report position */
195   GstClockTime current_sstart;
196   GstClockTime current_sstop;
197
198   /* start, stop and jitter of current buffer, running time */
199   GstClockTime current_rstart;
200   GstClockTime current_rstop;
201   GstClockTimeDiff current_jitter;
202   /* the running time of the previous buffer */
203   GstClockTime prev_rstart;
204
205   /* EOS sync time in running time */
206   GstClockTime eos_rtime;
207
208   /* last buffer that arrived in time, running time */
209   GstClockTime last_render_time;
210   /* when the last buffer left the sink, running time */
211   GstClockTime last_left;
212
213   /* running averages go here these are done on running time */
214   GstClockTime avg_pt;
215   GstClockTime avg_duration;
216   gdouble avg_rate;
217   GstClockTime avg_in_diff;
218
219   /* these are done on system time. avg_jitter and avg_render are
220    * compared to eachother to see if the rendering time takes a
221    * huge amount of the processing, If so we are flooded with
222    * buffers. */
223   GstClockTime last_left_systime;
224   GstClockTime avg_jitter;
225   GstClockTime start, stop;
226   GstClockTime avg_render;
227
228   /* number of rendered and dropped frames */
229   guint64 rendered;
230   guint64 dropped;
231
232   /* latency stuff */
233   GstClockTime latency;
234
235   /* if we already commited the state */
236   gboolean commited;
237   /* state change to playing ongoing */
238   gboolean to_playing;
239
240   /* when we received EOS */
241   gboolean received_eos;
242
243   /* when we are prerolled and able to report latency */
244   gboolean have_latency;
245
246   /* the last buffer we prerolled or rendered. Useful for making snapshots */
247   gint enable_last_buffer;      /* atomic */
248   GstBuffer *last_buffer;
249
250   /* caps for pull based scheduling */
251   GstCaps *pull_caps;
252
253   /* blocksize for pulling */
254   guint blocksize;
255
256   gboolean discont;
257
258   /* seqnum of the stream */
259   guint32 seqnum;
260
261   gboolean call_preroll;
262   gboolean step_unlock;
263
264   /* we have a pending and a current step operation */
265   GstStepInfo current_step;
266   GstStepInfo pending_step;
267
268   /* Cached GstClockID */
269   GstClockID cached_clock_id;
270
271   /* for throttling and QoS */
272   GstClockTime earliest_in_time;
273   GstClockTime throttle_time;
274 };
275
276 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
277
278 /* generic running average, this has a neutral window size */
279 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
280
281 /* the windows for these running averages are experimentally obtained.
282  * possitive values get averaged more while negative values use a small
283  * window so we can react faster to badness. */
284 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
285 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
286
287 enum
288 {
289   _PR_IS_NOTHING = 1 << 0,
290   _PR_IS_BUFFER = 1 << 1,
291   _PR_IS_BUFFERLIST = 1 << 2,
292   _PR_IS_EVENT = 1 << 3
293 } PrivateObjectType;
294
295 #define OBJ_IS_BUFFER(a) ((a) & _PR_IS_BUFFER)
296 #define OBJ_IS_BUFFERLIST(a) ((a) & _PR_IS_BUFFERLIST)
297 #define OBJ_IS_EVENT(a) ((a) & _PR_IS_EVENT)
298 #define OBJ_IS_BUFFERFULL(a) ((a) & (_PR_IS_BUFFER | _PR_IS_BUFFERLIST))
299
300 /* BaseSink properties */
301
302 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
303 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
304
305 #define DEFAULT_PREROLL_QUEUE_LEN   0
306 #define DEFAULT_SYNC                TRUE
307 #define DEFAULT_MAX_LATENESS        -1
308 #define DEFAULT_QOS                 FALSE
309 #define DEFAULT_ASYNC               TRUE
310 #define DEFAULT_TS_OFFSET           0
311 #define DEFAULT_BLOCKSIZE           4096
312 #define DEFAULT_RENDER_DELAY        0
313 #define DEFAULT_ENABLE_LAST_BUFFER  TRUE
314 #define DEFAULT_THROTTLE_TIME       0
315
316 enum
317 {
318   PROP_0,
319   PROP_PREROLL_QUEUE_LEN,
320   PROP_SYNC,
321   PROP_MAX_LATENESS,
322   PROP_QOS,
323   PROP_ASYNC,
324   PROP_TS_OFFSET,
325   PROP_ENABLE_LAST_BUFFER,
326   PROP_LAST_BUFFER,
327   PROP_BLOCKSIZE,
328   PROP_RENDER_DELAY,
329   PROP_THROTTLE_TIME,
330   PROP_LAST
331 };
332
333 static GstElementClass *parent_class = NULL;
334
335 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
336 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
337 static void gst_base_sink_finalize (GObject * object);
338
339 GType
340 gst_base_sink_get_type (void)
341 {
342   static volatile gsize base_sink_type = 0;
343
344   if (g_once_init_enter (&base_sink_type)) {
345     GType _type;
346     static const GTypeInfo base_sink_info = {
347       sizeof (GstBaseSinkClass),
348       NULL,
349       NULL,
350       (GClassInitFunc) gst_base_sink_class_init,
351       NULL,
352       NULL,
353       sizeof (GstBaseSink),
354       0,
355       (GInstanceInitFunc) gst_base_sink_init,
356     };
357
358     _type = g_type_register_static (GST_TYPE_ELEMENT,
359         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
360     g_once_init_leave (&base_sink_type, _type);
361   }
362   return base_sink_type;
363 }
364
365 static void gst_base_sink_set_property (GObject * object, guint prop_id,
366     const GValue * value, GParamSpec * pspec);
367 static void gst_base_sink_get_property (GObject * object, guint prop_id,
368     GValue * value, GParamSpec * pspec);
369
370 static gboolean gst_base_sink_send_event (GstElement * element,
371     GstEvent * event);
372 static gboolean default_element_query (GstElement * element, GstQuery * query);
373 static const GstQueryType *gst_base_sink_get_query_types (GstElement * element);
374
375 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
376 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
377 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
378     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
379 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
380     GstClockTime * start, GstClockTime * end);
381 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
382     GstPad * pad, gboolean flushing);
383 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
384     gboolean active);
385 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
386     GstSegment * segment);
387 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
388     GstEvent * event, GstSegment * segment);
389
390 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
391     GstStateChange transition);
392
393 static gboolean gst_base_sink_sink_query (GstPad * pad, GstQuery * query);
394 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
395 static GstFlowReturn gst_base_sink_chain_list (GstPad * pad,
396     GstBufferList * list);
397
398 static void gst_base_sink_loop (GstPad * pad);
399 static gboolean gst_base_sink_pad_activate (GstPad * pad);
400 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
401 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
402 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
403
404 static gboolean default_sink_query (GstBaseSink * sink, GstQuery * query);
405
406 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
407 static GstCaps *gst_base_sink_pad_getcaps (GstPad * pad);
408 static gboolean gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps);
409 static void gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps);
410 static GstFlowReturn gst_base_sink_pad_buffer_alloc (GstPad * pad,
411     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
412
413
414 /* check if an object was too late */
415 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
416     GstMiniObject * obj, GstClockTime rstart, GstClockTime rstop,
417     GstClockReturn status, GstClockTimeDiff jitter);
418 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
419     guint8 obj_type, GstMiniObject * obj);
420
421 static void
422 gst_base_sink_class_init (GstBaseSinkClass * klass)
423 {
424   GObjectClass *gobject_class;
425   GstElementClass *gstelement_class;
426
427   gobject_class = G_OBJECT_CLASS (klass);
428   gstelement_class = GST_ELEMENT_CLASS (klass);
429
430   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
431       "basesink element");
432
433   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
434
435   parent_class = g_type_class_peek_parent (klass);
436
437   gobject_class->finalize = gst_base_sink_finalize;
438   gobject_class->set_property = gst_base_sink_set_property;
439   gobject_class->get_property = gst_base_sink_get_property;
440
441   /* FIXME, this next value should be configured using an event from the
442    * upstream element, ie, the BUFFER_SIZE event. */
443   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
444       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
445           "Number of buffers to queue during preroll", 0, G_MAXUINT,
446           DEFAULT_PREROLL_QUEUE_LEN,
447           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
448
449   g_object_class_install_property (gobject_class, PROP_SYNC,
450       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
451           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452
453   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
454       g_param_spec_int64 ("max-lateness", "Max Lateness",
455           "Maximum number of nanoseconds that a buffer can be late before it "
456           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
457           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
458
459   g_object_class_install_property (gobject_class, PROP_QOS,
460       g_param_spec_boolean ("qos", "Qos",
461           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
462           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
463   /**
464    * GstBaseSink:async
465    *
466    * If set to #TRUE, the basesink will perform asynchronous state changes.
467    * When set to #FALSE, the sink will not signal the parent when it prerolls.
468    * Use this option when dealing with sparse streams or when synchronisation is
469    * not required.
470    *
471    * Since: 0.10.15
472    */
473   g_object_class_install_property (gobject_class, PROP_ASYNC,
474       g_param_spec_boolean ("async", "Async",
475           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
476           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
477   /**
478    * GstBaseSink:ts-offset
479    *
480    * Controls the final synchronisation, a negative value will render the buffer
481    * earlier while a positive value delays playback. This property can be
482    * used to fix synchronisation in bad files.
483    *
484    * Since: 0.10.15
485    */
486   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
487       g_param_spec_int64 ("ts-offset", "TS Offset",
488           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
489           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
490
491   /**
492    * GstBaseSink:enable-last-buffer
493    *
494    * Enable the last-buffer property. If FALSE, basesink doesn't keep a
495    * reference to the last buffer arrived and the last-buffer property is always
496    * set to NULL. This can be useful if you need buffers to be released as soon
497    * as possible, eg. if you're using a buffer pool.
498    *
499    * Since: 0.10.30
500    */
501   g_object_class_install_property (gobject_class, PROP_ENABLE_LAST_BUFFER,
502       g_param_spec_boolean ("enable-last-buffer", "Enable Last Buffer",
503           "Enable the last-buffer property", DEFAULT_ENABLE_LAST_BUFFER,
504           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
505
506   /**
507    * GstBaseSink:last-buffer
508    *
509    * The last buffer that arrived in the sink and was used for preroll or for
510    * rendering. This property can be used to generate thumbnails. This property
511    * can be NULL when the sink has not yet received a bufer.
512    *
513    * Since: 0.10.15
514    */
515   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
516       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
517           "The last buffer received in the sink", GST_TYPE_BUFFER,
518           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
519   /**
520    * GstBaseSink:blocksize
521    *
522    * The amount of bytes to pull when operating in pull mode.
523    *
524    * Since: 0.10.22
525    */
526   /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
527   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
528       g_param_spec_uint ("blocksize", "Block size",
529           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
530           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
531   /**
532    * GstBaseSink:render-delay
533    *
534    * The additional delay between synchronisation and actual rendering of the
535    * media. This property will add additional latency to the device in order to
536    * make other sinks compensate for the delay.
537    *
538    * Since: 0.10.22
539    */
540   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
541       g_param_spec_uint64 ("render-delay", "Render Delay",
542           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
543           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
544   /**
545    * GstBaseSink:throttle-time
546    *
547    * The time to insert between buffers. This property can be used to control
548    * the maximum amount of buffers per second to render. Setting this property
549    * to a value bigger than 0 will make the sink create THROTTLE QoS events.
550    *
551    * Since: 0.10.33
552    */
553   g_object_class_install_property (gobject_class, PROP_THROTTLE_TIME,
554       g_param_spec_uint64 ("throttle-time", "Throttle time",
555           "The time to keep between rendered buffers (unused)", 0, G_MAXUINT64,
556           DEFAULT_THROTTLE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
557
558   gstelement_class->change_state =
559       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
560   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
561   gstelement_class->query = GST_DEBUG_FUNCPTR (default_element_query);
562   gstelement_class->get_query_types =
563       GST_DEBUG_FUNCPTR (gst_base_sink_get_query_types);
564
565   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
566   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
567   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
568   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
569   klass->activate_pull =
570       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
571   klass->query = GST_DEBUG_FUNCPTR (default_sink_query);
572
573   /* Registering debug symbols for function pointers */
574   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_getcaps);
575   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_setcaps);
576   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_fixate);
577   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_buffer_alloc);
578   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate);
579   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_push);
580   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_pull);
581   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_event);
582   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain);
583   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain_list);
584   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_sink_query);
585 }
586
587 static GstCaps *
588 gst_base_sink_pad_getcaps (GstPad * pad)
589 {
590   GstBaseSinkClass *bclass;
591   GstBaseSink *bsink;
592   GstCaps *caps = NULL;
593
594   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
595   bclass = GST_BASE_SINK_GET_CLASS (bsink);
596
597   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
598     /* if we are operating in pull mode we only accept the negotiated caps */
599     GST_OBJECT_LOCK (pad);
600     if ((caps = GST_PAD_CAPS (pad)))
601       gst_caps_ref (caps);
602     GST_OBJECT_UNLOCK (pad);
603   }
604   if (caps == NULL) {
605     if (bclass->get_caps)
606       caps = bclass->get_caps (bsink);
607
608     if (caps == NULL) {
609       GstPadTemplate *pad_template;
610
611       pad_template =
612           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
613           "sink");
614       if (pad_template != NULL) {
615         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
616       }
617     }
618   }
619   gst_object_unref (bsink);
620
621   return caps;
622 }
623
624 static gboolean
625 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
626 {
627   GstBaseSinkClass *bclass;
628   GstBaseSink *bsink;
629   gboolean res = TRUE;
630
631   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
632   bclass = GST_BASE_SINK_GET_CLASS (bsink);
633
634   if (res && bclass->set_caps)
635     res = bclass->set_caps (bsink, caps);
636
637   gst_object_unref (bsink);
638
639   return res;
640 }
641
642 static void
643 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
644 {
645   GstBaseSinkClass *bclass;
646   GstBaseSink *bsink;
647
648   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
649   bclass = GST_BASE_SINK_GET_CLASS (bsink);
650
651   if (bclass->fixate)
652     bclass->fixate (bsink, caps);
653
654   gst_object_unref (bsink);
655 }
656
657 static GstFlowReturn
658 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
659     GstCaps * caps, GstBuffer ** buf)
660 {
661   GstBaseSinkClass *bclass;
662   GstBaseSink *bsink;
663   GstFlowReturn result = GST_FLOW_OK;
664
665   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
666   if (G_UNLIKELY (bsink == NULL))
667     return GST_FLOW_WRONG_STATE;
668   bclass = GST_BASE_SINK_GET_CLASS (bsink);
669
670   if (bclass->buffer_alloc)
671     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
672   else
673     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
674
675   gst_object_unref (bsink);
676
677   return result;
678 }
679
680 static void
681 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
682 {
683   GstPadTemplate *pad_template;
684   GstBaseSinkPrivate *priv;
685
686   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
687
688   pad_template =
689       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
690   g_return_if_fail (pad_template != NULL);
691
692   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
693
694   gst_pad_set_getcaps_function (basesink->sinkpad, gst_base_sink_pad_getcaps);
695   gst_pad_set_setcaps_function (basesink->sinkpad, gst_base_sink_pad_setcaps);
696   gst_pad_set_fixatecaps_function (basesink->sinkpad, gst_base_sink_pad_fixate);
697   gst_pad_set_bufferalloc_function (basesink->sinkpad,
698       gst_base_sink_pad_buffer_alloc);
699   gst_pad_set_activate_function (basesink->sinkpad, gst_base_sink_pad_activate);
700   gst_pad_set_activatepush_function (basesink->sinkpad,
701       gst_base_sink_pad_activate_push);
702   gst_pad_set_activatepull_function (basesink->sinkpad,
703       gst_base_sink_pad_activate_pull);
704   gst_pad_set_query_function (basesink->sinkpad, gst_base_sink_sink_query);
705   gst_pad_set_event_function (basesink->sinkpad, gst_base_sink_event);
706   gst_pad_set_chain_function (basesink->sinkpad, gst_base_sink_chain);
707   gst_pad_set_chain_list_function (basesink->sinkpad, gst_base_sink_chain_list);
708   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
709
710   basesink->pad_mode = GST_ACTIVATE_NONE;
711   basesink->preroll_queue = g_queue_new ();
712   basesink->abidata.ABI.clip_segment = gst_segment_new ();
713   priv->have_latency = FALSE;
714
715   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
716   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
717
718   basesink->sync = DEFAULT_SYNC;
719   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
720   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
721   priv->async_enabled = DEFAULT_ASYNC;
722   priv->ts_offset = DEFAULT_TS_OFFSET;
723   priv->render_delay = DEFAULT_RENDER_DELAY;
724   priv->blocksize = DEFAULT_BLOCKSIZE;
725   priv->cached_clock_id = NULL;
726   g_atomic_int_set (&priv->enable_last_buffer, DEFAULT_ENABLE_LAST_BUFFER);
727   priv->throttle_time = DEFAULT_THROTTLE_TIME;
728
729   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
730 }
731
732 static void
733 gst_base_sink_finalize (GObject * object)
734 {
735   GstBaseSink *basesink;
736
737   basesink = GST_BASE_SINK (object);
738
739   g_queue_free (basesink->preroll_queue);
740   gst_segment_free (basesink->abidata.ABI.clip_segment);
741
742   G_OBJECT_CLASS (parent_class)->finalize (object);
743 }
744
745 /**
746  * gst_base_sink_set_sync:
747  * @sink: the sink
748  * @sync: the new sync value.
749  *
750  * Configures @sink to synchronize on the clock or not. When
751  * @sync is FALSE, incoming samples will be played as fast as
752  * possible. If @sync is TRUE, the timestamps of the incomming
753  * buffers will be used to schedule the exact render time of its
754  * contents.
755  *
756  * Since: 0.10.4
757  */
758 void
759 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
760 {
761   g_return_if_fail (GST_IS_BASE_SINK (sink));
762
763   GST_OBJECT_LOCK (sink);
764   sink->sync = sync;
765   GST_OBJECT_UNLOCK (sink);
766 }
767
768 /**
769  * gst_base_sink_get_sync:
770  * @sink: the sink
771  *
772  * Checks if @sink is currently configured to synchronize against the
773  * clock.
774  *
775  * Returns: TRUE if the sink is configured to synchronize against the clock.
776  *
777  * Since: 0.10.4
778  */
779 gboolean
780 gst_base_sink_get_sync (GstBaseSink * sink)
781 {
782   gboolean res;
783
784   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
785
786   GST_OBJECT_LOCK (sink);
787   res = sink->sync;
788   GST_OBJECT_UNLOCK (sink);
789
790   return res;
791 }
792
793 /**
794  * gst_base_sink_set_max_lateness:
795  * @sink: the sink
796  * @max_lateness: the new max lateness value.
797  *
798  * Sets the new max lateness value to @max_lateness. This value is
799  * used to decide if a buffer should be dropped or not based on the
800  * buffer timestamp and the current clock time. A value of -1 means
801  * an unlimited time.
802  *
803  * Since: 0.10.4
804  */
805 void
806 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
807 {
808   g_return_if_fail (GST_IS_BASE_SINK (sink));
809
810   GST_OBJECT_LOCK (sink);
811   sink->abidata.ABI.max_lateness = max_lateness;
812   GST_OBJECT_UNLOCK (sink);
813 }
814
815 /**
816  * gst_base_sink_get_max_lateness:
817  * @sink: the sink
818  *
819  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
820  * more details.
821  *
822  * Returns: The maximum time in nanoseconds that a buffer can be late
823  * before it is dropped and not rendered. A value of -1 means an
824  * unlimited time.
825  *
826  * Since: 0.10.4
827  */
828 gint64
829 gst_base_sink_get_max_lateness (GstBaseSink * sink)
830 {
831   gint64 res;
832
833   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
834
835   GST_OBJECT_LOCK (sink);
836   res = sink->abidata.ABI.max_lateness;
837   GST_OBJECT_UNLOCK (sink);
838
839   return res;
840 }
841
842 /**
843  * gst_base_sink_set_qos_enabled:
844  * @sink: the sink
845  * @enabled: the new qos value.
846  *
847  * Configures @sink to send Quality-of-Service events upstream.
848  *
849  * Since: 0.10.5
850  */
851 void
852 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
853 {
854   g_return_if_fail (GST_IS_BASE_SINK (sink));
855
856   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
857 }
858
859 /**
860  * gst_base_sink_is_qos_enabled:
861  * @sink: the sink
862  *
863  * Checks if @sink is currently configured to send Quality-of-Service events
864  * upstream.
865  *
866  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
867  *
868  * Since: 0.10.5
869  */
870 gboolean
871 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
872 {
873   gboolean res;
874
875   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
876
877   res = g_atomic_int_get (&sink->priv->qos_enabled);
878
879   return res;
880 }
881
882 /**
883  * gst_base_sink_set_async_enabled:
884  * @sink: the sink
885  * @enabled: the new async value.
886  *
887  * Configures @sink to perform all state changes asynchronusly. When async is
888  * disabled, the sink will immediately go to PAUSED instead of waiting for a
889  * preroll buffer. This feature is useful if the sink does not synchronize
890  * against the clock or when it is dealing with sparse streams.
891  *
892  * Since: 0.10.15
893  */
894 void
895 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
896 {
897   g_return_if_fail (GST_IS_BASE_SINK (sink));
898
899   GST_PAD_PREROLL_LOCK (sink->sinkpad);
900   g_atomic_int_set (&sink->priv->async_enabled, enabled);
901   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
902   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
903 }
904
905 /**
906  * gst_base_sink_is_async_enabled:
907  * @sink: the sink
908  *
909  * Checks if @sink is currently configured to perform asynchronous state
910  * changes to PAUSED.
911  *
912  * Returns: TRUE if the sink is configured to perform asynchronous state
913  * changes.
914  *
915  * Since: 0.10.15
916  */
917 gboolean
918 gst_base_sink_is_async_enabled (GstBaseSink * sink)
919 {
920   gboolean res;
921
922   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
923
924   res = g_atomic_int_get (&sink->priv->async_enabled);
925
926   return res;
927 }
928
929 /**
930  * gst_base_sink_set_ts_offset:
931  * @sink: the sink
932  * @offset: the new offset
933  *
934  * Adjust the synchronisation of @sink with @offset. A negative value will
935  * render buffers earlier than their timestamp. A positive value will delay
936  * rendering. This function can be used to fix playback of badly timestamped
937  * buffers.
938  *
939  * Since: 0.10.15
940  */
941 void
942 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
943 {
944   g_return_if_fail (GST_IS_BASE_SINK (sink));
945
946   GST_OBJECT_LOCK (sink);
947   sink->priv->ts_offset = offset;
948   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
949   GST_OBJECT_UNLOCK (sink);
950 }
951
952 /**
953  * gst_base_sink_get_ts_offset:
954  * @sink: the sink
955  *
956  * Get the synchronisation offset of @sink.
957  *
958  * Returns: The synchronisation offset.
959  *
960  * Since: 0.10.15
961  */
962 GstClockTimeDiff
963 gst_base_sink_get_ts_offset (GstBaseSink * sink)
964 {
965   GstClockTimeDiff res;
966
967   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
968
969   GST_OBJECT_LOCK (sink);
970   res = sink->priv->ts_offset;
971   GST_OBJECT_UNLOCK (sink);
972
973   return res;
974 }
975
976 /**
977  * gst_base_sink_get_last_buffer:
978  * @sink: the sink
979  *
980  * Get the last buffer that arrived in the sink and was used for preroll or for
981  * rendering. This property can be used to generate thumbnails.
982  *
983  * The #GstCaps on the buffer can be used to determine the type of the buffer.
984  *
985  * Free-function: gst_buffer_unref
986  *
987  * Returns: (transfer full): a #GstBuffer. gst_buffer_unref() after usage.
988  *     This function returns NULL when no buffer has arrived in the sink yet
989  *     or when the sink is not in PAUSED or PLAYING.
990  *
991  * Since: 0.10.15
992  */
993 GstBuffer *
994 gst_base_sink_get_last_buffer (GstBaseSink * sink)
995 {
996   GstBuffer *res;
997
998   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
999
1000   GST_OBJECT_LOCK (sink);
1001   if ((res = sink->priv->last_buffer))
1002     gst_buffer_ref (res);
1003   GST_OBJECT_UNLOCK (sink);
1004
1005   return res;
1006 }
1007
1008 /* with OBJECT_LOCK */
1009 static void
1010 gst_base_sink_set_last_buffer_unlocked (GstBaseSink * sink, GstBuffer * buffer)
1011 {
1012   GstBuffer *old;
1013
1014   old = sink->priv->last_buffer;
1015   if (G_LIKELY (old != buffer)) {
1016     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
1017     if (G_LIKELY (buffer))
1018       gst_buffer_ref (buffer);
1019     sink->priv->last_buffer = buffer;
1020   } else {
1021     old = NULL;
1022   }
1023   /* avoid unreffing with the lock because cleanup code might want to take the
1024    * lock too */
1025   if (G_LIKELY (old)) {
1026     GST_OBJECT_UNLOCK (sink);
1027     gst_buffer_unref (old);
1028     GST_OBJECT_LOCK (sink);
1029   }
1030 }
1031
1032 static void
1033 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
1034 {
1035   if (!g_atomic_int_get (&sink->priv->enable_last_buffer))
1036     return;
1037
1038   GST_OBJECT_LOCK (sink);
1039   gst_base_sink_set_last_buffer_unlocked (sink, buffer);
1040   GST_OBJECT_UNLOCK (sink);
1041 }
1042
1043 /**
1044  * gst_base_sink_set_last_buffer_enabled:
1045  * @sink: the sink
1046  * @enabled: the new enable-last-buffer value.
1047  *
1048  * Configures @sink to store the last received buffer in the last-buffer
1049  * property.
1050  *
1051  * Since: 0.10.30
1052  */
1053 void
1054 gst_base_sink_set_last_buffer_enabled (GstBaseSink * sink, gboolean enabled)
1055 {
1056   g_return_if_fail (GST_IS_BASE_SINK (sink));
1057
1058   /* Only take lock if we change the value */
1059   if (g_atomic_int_compare_and_exchange (&sink->priv->enable_last_buffer,
1060           !enabled, enabled) && !enabled) {
1061     GST_OBJECT_LOCK (sink);
1062     gst_base_sink_set_last_buffer_unlocked (sink, NULL);
1063     GST_OBJECT_UNLOCK (sink);
1064   }
1065 }
1066
1067 /**
1068  * gst_base_sink_is_last_buffer_enabled:
1069  * @sink: the sink
1070  *
1071  * Checks if @sink is currently configured to store the last received buffer in
1072  * the last-buffer property.
1073  *
1074  * Returns: TRUE if the sink is configured to store the last received buffer.
1075  *
1076  * Since: 0.10.30
1077  */
1078 gboolean
1079 gst_base_sink_is_last_buffer_enabled (GstBaseSink * sink)
1080 {
1081   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
1082
1083   return g_atomic_int_get (&sink->priv->enable_last_buffer);
1084 }
1085
1086 /**
1087  * gst_base_sink_get_latency:
1088  * @sink: the sink
1089  *
1090  * Get the currently configured latency.
1091  *
1092  * Returns: The configured latency.
1093  *
1094  * Since: 0.10.12
1095  */
1096 GstClockTime
1097 gst_base_sink_get_latency (GstBaseSink * sink)
1098 {
1099   GstClockTime res;
1100
1101   GST_OBJECT_LOCK (sink);
1102   res = sink->priv->latency;
1103   GST_OBJECT_UNLOCK (sink);
1104
1105   return res;
1106 }
1107
1108 /**
1109  * gst_base_sink_query_latency:
1110  * @sink: the sink
1111  * @live: (out) (allow-none): if the sink is live
1112  * @upstream_live: (out) (allow-none): if an upstream element is live
1113  * @min_latency: (out) (allow-none): the min latency of the upstream elements
1114  * @max_latency: (out) (allow-none): the max latency of the upstream elements
1115  *
1116  * Query the sink for the latency parameters. The latency will be queried from
1117  * the upstream elements. @live will be TRUE if @sink is configured to
1118  * synchronize against the clock. @upstream_live will be TRUE if an upstream
1119  * element is live.
1120  *
1121  * If both @live and @upstream_live are TRUE, the sink will want to compensate
1122  * for the latency introduced by the upstream elements by setting the
1123  * @min_latency to a strictly possitive value.
1124  *
1125  * This function is mostly used by subclasses.
1126  *
1127  * Returns: TRUE if the query succeeded.
1128  *
1129  * Since: 0.10.12
1130  */
1131 gboolean
1132 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
1133     gboolean * upstream_live, GstClockTime * min_latency,
1134     GstClockTime * max_latency)
1135 {
1136   gboolean l, us_live, res, have_latency;
1137   GstClockTime min, max, render_delay;
1138   GstQuery *query;
1139   GstClockTime us_min, us_max;
1140
1141   /* we are live when we sync to the clock */
1142   GST_OBJECT_LOCK (sink);
1143   l = sink->sync;
1144   have_latency = sink->priv->have_latency;
1145   render_delay = sink->priv->render_delay;
1146   GST_OBJECT_UNLOCK (sink);
1147
1148   /* assume no latency */
1149   min = 0;
1150   max = -1;
1151   us_live = FALSE;
1152
1153   if (have_latency) {
1154     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1155     /* we are ready for a latency query this is when we preroll or when we are
1156      * not async. */
1157     query = gst_query_new_latency ();
1158
1159     /* ask the peer for the latency */
1160     if ((res = gst_pad_peer_query (sink->sinkpad, query))) {
1161       /* get upstream min and max latency */
1162       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1163
1164       if (us_live) {
1165         /* upstream live, use its latency, subclasses should use these
1166          * values to create the complete latency. */
1167         min = us_min;
1168         max = us_max;
1169       }
1170       if (l) {
1171         /* we need to add the render delay if we are live */
1172         if (min != -1)
1173           min += render_delay;
1174         if (max != -1)
1175           max += render_delay;
1176       }
1177     }
1178     gst_query_unref (query);
1179   } else {
1180     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1181     res = FALSE;
1182   }
1183
1184   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1185   if (!res) {
1186     if (!l) {
1187       res = TRUE;
1188       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1189     } else {
1190       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1191     }
1192   }
1193
1194   if (res) {
1195     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1196         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1197         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1198
1199     if (live)
1200       *live = l;
1201     if (upstream_live)
1202       *upstream_live = us_live;
1203     if (min_latency)
1204       *min_latency = min;
1205     if (max_latency)
1206       *max_latency = max;
1207   }
1208   return res;
1209 }
1210
1211 /**
1212  * gst_base_sink_set_render_delay:
1213  * @sink: a #GstBaseSink
1214  * @delay: the new delay
1215  *
1216  * Set the render delay in @sink to @delay. The render delay is the time
1217  * between actual rendering of a buffer and its synchronisation time. Some
1218  * devices might delay media rendering which can be compensated for with this
1219  * function.
1220  *
1221  * After calling this function, this sink will report additional latency and
1222  * other sinks will adjust their latency to delay the rendering of their media.
1223  *
1224  * This function is usually called by subclasses.
1225  *
1226  * Since: 0.10.21
1227  */
1228 void
1229 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1230 {
1231   GstClockTime old_render_delay;
1232
1233   g_return_if_fail (GST_IS_BASE_SINK (sink));
1234
1235   GST_OBJECT_LOCK (sink);
1236   old_render_delay = sink->priv->render_delay;
1237   sink->priv->render_delay = delay;
1238   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1239       GST_TIME_ARGS (delay));
1240   GST_OBJECT_UNLOCK (sink);
1241
1242   if (delay != old_render_delay) {
1243     GST_DEBUG_OBJECT (sink, "posting latency changed");
1244     gst_element_post_message (GST_ELEMENT_CAST (sink),
1245         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1246   }
1247 }
1248
1249 /**
1250  * gst_base_sink_get_render_delay:
1251  * @sink: a #GstBaseSink
1252  *
1253  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1254  * information about the render delay.
1255  *
1256  * Returns: the render delay of @sink.
1257  *
1258  * Since: 0.10.21
1259  */
1260 GstClockTime
1261 gst_base_sink_get_render_delay (GstBaseSink * sink)
1262 {
1263   GstClockTimeDiff res;
1264
1265   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1266
1267   GST_OBJECT_LOCK (sink);
1268   res = sink->priv->render_delay;
1269   GST_OBJECT_UNLOCK (sink);
1270
1271   return res;
1272 }
1273
1274 /**
1275  * gst_base_sink_set_blocksize:
1276  * @sink: a #GstBaseSink
1277  * @blocksize: the blocksize in bytes
1278  *
1279  * Set the number of bytes that the sink will pull when it is operating in pull
1280  * mode.
1281  *
1282  * Since: 0.10.22
1283  */
1284 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1285 void
1286 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1287 {
1288   g_return_if_fail (GST_IS_BASE_SINK (sink));
1289
1290   GST_OBJECT_LOCK (sink);
1291   sink->priv->blocksize = blocksize;
1292   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1293   GST_OBJECT_UNLOCK (sink);
1294 }
1295
1296 /**
1297  * gst_base_sink_get_blocksize:
1298  * @sink: a #GstBaseSink
1299  *
1300  * Get the number of bytes that the sink will pull when it is operating in pull
1301  * mode.
1302  *
1303  * Returns: the number of bytes @sink will pull in pull mode.
1304  *
1305  * Since: 0.10.22
1306  */
1307 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1308 guint
1309 gst_base_sink_get_blocksize (GstBaseSink * sink)
1310 {
1311   guint res;
1312
1313   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1314
1315   GST_OBJECT_LOCK (sink);
1316   res = sink->priv->blocksize;
1317   GST_OBJECT_UNLOCK (sink);
1318
1319   return res;
1320 }
1321
1322 /**
1323  * gst_base_sink_set_throttle_time:
1324  * @sink: a #GstBaseSink
1325  * @throttle: the throttle time in nanoseconds
1326  *
1327  * Set the time that will be inserted between rendered buffers. This
1328  * can be used to control the maximum buffers per second that the sink
1329  * will render. 
1330  *
1331  * Since: 0.10.33
1332  */
1333 void
1334 gst_base_sink_set_throttle_time (GstBaseSink * sink, guint64 throttle)
1335 {
1336   g_return_if_fail (GST_IS_BASE_SINK (sink));
1337
1338   GST_OBJECT_LOCK (sink);
1339   sink->priv->throttle_time = throttle;
1340   GST_LOG_OBJECT (sink, "set throttle_time to %" G_GUINT64_FORMAT, throttle);
1341   GST_OBJECT_UNLOCK (sink);
1342 }
1343
1344 /**
1345  * gst_base_sink_get_throttle_time:
1346  * @sink: a #GstBaseSink
1347  *
1348  * Get the time that will be inserted between frames to control the 
1349  * maximum buffers per second.
1350  *
1351  * Returns: the number of nanoseconds @sink will put between frames.
1352  *
1353  * Since: 0.10.33
1354  */
1355 guint64
1356 gst_base_sink_get_throttle_time (GstBaseSink * sink)
1357 {
1358   guint64 res;
1359
1360   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1361
1362   GST_OBJECT_LOCK (sink);
1363   res = sink->priv->throttle_time;
1364   GST_OBJECT_UNLOCK (sink);
1365
1366   return res;
1367 }
1368
1369 static void
1370 gst_base_sink_set_property (GObject * object, guint prop_id,
1371     const GValue * value, GParamSpec * pspec)
1372 {
1373   GstBaseSink *sink = GST_BASE_SINK (object);
1374
1375   switch (prop_id) {
1376     case PROP_PREROLL_QUEUE_LEN:
1377       /* preroll lock necessary to serialize with finish_preroll */
1378       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1379       g_atomic_int_set (&sink->preroll_queue_max_len, g_value_get_uint (value));
1380       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1381       break;
1382     case PROP_SYNC:
1383       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1384       break;
1385     case PROP_MAX_LATENESS:
1386       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1387       break;
1388     case PROP_QOS:
1389       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1390       break;
1391     case PROP_ASYNC:
1392       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1393       break;
1394     case PROP_TS_OFFSET:
1395       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1396       break;
1397     case PROP_BLOCKSIZE:
1398       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1399       break;
1400     case PROP_RENDER_DELAY:
1401       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1402       break;
1403     case PROP_ENABLE_LAST_BUFFER:
1404       gst_base_sink_set_last_buffer_enabled (sink, g_value_get_boolean (value));
1405       break;
1406     case PROP_THROTTLE_TIME:
1407       gst_base_sink_set_throttle_time (sink, g_value_get_uint64 (value));
1408       break;
1409     default:
1410       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1411       break;
1412   }
1413 }
1414
1415 static void
1416 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1417     GParamSpec * pspec)
1418 {
1419   GstBaseSink *sink = GST_BASE_SINK (object);
1420
1421   switch (prop_id) {
1422     case PROP_PREROLL_QUEUE_LEN:
1423       g_value_set_uint (value, g_atomic_int_get (&sink->preroll_queue_max_len));
1424       break;
1425     case PROP_SYNC:
1426       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1427       break;
1428     case PROP_MAX_LATENESS:
1429       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1430       break;
1431     case PROP_QOS:
1432       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1433       break;
1434     case PROP_ASYNC:
1435       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1436       break;
1437     case PROP_TS_OFFSET:
1438       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1439       break;
1440     case PROP_LAST_BUFFER:
1441       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1442       break;
1443     case PROP_ENABLE_LAST_BUFFER:
1444       g_value_set_boolean (value, gst_base_sink_is_last_buffer_enabled (sink));
1445       break;
1446     case PROP_BLOCKSIZE:
1447       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1448       break;
1449     case PROP_RENDER_DELAY:
1450       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1451       break;
1452     case PROP_THROTTLE_TIME:
1453       g_value_set_uint64 (value, gst_base_sink_get_throttle_time (sink));
1454       break;
1455     default:
1456       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1457       break;
1458   }
1459 }
1460
1461
1462 static GstCaps *
1463 gst_base_sink_get_caps (GstBaseSink * sink)
1464 {
1465   return NULL;
1466 }
1467
1468 static gboolean
1469 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1470 {
1471   return TRUE;
1472 }
1473
1474 static GstFlowReturn
1475 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1476     GstCaps * caps, GstBuffer ** buf)
1477 {
1478   *buf = NULL;
1479   return GST_FLOW_OK;
1480 }
1481
1482 /* with PREROLL_LOCK, STREAM_LOCK */
1483 static void
1484 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1485 {
1486   GstMiniObject *obj;
1487
1488   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1489   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1490     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1491     gst_mini_object_unref (obj);
1492   }
1493   /* we can't have EOS anymore now */
1494   basesink->eos = FALSE;
1495   basesink->priv->received_eos = FALSE;
1496   basesink->have_preroll = FALSE;
1497   basesink->priv->step_unlock = FALSE;
1498   basesink->eos_queued = FALSE;
1499   basesink->preroll_queued = 0;
1500   basesink->buffers_queued = 0;
1501   basesink->events_queued = 0;
1502   /* can't report latency anymore until we preroll again */
1503   if (basesink->priv->async_enabled) {
1504     GST_OBJECT_LOCK (basesink);
1505     basesink->priv->have_latency = FALSE;
1506     GST_OBJECT_UNLOCK (basesink);
1507   }
1508   /* and signal any waiters now */
1509   GST_PAD_PREROLL_SIGNAL (pad);
1510 }
1511
1512 /* with STREAM_LOCK, configures given segment with the event information. */
1513 static void
1514 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1515     GstEvent * event, GstSegment * segment)
1516 {
1517   gboolean update;
1518   gdouble rate, arate;
1519   GstFormat format;
1520   gint64 start;
1521   gint64 stop;
1522   gint64 time;
1523
1524   /* the newsegment event is needed to bring the buffer timestamps to the
1525    * stream time and to drop samples outside of the playback segment. */
1526   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1527       &start, &stop, &time);
1528
1529   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1530    * We protect with the OBJECT_LOCK so that we can use the values to
1531    * safely answer a POSITION query. */
1532   GST_OBJECT_LOCK (basesink);
1533   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1534       stop, time);
1535
1536   if (format == GST_FORMAT_TIME) {
1537     GST_DEBUG_OBJECT (basesink,
1538         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1539         "format GST_FORMAT_TIME, "
1540         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1541         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1542         update, rate, arate, GST_TIME_ARGS (segment->start),
1543         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1544         GST_TIME_ARGS (segment->accum));
1545   } else {
1546     GST_DEBUG_OBJECT (basesink,
1547         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1548         "format %d, "
1549         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1550         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1551         segment->format, segment->start, segment->stop, segment->time,
1552         segment->accum);
1553   }
1554   GST_OBJECT_UNLOCK (basesink);
1555 }
1556
1557 /* with PREROLL_LOCK, STREAM_LOCK */
1558 static gboolean
1559 gst_base_sink_commit_state (GstBaseSink * basesink)
1560 {
1561   /* commit state and proceed to next pending state */
1562   GstState current, next, pending, post_pending;
1563   gboolean post_paused = FALSE;
1564   gboolean post_async_done = FALSE;
1565   gboolean post_playing = FALSE;
1566
1567   /* we are certainly not playing async anymore now */
1568   basesink->playing_async = FALSE;
1569
1570   GST_OBJECT_LOCK (basesink);
1571   current = GST_STATE (basesink);
1572   next = GST_STATE_NEXT (basesink);
1573   pending = GST_STATE_PENDING (basesink);
1574   post_pending = pending;
1575
1576   switch (pending) {
1577     case GST_STATE_PLAYING:
1578     {
1579       GstBaseSinkClass *bclass;
1580       GstStateChangeReturn ret;
1581
1582       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1583
1584       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1585
1586       basesink->need_preroll = FALSE;
1587       post_async_done = TRUE;
1588       basesink->priv->commited = TRUE;
1589       post_playing = TRUE;
1590       /* post PAUSED too when we were READY */
1591       if (current == GST_STATE_READY) {
1592         post_paused = TRUE;
1593       }
1594
1595       /* make sure we notify the subclass of async playing */
1596       if (bclass->async_play) {
1597         GST_WARNING_OBJECT (basesink, "deprecated async_play");
1598         ret = bclass->async_play (basesink);
1599         if (ret == GST_STATE_CHANGE_FAILURE)
1600           goto async_failed;
1601       }
1602       break;
1603     }
1604     case GST_STATE_PAUSED:
1605       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1606       post_paused = TRUE;
1607       post_async_done = TRUE;
1608       basesink->priv->commited = TRUE;
1609       post_pending = GST_STATE_VOID_PENDING;
1610       break;
1611     case GST_STATE_READY:
1612     case GST_STATE_NULL:
1613       goto stopping;
1614     case GST_STATE_VOID_PENDING:
1615       goto nothing_pending;
1616     default:
1617       break;
1618   }
1619
1620   /* we can report latency queries now */
1621   basesink->priv->have_latency = TRUE;
1622
1623   GST_STATE (basesink) = pending;
1624   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1625   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1626   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1627   GST_OBJECT_UNLOCK (basesink);
1628
1629   if (post_paused) {
1630     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1631     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1632         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1633             current, next, post_pending));
1634   }
1635   if (post_async_done) {
1636     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1637     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1638         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1639   }
1640   if (post_playing) {
1641     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1642     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1643         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1644             next, pending, GST_STATE_VOID_PENDING));
1645   }
1646
1647   GST_STATE_BROADCAST (basesink);
1648
1649   return TRUE;
1650
1651 nothing_pending:
1652   {
1653     /* Depending on the state, set our vars. We get in this situation when the
1654      * state change function got a change to update the state vars before the
1655      * streaming thread did. This is fine but we need to make sure that we
1656      * update the need_preroll var since it was TRUE when we got here and might
1657      * become FALSE if we got to PLAYING. */
1658     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1659         gst_element_state_get_name (current));
1660     switch (current) {
1661       case GST_STATE_PLAYING:
1662         basesink->need_preroll = FALSE;
1663         break;
1664       case GST_STATE_PAUSED:
1665         basesink->need_preroll = TRUE;
1666         break;
1667       default:
1668         basesink->need_preroll = FALSE;
1669         basesink->flushing = TRUE;
1670         break;
1671     }
1672     /* we can report latency queries now */
1673     basesink->priv->have_latency = TRUE;
1674     GST_OBJECT_UNLOCK (basesink);
1675     return TRUE;
1676   }
1677 stopping:
1678   {
1679     /* app is going to READY */
1680     GST_DEBUG_OBJECT (basesink, "stopping");
1681     basesink->need_preroll = FALSE;
1682     basesink->flushing = TRUE;
1683     GST_OBJECT_UNLOCK (basesink);
1684     return FALSE;
1685   }
1686 async_failed:
1687   {
1688     GST_DEBUG_OBJECT (basesink, "async commit failed");
1689     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1690     GST_OBJECT_UNLOCK (basesink);
1691     return FALSE;
1692   }
1693 }
1694
1695 static void
1696 start_stepping (GstBaseSink * sink, GstSegment * segment,
1697     GstStepInfo * pending, GstStepInfo * current)
1698 {
1699   gint64 end;
1700   GstMessage *message;
1701
1702   GST_DEBUG_OBJECT (sink, "update pending step");
1703
1704   GST_OBJECT_LOCK (sink);
1705   memcpy (current, pending, sizeof (GstStepInfo));
1706   pending->valid = FALSE;
1707   GST_OBJECT_UNLOCK (sink);
1708
1709   /* post message first */
1710   message =
1711       gst_message_new_step_start (GST_OBJECT (sink), TRUE, current->format,
1712       current->amount, current->rate, current->flush, current->intermediate);
1713   gst_message_set_seqnum (message, current->seqnum);
1714   gst_element_post_message (GST_ELEMENT (sink), message);
1715
1716   /* get the running time of where we paused and remember it */
1717   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1718   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1719
1720   /* set the new rate for the remainder of the segment */
1721   current->start_rate = segment->rate;
1722   segment->rate *= current->rate;
1723   segment->abs_rate = ABS (segment->rate);
1724
1725   /* save values */
1726   if (segment->rate > 0.0)
1727     current->start_stop = segment->stop;
1728   else
1729     current->start_start = segment->start;
1730
1731   if (current->format == GST_FORMAT_TIME) {
1732     end = current->start + current->amount;
1733     if (!current->flush) {
1734       /* update the segment clipping regions for non-flushing seeks */
1735       if (segment->rate > 0.0) {
1736         segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1737         segment->last_stop = segment->stop;
1738       } else {
1739         gint64 position;
1740
1741         position = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1742         segment->time = position;
1743         segment->start = position;
1744         segment->last_stop = position;
1745       }
1746     }
1747   }
1748
1749   GST_DEBUG_OBJECT (sink,
1750       "segment now rate %lf, applied rate %lf, "
1751       "format GST_FORMAT_TIME, "
1752       "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1753       ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1754       segment->rate, segment->applied_rate, GST_TIME_ARGS (segment->start),
1755       GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1756       GST_TIME_ARGS (segment->accum));
1757
1758   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1759       GST_TIME_ARGS (current->start));
1760
1761   if (current->amount == -1) {
1762     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1763     current->valid = FALSE;
1764   } else {
1765     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1766         "rate: %f", current->amount, gst_format_get_name (current->format),
1767         current->rate);
1768   }
1769 }
1770
1771 static void
1772 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1773     GstStepInfo * current, gint64 rstart, gint64 rstop, gboolean eos)
1774 {
1775   gint64 stop, position;
1776   GstMessage *message;
1777
1778   GST_DEBUG_OBJECT (sink, "step complete");
1779
1780   if (segment->rate > 0.0)
1781     stop = rstart;
1782   else
1783     stop = rstop;
1784
1785   GST_DEBUG_OBJECT (sink,
1786       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1787
1788   if (stop == -1)
1789     current->duration = current->position;
1790   else
1791     current->duration = stop - current->start;
1792
1793   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1794       GST_TIME_ARGS (current->duration));
1795
1796   position = current->start + current->duration;
1797
1798   /* now move the segment to the new running time */
1799   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1800
1801   if (current->flush) {
1802     /* and remove the accumulated time we flushed, start time did not change */
1803     segment->accum = current->start;
1804   } else {
1805     /* start time is now the stepped position */
1806     gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1807   }
1808
1809   /* restore the previous rate */
1810   segment->rate = current->start_rate;
1811   segment->abs_rate = ABS (segment->rate);
1812
1813   if (segment->rate > 0.0)
1814     segment->stop = current->start_stop;
1815   else
1816     segment->start = current->start_start;
1817
1818   /* the clip segment is used for position report in paused... */
1819   memcpy (sink->abidata.ABI.clip_segment, segment, sizeof (GstSegment));
1820
1821   /* post the step done when we know the stepped duration in TIME */
1822   message =
1823       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1824       current->amount, current->rate, current->flush, current->intermediate,
1825       current->duration, eos);
1826   gst_message_set_seqnum (message, current->seqnum);
1827   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1828
1829   if (!current->intermediate)
1830     sink->need_preroll = current->need_preroll;
1831
1832   /* and the current step info finished and becomes invalid */
1833   current->valid = FALSE;
1834 }
1835
1836 static gboolean
1837 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1838     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1839     gint64 * rstop)
1840 {
1841   gboolean step_end = FALSE;
1842
1843   /* see if we need to skip this buffer because of stepping */
1844   switch (current->format) {
1845     case GST_FORMAT_TIME:
1846     {
1847       guint64 end;
1848       gint64 first, last;
1849
1850       if (segment->rate > 0.0) {
1851         if (segment->stop == *cstop)
1852           *rstop = *rstart + current->amount;
1853
1854         first = *rstart;
1855         last = *rstop;
1856       } else {
1857         if (segment->start == *cstart)
1858           *rstart = *rstop + current->amount;
1859
1860         first = *rstop;
1861         last = *rstart;
1862       }
1863
1864       end = current->start + current->amount;
1865       current->position = first - current->start;
1866
1867       if (G_UNLIKELY (segment->abs_rate != 1.0))
1868         current->position /= segment->abs_rate;
1869
1870       GST_DEBUG_OBJECT (sink,
1871           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1872           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1873       GST_DEBUG_OBJECT (sink,
1874           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1875           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1876           GST_TIME_ARGS (last - current->start),
1877           GST_TIME_ARGS (current->amount));
1878
1879       if ((current->flush && current->position >= current->amount)
1880           || last >= end) {
1881         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1882         step_end = TRUE;
1883         if (segment->rate > 0.0) {
1884           *rstart = end;
1885           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1886         } else {
1887           *rstop = end;
1888           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1889         }
1890       }
1891       GST_DEBUG_OBJECT (sink,
1892           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1893           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1894       GST_DEBUG_OBJECT (sink,
1895           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1896           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1897       break;
1898     }
1899     case GST_FORMAT_BUFFERS:
1900       GST_DEBUG_OBJECT (sink,
1901           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1902           current->position, current->amount);
1903
1904       if (current->position < current->amount) {
1905         current->position++;
1906       } else {
1907         step_end = TRUE;
1908       }
1909       break;
1910     case GST_FORMAT_DEFAULT:
1911     default:
1912       GST_DEBUG_OBJECT (sink,
1913           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1914           current->position, current->amount);
1915       break;
1916   }
1917   return step_end;
1918 }
1919
1920 /* with STREAM_LOCK, PREROLL_LOCK
1921  *
1922  * Returns TRUE if the object needs synchronisation and takes therefore
1923  * part in prerolling.
1924  *
1925  * rsstart/rsstop contain the start/stop in stream time.
1926  * rrstart/rrstop contain the start/stop in running time.
1927  */
1928 static gboolean
1929 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1930     GstClockTime * rsstart, GstClockTime * rsstop,
1931     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1932     gboolean * stepped, GstSegment * segment, GstStepInfo * step,
1933     gboolean * step_end, guint8 obj_type)
1934 {
1935   GstBaseSinkClass *bclass;
1936   GstBuffer *buffer;
1937   GstClockTime start, stop;     /* raw start/stop timestamps */
1938   gint64 cstart, cstop;         /* clipped raw timestamps */
1939   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1940   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1941   GstFormat format;
1942   GstBaseSinkPrivate *priv;
1943   gboolean eos;
1944
1945   priv = basesink->priv;
1946
1947   /* start with nothing */
1948   start = stop = GST_CLOCK_TIME_NONE;
1949
1950   if (G_UNLIKELY (OBJ_IS_EVENT (obj_type))) {
1951     GstEvent *event = GST_EVENT_CAST (obj);
1952
1953     switch (GST_EVENT_TYPE (event)) {
1954         /* EOS event needs syncing */
1955       case GST_EVENT_EOS:
1956       {
1957         if (basesink->segment.rate >= 0.0) {
1958           sstart = sstop = priv->current_sstop;
1959           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1960             /* we have not seen a buffer yet, use the segment values */
1961             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1962                 basesink->segment.format, basesink->segment.stop);
1963           }
1964         } else {
1965           sstart = sstop = priv->current_sstart;
1966           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1967             /* we have not seen a buffer yet, use the segment values */
1968             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1969                 basesink->segment.format, basesink->segment.start);
1970           }
1971         }
1972
1973         rstart = rstop = priv->eos_rtime;
1974         *do_sync = rstart != -1;
1975         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1976             GST_TIME_ARGS (rstart));
1977         /* if we are stepping, we end now */
1978         *step_end = step->valid;
1979         eos = TRUE;
1980         goto eos_done;
1981       }
1982       default:
1983         /* other events do not need syncing */
1984         /* FIXME, maybe NEWSEGMENT might need synchronisation
1985          * since the POSITION query depends on accumulated times and
1986          * we cannot accumulate the current segment before the previous
1987          * one completed.
1988          */
1989         return FALSE;
1990     }
1991   }
1992
1993   eos = FALSE;
1994
1995 again:
1996   /* else do buffer sync code */
1997   buffer = GST_BUFFER_CAST (obj);
1998
1999   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2000
2001   /* just get the times to see if we need syncing, if the start returns -1 we
2002    * don't sync. */
2003   if (bclass->get_times)
2004     bclass->get_times (basesink, buffer, &start, &stop);
2005
2006   if (!GST_CLOCK_TIME_IS_VALID (start)) {
2007     /* we don't need to sync but we still want to get the timestamps for
2008      * tracking the position */
2009     gst_base_sink_get_times (basesink, buffer, &start, &stop);
2010     *do_sync = FALSE;
2011   } else {
2012     *do_sync = TRUE;
2013   }
2014
2015   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
2016       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
2017       GST_TIME_ARGS (stop), *do_sync);
2018
2019   /* collect segment and format for code clarity */
2020   format = segment->format;
2021
2022   /* no timestamp clipping if we did not get a TIME segment format */
2023   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
2024     cstart = start;
2025     cstop = stop;
2026     /* do running and stream time in TIME format */
2027     format = GST_FORMAT_TIME;
2028     GST_LOG_OBJECT (basesink, "not time format, don't clip");
2029     goto do_times;
2030   }
2031
2032   /* clip, only when we know about time */
2033   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
2034               (gint64) start, (gint64) stop, &cstart, &cstop))) {
2035     if (step->valid) {
2036       GST_DEBUG_OBJECT (basesink, "step out of segment");
2037       /* when we are stepping, pretend we're at the end of the segment */
2038       if (segment->rate > 0.0) {
2039         cstart = segment->stop;
2040         cstop = segment->stop;
2041       } else {
2042         cstart = segment->start;
2043         cstop = segment->start;
2044       }
2045       goto do_times;
2046     }
2047     goto out_of_segment;
2048   }
2049
2050   if (G_UNLIKELY (start != cstart || stop != cstop)) {
2051     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
2052         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
2053         GST_TIME_ARGS (cstop));
2054   }
2055
2056   /* set last stop position */
2057   if (G_LIKELY (stop != GST_CLOCK_TIME_NONE && cstop != GST_CLOCK_TIME_NONE))
2058     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
2059   else
2060     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
2061
2062 do_times:
2063   rstart = gst_segment_to_running_time (segment, format, cstart);
2064   rstop = gst_segment_to_running_time (segment, format, cstop);
2065
2066   if (G_UNLIKELY (step->valid)) {
2067     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
2068                 &rstart, &rstop))) {
2069       /* step is still busy, we discard data when we are flushing */
2070       *stepped = step->flush;
2071       GST_DEBUG_OBJECT (basesink, "stepping busy");
2072     }
2073   }
2074   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
2075    * upstream is behaving very badly */
2076   sstart = gst_segment_to_stream_time (segment, format, cstart);
2077   sstop = gst_segment_to_stream_time (segment, format, cstop);
2078
2079 eos_done:
2080   /* eos_done label only called when doing EOS, we also stop stepping then */
2081   if (*step_end && step->flush) {
2082     GST_DEBUG_OBJECT (basesink, "flushing step ended");
2083     stop_stepping (basesink, segment, step, rstart, rstop, eos);
2084     *step_end = FALSE;
2085     /* re-determine running start times for adjusted segment
2086      * (which has a flushed amount of running/accumulated time removed) */
2087     if (!GST_IS_EVENT (obj)) {
2088       GST_DEBUG_OBJECT (basesink, "refresh sync times");
2089       goto again;
2090     }
2091   }
2092
2093   /* save times */
2094   *rsstart = sstart;
2095   *rsstop = sstop;
2096   *rrstart = rstart;
2097   *rrstop = rstop;
2098
2099   /* buffers and EOS always need syncing and preroll */
2100   return TRUE;
2101
2102   /* special cases */
2103 out_of_segment:
2104   {
2105     /* we usually clip in the chain function already but stepping could cause
2106      * the segment to be updated later. we return FALSE so that we don't try
2107      * to sync on it. */
2108     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
2109     return FALSE;
2110   }
2111 }
2112
2113 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
2114  * adjust a timestamp with the latency and timestamp offset. This function does
2115  * not adjust for the render delay. */
2116 static GstClockTime
2117 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
2118 {
2119   GstClockTimeDiff ts_offset;
2120
2121   /* don't do anything funny with invalid timestamps */
2122   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2123     return time;
2124
2125   time += basesink->priv->latency;
2126
2127   /* apply offset, be carefull for underflows */
2128   ts_offset = basesink->priv->ts_offset;
2129   if (ts_offset < 0) {
2130     ts_offset = -ts_offset;
2131     if (ts_offset < time)
2132       time -= ts_offset;
2133     else
2134       time = 0;
2135   } else
2136     time += ts_offset;
2137
2138   /* subtract the render delay again, which was included in the latency */
2139   if (time > basesink->priv->render_delay)
2140     time -= basesink->priv->render_delay;
2141   else
2142     time = 0;
2143
2144   return time;
2145 }
2146
2147 /**
2148  * gst_base_sink_wait_clock:
2149  * @sink: the sink
2150  * @time: the running_time to be reached
2151  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2152  *
2153  * This function will block until @time is reached. It is usually called by
2154  * subclasses that use their own internal synchronisation.
2155  *
2156  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
2157  * returned. Likewise, if synchronisation is disabled in the element or there
2158  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
2159  *
2160  * This function should only be called with the PREROLL_LOCK held, like when
2161  * receiving an EOS event in the #GstBaseSinkClass.event() vmethod or when
2162  * receiving a buffer in
2163  * the #GstBaseSinkClass.render() vmethod.
2164  *
2165  * The @time argument should be the running_time of when this method should
2166  * return and is not adjusted with any latency or offset configured in the
2167  * sink.
2168  *
2169  * Since: 0.10.20
2170  *
2171  * Returns: #GstClockReturn
2172  */
2173 GstClockReturn
2174 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
2175     GstClockTimeDiff * jitter)
2176 {
2177   GstClockReturn ret;
2178   GstClock *clock;
2179   GstClockTime base_time;
2180
2181   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2182     goto invalid_time;
2183
2184   GST_OBJECT_LOCK (sink);
2185   if (G_UNLIKELY (!sink->sync))
2186     goto no_sync;
2187
2188   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
2189     goto no_clock;
2190
2191   base_time = GST_ELEMENT_CAST (sink)->base_time;
2192   GST_LOG_OBJECT (sink,
2193       "time %" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT,
2194       GST_TIME_ARGS (time), GST_TIME_ARGS (base_time));
2195
2196   /* add base_time to running_time to get the time against the clock */
2197   time += base_time;
2198
2199   /* Re-use existing clockid if available */
2200   /* FIXME: Casting to GstClockEntry only works because the types
2201    * are the same */
2202   if (G_LIKELY (sink->priv->cached_clock_id != NULL
2203           && GST_CLOCK_ENTRY_CLOCK ((GstClockEntry *) sink->priv->
2204               cached_clock_id) == clock)) {
2205     if (!gst_clock_single_shot_id_reinit (clock, sink->priv->cached_clock_id,
2206             time)) {
2207       gst_clock_id_unref (sink->priv->cached_clock_id);
2208       sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2209     }
2210   } else {
2211     if (sink->priv->cached_clock_id != NULL)
2212       gst_clock_id_unref (sink->priv->cached_clock_id);
2213     sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2214   }
2215   GST_OBJECT_UNLOCK (sink);
2216
2217   /* A blocking wait is performed on the clock. We save the ClockID
2218    * so we can unlock the entry at any time. While we are blocking, we
2219    * release the PREROLL_LOCK so that other threads can interrupt the
2220    * entry. */
2221   sink->clock_id = sink->priv->cached_clock_id;
2222   /* release the preroll lock while waiting */
2223   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
2224
2225   ret = gst_clock_id_wait (sink->priv->cached_clock_id, jitter);
2226
2227   GST_PAD_PREROLL_LOCK (sink->sinkpad);
2228   sink->clock_id = NULL;
2229
2230   return ret;
2231
2232   /* no syncing needed */
2233 invalid_time:
2234   {
2235     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
2236     return GST_CLOCK_BADTIME;
2237   }
2238 no_sync:
2239   {
2240     GST_DEBUG_OBJECT (sink, "sync disabled");
2241     GST_OBJECT_UNLOCK (sink);
2242     return GST_CLOCK_BADTIME;
2243   }
2244 no_clock:
2245   {
2246     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
2247     GST_OBJECT_UNLOCK (sink);
2248     return GST_CLOCK_BADTIME;
2249   }
2250 }
2251
2252 /**
2253  * gst_base_sink_wait_preroll:
2254  * @sink: the sink
2255  *
2256  * If the #GstBaseSinkClass.render() method performs its own synchronisation
2257  * against the clock it must unblock when going from PLAYING to the PAUSED state
2258  * and call this method before continuing to render the remaining data.
2259  *
2260  * This function will block until a state change to PLAYING happens (in which
2261  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
2262  * to a state change to READY or a FLUSH event (in which case this function
2263  * returns #GST_FLOW_WRONG_STATE).
2264  *
2265  * This function should only be called with the PREROLL_LOCK held, like in the
2266  * render function.
2267  *
2268  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2269  * continue. Any other return value should be returned from the render vmethod.
2270  *
2271  * Since: 0.10.11
2272  */
2273 GstFlowReturn
2274 gst_base_sink_wait_preroll (GstBaseSink * sink)
2275 {
2276   sink->have_preroll = TRUE;
2277   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
2278   /* block until the state changes, or we get a flush, or something */
2279   GST_PAD_PREROLL_WAIT (sink->sinkpad);
2280   sink->have_preroll = FALSE;
2281   if (G_UNLIKELY (sink->flushing))
2282     goto stopping;
2283   if (G_UNLIKELY (sink->priv->step_unlock))
2284     goto step_unlocked;
2285   GST_DEBUG_OBJECT (sink, "continue after preroll");
2286
2287   return GST_FLOW_OK;
2288
2289   /* ERRORS */
2290 stopping:
2291   {
2292     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
2293     return GST_FLOW_WRONG_STATE;
2294   }
2295 step_unlocked:
2296   {
2297     sink->priv->step_unlock = FALSE;
2298     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
2299     return GST_FLOW_STEP;
2300   }
2301 }
2302
2303 static inline guint8
2304 get_object_type (GstMiniObject * obj)
2305 {
2306   guint8 obj_type;
2307
2308   if (G_LIKELY (GST_IS_BUFFER (obj)))
2309     obj_type = _PR_IS_BUFFER;
2310   else if (GST_IS_EVENT (obj))
2311     obj_type = _PR_IS_EVENT;
2312   else if (GST_IS_BUFFER_LIST (obj))
2313     obj_type = _PR_IS_BUFFERLIST;
2314   else
2315     obj_type = _PR_IS_NOTHING;
2316
2317   return obj_type;
2318 }
2319
2320 /**
2321  * gst_base_sink_do_preroll:
2322  * @sink: the sink
2323  * @obj: (transfer none): the mini object that caused the preroll
2324  *
2325  * If the @sink spawns its own thread for pulling buffers from upstream it
2326  * should call this method after it has pulled a buffer. If the element needed
2327  * to preroll, this function will perform the preroll and will then block
2328  * until the element state is changed.
2329  *
2330  * This function should be called with the PREROLL_LOCK held.
2331  *
2332  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2333  * continue. Any other return value should be returned from the render vmethod.
2334  *
2335  * Since: 0.10.22
2336  */
2337 GstFlowReturn
2338 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2339 {
2340   GstFlowReturn ret;
2341
2342   while (G_UNLIKELY (sink->need_preroll)) {
2343     guint8 obj_type;
2344     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2345
2346     obj_type = get_object_type (obj);
2347
2348     ret = gst_base_sink_preroll_object (sink, obj_type, obj);
2349     if (ret != GST_FLOW_OK)
2350       goto preroll_failed;
2351
2352     /* need to recheck here because the commit state could have
2353      * made us not need the preroll anymore */
2354     if (G_LIKELY (sink->need_preroll)) {
2355       /* block until the state changes, or we get a flush, or something */
2356       ret = gst_base_sink_wait_preroll (sink);
2357       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2358         goto preroll_failed;
2359     }
2360   }
2361   return GST_FLOW_OK;
2362
2363   /* ERRORS */
2364 preroll_failed:
2365   {
2366     GST_DEBUG_OBJECT (sink, "preroll failed: %s", gst_flow_get_name (ret));
2367     return ret;
2368   }
2369 }
2370
2371 /**
2372  * gst_base_sink_wait_eos:
2373  * @sink: the sink
2374  * @time: the running_time to be reached
2375  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2376  *
2377  * This function will block until @time is reached. It is usually called by
2378  * subclasses that use their own internal synchronisation but want to let the
2379  * EOS be handled by the base class.
2380  *
2381  * This function should only be called with the PREROLL_LOCK held, like when
2382  * receiving an EOS event in the ::event vmethod.
2383  *
2384  * The @time argument should be the running_time of when the EOS should happen
2385  * and will be adjusted with any latency and offset configured in the sink.
2386  *
2387  * Returns: #GstFlowReturn
2388  *
2389  * Since: 0.10.15
2390  */
2391 GstFlowReturn
2392 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2393     GstClockTimeDiff * jitter)
2394 {
2395   GstClockReturn status;
2396   GstFlowReturn ret;
2397
2398   do {
2399     GstClockTime stime;
2400
2401     GST_DEBUG_OBJECT (sink, "checking preroll");
2402
2403     /* first wait for the playing state before we can continue */
2404     while (G_UNLIKELY (sink->need_preroll)) {
2405       ret = gst_base_sink_wait_preroll (sink);
2406       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2407         goto flushing;
2408     }
2409
2410     /* preroll done, we can sync since we are in PLAYING now. */
2411     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2412         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2413
2414     /* compensate for latency, ts_offset and render delay */
2415     stime = gst_base_sink_adjust_time (sink, time);
2416
2417     /* wait for the clock, this can be interrupted because we got shut down or
2418      * we PAUSED. */
2419     status = gst_base_sink_wait_clock (sink, stime, jitter);
2420
2421     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2422
2423     /* invalid time, no clock or sync disabled, just continue then */
2424     if (status == GST_CLOCK_BADTIME)
2425       break;
2426
2427     /* waiting could have been interrupted and we can be flushing now */
2428     if (G_UNLIKELY (sink->flushing))
2429       goto flushing;
2430
2431     /* retry if we got unscheduled, which means we did not reach the timeout
2432      * yet. if some other error occures, we continue. */
2433   } while (status == GST_CLOCK_UNSCHEDULED);
2434
2435   GST_DEBUG_OBJECT (sink, "end of stream");
2436
2437   return GST_FLOW_OK;
2438
2439   /* ERRORS */
2440 flushing:
2441   {
2442     GST_DEBUG_OBJECT (sink, "we are flushing");
2443     return GST_FLOW_WRONG_STATE;
2444   }
2445 }
2446
2447 /* with STREAM_LOCK, PREROLL_LOCK
2448  *
2449  * Make sure we are in PLAYING and synchronize an object to the clock.
2450  *
2451  * If we need preroll, we are not in PLAYING. We try to commit the state
2452  * if needed and then block if we still are not PLAYING.
2453  *
2454  * We start waiting on the clock in PLAYING. If we got interrupted, we
2455  * immediately try to re-preroll.
2456  *
2457  * Some objects do not need synchronisation (most events) and so this function
2458  * immediately returns GST_FLOW_OK.
2459  *
2460  * for objects that arrive later than max-lateness to be synchronized to the
2461  * clock have the @late boolean set to TRUE.
2462  *
2463  * This function keeps a running average of the jitter (the diff between the
2464  * clock time and the requested sync time). The jitter is negative for
2465  * objects that arrive in time and positive for late buffers.
2466  *
2467  * does not take ownership of obj.
2468  */
2469 static GstFlowReturn
2470 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2471     GstMiniObject * obj, gboolean * late, gboolean * step_end, guint8 obj_type)
2472 {
2473   GstClockTimeDiff jitter = 0;
2474   gboolean syncable;
2475   GstClockReturn status = GST_CLOCK_OK;
2476   GstClockTime rstart, rstop, sstart, sstop, stime;
2477   gboolean do_sync;
2478   GstBaseSinkPrivate *priv;
2479   GstFlowReturn ret;
2480   GstStepInfo *current, *pending;
2481   gboolean stepped;
2482
2483   priv = basesink->priv;
2484
2485 do_step:
2486   sstart = sstop = rstart = rstop = GST_CLOCK_TIME_NONE;
2487   do_sync = TRUE;
2488   stepped = FALSE;
2489
2490   priv->current_rstart = GST_CLOCK_TIME_NONE;
2491
2492   /* get stepping info */
2493   current = &priv->current_step;
2494   pending = &priv->pending_step;
2495
2496   /* get timing information for this object against the render segment */
2497   syncable = gst_base_sink_get_sync_times (basesink, obj,
2498       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2499       current, step_end, obj_type);
2500
2501   if (G_UNLIKELY (stepped))
2502     goto step_skipped;
2503
2504   /* a syncable object needs to participate in preroll and
2505    * clocking. All buffers and EOS are syncable. */
2506   if (G_UNLIKELY (!syncable))
2507     goto not_syncable;
2508
2509   /* store timing info for current object */
2510   priv->current_rstart = rstart;
2511   priv->current_rstop = (GST_CLOCK_TIME_IS_VALID (rstop) ? rstop : rstart);
2512
2513   /* save sync time for eos when the previous object needed sync */
2514   priv->eos_rtime = (do_sync ? priv->current_rstop : GST_CLOCK_TIME_NONE);
2515
2516   /* calculate inter frame spacing */
2517   if (G_UNLIKELY (priv->prev_rstart != -1 && priv->prev_rstart < rstart)) {
2518     GstClockTime in_diff;
2519
2520     in_diff = rstart - priv->prev_rstart;
2521
2522     if (priv->avg_in_diff == -1)
2523       priv->avg_in_diff = in_diff;
2524     else
2525       priv->avg_in_diff = UPDATE_RUNNING_AVG (priv->avg_in_diff, in_diff);
2526
2527     GST_LOG_OBJECT (basesink, "avg frame diff %" GST_TIME_FORMAT,
2528         GST_TIME_ARGS (priv->avg_in_diff));
2529
2530   }
2531   priv->prev_rstart = rstart;
2532
2533   if (G_UNLIKELY (priv->earliest_in_time != -1
2534           && rstart < priv->earliest_in_time))
2535     goto qos_dropped;
2536
2537 again:
2538   /* first do preroll, this makes sure we commit our state
2539    * to PAUSED and can continue to PLAYING. We cannot perform
2540    * any clock sync in PAUSED because there is no clock. */
2541   ret = gst_base_sink_do_preroll (basesink, obj);
2542   if (G_UNLIKELY (ret != GST_FLOW_OK))
2543     goto preroll_failed;
2544
2545   /* update the segment with a pending step if the current one is invalid and we
2546    * have a new pending one. We only accept new step updates after a preroll */
2547   if (G_UNLIKELY (pending->valid && !current->valid)) {
2548     start_stepping (basesink, &basesink->segment, pending, current);
2549     goto do_step;
2550   }
2551
2552   /* After rendering we store the position of the last buffer so that we can use
2553    * it to report the position. We need to take the lock here. */
2554   GST_OBJECT_LOCK (basesink);
2555   priv->current_sstart = sstart;
2556   priv->current_sstop = (GST_CLOCK_TIME_IS_VALID (sstop) ? sstop : sstart);
2557   GST_OBJECT_UNLOCK (basesink);
2558
2559   if (!do_sync)
2560     goto done;
2561
2562   /* adjust for latency */
2563   stime = gst_base_sink_adjust_time (basesink, rstart);
2564
2565   /* preroll done, we can sync since we are in PLAYING now. */
2566   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2567       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2568       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2569
2570   /* This function will return immediately if start == -1, no clock
2571    * or sync is disabled with GST_CLOCK_BADTIME. */
2572   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2573
2574   GST_DEBUG_OBJECT (basesink, "clock returned %d, jitter %c%" GST_TIME_FORMAT,
2575       status, (jitter < 0 ? '-' : ' '), GST_TIME_ARGS (ABS (jitter)));
2576
2577   /* invalid time, no clock or sync disabled, just render */
2578   if (status == GST_CLOCK_BADTIME)
2579     goto done;
2580
2581   /* waiting could have been interrupted and we can be flushing now */
2582   if (G_UNLIKELY (basesink->flushing))
2583     goto flushing;
2584
2585   /* check for unlocked by a state change, we are not flushing so
2586    * we can try to preroll on the current buffer. */
2587   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2588     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2589     priv->call_preroll = TRUE;
2590     goto again;
2591   }
2592
2593   /* successful syncing done, record observation */
2594   priv->current_jitter = jitter;
2595
2596   /* check if the object should be dropped */
2597   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2598       status, jitter);
2599
2600 done:
2601   return GST_FLOW_OK;
2602
2603   /* ERRORS */
2604 step_skipped:
2605   {
2606     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2607     *late = TRUE;
2608     return GST_FLOW_OK;
2609   }
2610 not_syncable:
2611   {
2612     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2613     return GST_FLOW_OK;
2614   }
2615 qos_dropped:
2616   {
2617     GST_DEBUG_OBJECT (basesink, "dropped because of QoS %p", obj);
2618     *late = TRUE;
2619     return GST_FLOW_OK;
2620   }
2621 flushing:
2622   {
2623     GST_DEBUG_OBJECT (basesink, "we are flushing");
2624     return GST_FLOW_WRONG_STATE;
2625   }
2626 preroll_failed:
2627   {
2628     GST_DEBUG_OBJECT (basesink, "preroll failed");
2629     *step_end = FALSE;
2630     return ret;
2631   }
2632 }
2633
2634 static gboolean
2635 gst_base_sink_send_qos (GstBaseSink * basesink, GstQOSType type,
2636     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2637 {
2638   GstEvent *event;
2639   gboolean res;
2640
2641   /* generate Quality-of-Service event */
2642   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2643       "qos: type %d, proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2644       GST_TIME_FORMAT, type, proportion, diff, GST_TIME_ARGS (time));
2645
2646   event = gst_event_new_qos_full (type, proportion, diff, time);
2647
2648   /* send upstream */
2649   res = gst_pad_push_event (basesink->sinkpad, event);
2650
2651   return res;
2652 }
2653
2654 static void
2655 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2656 {
2657   GstBaseSinkPrivate *priv;
2658   GstClockTime start, stop;
2659   GstClockTimeDiff jitter;
2660   GstClockTime pt, entered, left;
2661   GstClockTime duration;
2662   gdouble rate;
2663
2664   priv = sink->priv;
2665
2666   start = priv->current_rstart;
2667
2668   if (priv->current_step.valid)
2669     return;
2670
2671   /* if Quality-of-Service disabled, do nothing */
2672   if (!g_atomic_int_get (&priv->qos_enabled) ||
2673       !GST_CLOCK_TIME_IS_VALID (start))
2674     return;
2675
2676   stop = priv->current_rstop;
2677   jitter = priv->current_jitter;
2678
2679   if (jitter < 0) {
2680     /* this is the time the buffer entered the sink */
2681     if (start < -jitter)
2682       entered = 0;
2683     else
2684       entered = start + jitter;
2685     left = start;
2686   } else {
2687     /* this is the time the buffer entered the sink */
2688     entered = start + jitter;
2689     /* this is the time the buffer left the sink */
2690     left = start + jitter;
2691   }
2692
2693   /* calculate duration of the buffer */
2694   if (GST_CLOCK_TIME_IS_VALID (stop) && stop != start)
2695     duration = stop - start;
2696   else
2697     duration = priv->avg_in_diff;
2698
2699   /* if we have the time when the last buffer left us, calculate
2700    * processing time */
2701   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2702     if (entered > priv->last_left) {
2703       pt = entered - priv->last_left;
2704     } else {
2705       pt = 0;
2706     }
2707   } else {
2708     pt = priv->avg_pt;
2709   }
2710
2711   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2712       ", stop %" GST_TIME_FORMAT ", entered %" GST_TIME_FORMAT ", left %"
2713       GST_TIME_FORMAT ", pt: %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
2714       ",jitter %" G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
2715       GST_TIME_ARGS (entered), GST_TIME_ARGS (left), GST_TIME_ARGS (pt),
2716       GST_TIME_ARGS (duration), jitter);
2717
2718   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2719       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2720       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2721       priv->avg_rate);
2722
2723   /* collect running averages. for first observations, we copy the
2724    * values */
2725   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_duration))
2726     priv->avg_duration = duration;
2727   else
2728     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2729
2730   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_pt))
2731     priv->avg_pt = pt;
2732   else
2733     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2734
2735   if (priv->avg_duration != 0)
2736     rate =
2737         gst_guint64_to_gdouble (priv->avg_pt) /
2738         gst_guint64_to_gdouble (priv->avg_duration);
2739   else
2740     rate = 1.0;
2741
2742   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2743     if (dropped || priv->avg_rate < 0.0) {
2744       priv->avg_rate = rate;
2745     } else {
2746       if (rate > 1.0)
2747         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2748       else
2749         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2750     }
2751   }
2752
2753   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2754       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2755       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2756       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2757
2758
2759   if (priv->avg_rate >= 0.0) {
2760     GstQOSType type;
2761     GstClockTimeDiff diff;
2762
2763     /* if we have a valid rate, start sending QoS messages */
2764     if (priv->current_jitter < 0) {
2765       /* make sure we never go below 0 when adding the jitter to the
2766        * timestamp. */
2767       if (priv->current_rstart < -priv->current_jitter)
2768         priv->current_jitter = -priv->current_rstart;
2769     }
2770
2771     if (priv->throttle_time > 0) {
2772       diff = priv->throttle_time;
2773       type = GST_QOS_TYPE_THROTTLE;
2774     } else {
2775       diff = priv->current_jitter;
2776       if (diff <= 0)
2777         type = GST_QOS_TYPE_OVERFLOW;
2778       else
2779         type = GST_QOS_TYPE_UNDERFLOW;
2780     }
2781
2782     gst_base_sink_send_qos (sink, type, priv->avg_rate, priv->current_rstart,
2783         diff);
2784   }
2785
2786   /* record when this buffer will leave us */
2787   priv->last_left = left;
2788 }
2789
2790 /* reset all qos measuring */
2791 static void
2792 gst_base_sink_reset_qos (GstBaseSink * sink)
2793 {
2794   GstBaseSinkPrivate *priv;
2795
2796   priv = sink->priv;
2797
2798   priv->last_render_time = GST_CLOCK_TIME_NONE;
2799   priv->prev_rstart = GST_CLOCK_TIME_NONE;
2800   priv->earliest_in_time = GST_CLOCK_TIME_NONE;
2801   priv->last_left = GST_CLOCK_TIME_NONE;
2802   priv->avg_duration = GST_CLOCK_TIME_NONE;
2803   priv->avg_pt = GST_CLOCK_TIME_NONE;
2804   priv->avg_rate = -1.0;
2805   priv->avg_render = GST_CLOCK_TIME_NONE;
2806   priv->avg_in_diff = GST_CLOCK_TIME_NONE;
2807   priv->rendered = 0;
2808   priv->dropped = 0;
2809
2810 }
2811
2812 /* Checks if the object was scheduled too late.
2813  *
2814  * rstart/rstop contain the running_time start and stop values
2815  * of the object.
2816  *
2817  * status and jitter contain the return values from the clock wait.
2818  *
2819  * returns TRUE if the buffer was too late.
2820  */
2821 static gboolean
2822 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2823     GstClockTime rstart, GstClockTime rstop,
2824     GstClockReturn status, GstClockTimeDiff jitter)
2825 {
2826   gboolean late;
2827   gint64 max_lateness;
2828   GstBaseSinkPrivate *priv;
2829
2830   priv = basesink->priv;
2831
2832   late = FALSE;
2833
2834   /* only for objects that were too late */
2835   if (G_LIKELY (status != GST_CLOCK_EARLY))
2836     goto in_time;
2837
2838   max_lateness = basesink->abidata.ABI.max_lateness;
2839
2840   /* check if frame dropping is enabled */
2841   if (max_lateness == -1)
2842     goto no_drop;
2843
2844   /* only check for buffers */
2845   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2846     goto not_buffer;
2847
2848   /* can't do check if we don't have a timestamp */
2849   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (rstart)))
2850     goto no_timestamp;
2851
2852   /* we can add a valid stop time */
2853   if (GST_CLOCK_TIME_IS_VALID (rstop))
2854     max_lateness += rstop;
2855   else {
2856     max_lateness += rstart;
2857     /* no stop time, use avg frame diff */
2858     if (priv->avg_in_diff != -1)
2859       max_lateness += priv->avg_in_diff;
2860   }
2861
2862   /* if the jitter bigger than duration and lateness we are too late */
2863   if ((late = rstart + jitter > max_lateness)) {
2864     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2865         "buffer is too late %" GST_TIME_FORMAT
2866         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (rstart + jitter),
2867         GST_TIME_ARGS (max_lateness));
2868     /* !!emergency!!, if we did not receive anything valid for more than a
2869      * second, render it anyway so the user sees something */
2870     if (GST_CLOCK_TIME_IS_VALID (priv->last_render_time) &&
2871         rstart - priv->last_render_time > GST_SECOND) {
2872       late = FALSE;
2873       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2874           (_("A lot of buffers are being dropped.")),
2875           ("There may be a timestamping problem, or this computer is too slow."));
2876       GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2877           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2878           GST_TIME_ARGS (priv->last_render_time));
2879     }
2880   }
2881
2882 done:
2883   if (!late || !GST_CLOCK_TIME_IS_VALID (priv->last_render_time)) {
2884     priv->last_render_time = rstart;
2885     /* the next allowed input timestamp */
2886     if (priv->throttle_time > 0)
2887       priv->earliest_in_time = rstart + priv->throttle_time;
2888   }
2889   return late;
2890
2891   /* all is fine */
2892 in_time:
2893   {
2894     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2895     goto done;
2896   }
2897 no_drop:
2898   {
2899     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2900     goto done;
2901   }
2902 not_buffer:
2903   {
2904     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2905     return FALSE;
2906   }
2907 no_timestamp:
2908   {
2909     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2910     return FALSE;
2911   }
2912 }
2913
2914 /* called before and after calling the render vmethod. It keeps track of how
2915  * much time was spent in the render method and is used to check if we are
2916  * flooded */
2917 static void
2918 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2919 {
2920   GstBaseSinkPrivate *priv;
2921
2922   priv = basesink->priv;
2923
2924   if (start) {
2925     priv->start = gst_util_get_timestamp ();
2926   } else {
2927     GstClockTime elapsed;
2928
2929     priv->stop = gst_util_get_timestamp ();
2930
2931     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2932
2933     if (!GST_CLOCK_TIME_IS_VALID (priv->avg_render))
2934       priv->avg_render = elapsed;
2935     else
2936       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2937
2938     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2939         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2940   }
2941 }
2942
2943 /* with STREAM_LOCK, PREROLL_LOCK,
2944  *
2945  * Synchronize the object on the clock and then render it.
2946  *
2947  * takes ownership of obj.
2948  */
2949 static GstFlowReturn
2950 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2951     guint8 obj_type, gpointer obj)
2952 {
2953   GstFlowReturn ret;
2954   GstBaseSinkClass *bclass;
2955   gboolean late, step_end;
2956   gpointer sync_obj;
2957   GstBaseSinkPrivate *priv;
2958
2959   priv = basesink->priv;
2960
2961   if (OBJ_IS_BUFFERLIST (obj_type)) {
2962     /*
2963      * If buffer list, use the first group buffer within the list
2964      * for syncing
2965      */
2966     sync_obj = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
2967     g_assert (NULL != sync_obj);
2968   } else {
2969     sync_obj = obj;
2970   }
2971
2972 again:
2973   late = FALSE;
2974   step_end = FALSE;
2975
2976   /* synchronize this object, non syncable objects return OK
2977    * immediately. */
2978   ret =
2979       gst_base_sink_do_sync (basesink, pad, sync_obj, &late, &step_end,
2980       obj_type);
2981   if (G_UNLIKELY (ret != GST_FLOW_OK))
2982     goto sync_failed;
2983
2984   /* and now render, event or buffer/buffer list. */
2985   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type))) {
2986     /* drop late buffers unconditionally, let's hope it's unlikely */
2987     if (G_UNLIKELY (late))
2988       goto dropped;
2989
2990     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2991
2992     if (G_LIKELY ((OBJ_IS_BUFFERLIST (obj_type) && bclass->render_list) ||
2993             (!OBJ_IS_BUFFERLIST (obj_type) && bclass->render))) {
2994       gint do_qos;
2995
2996       /* read once, to get same value before and after */
2997       do_qos = g_atomic_int_get (&priv->qos_enabled);
2998
2999       GST_DEBUG_OBJECT (basesink, "rendering object %p", obj);
3000
3001       /* record rendering time for QoS and stats */
3002       if (do_qos)
3003         gst_base_sink_do_render_stats (basesink, TRUE);
3004
3005       if (!OBJ_IS_BUFFERLIST (obj_type)) {
3006         GstBuffer *buf;
3007
3008         /* For buffer lists do not set last buffer. Creating buffer
3009          * with meaningful data can be done only with memcpy which will
3010          * significantly affect performance */
3011         buf = GST_BUFFER_CAST (obj);
3012         gst_base_sink_set_last_buffer (basesink, buf);
3013
3014         ret = bclass->render (basesink, buf);
3015       } else {
3016         GstBufferList *buflist;
3017
3018         buflist = GST_BUFFER_LIST_CAST (obj);
3019
3020         ret = bclass->render_list (basesink, buflist);
3021       }
3022
3023       if (do_qos)
3024         gst_base_sink_do_render_stats (basesink, FALSE);
3025
3026       if (ret == GST_FLOW_STEP)
3027         goto again;
3028
3029       if (G_UNLIKELY (basesink->flushing))
3030         goto flushing;
3031
3032       priv->rendered++;
3033     }
3034   } else if (G_LIKELY (OBJ_IS_EVENT (obj_type))) {
3035     GstEvent *event = GST_EVENT_CAST (obj);
3036     gboolean event_res = TRUE;
3037     GstEventType type;
3038
3039     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3040
3041     type = GST_EVENT_TYPE (event);
3042
3043     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
3044         gst_event_type_get_name (type));
3045
3046     if (bclass->event)
3047       event_res = bclass->event (basesink, event);
3048
3049     /* when we get here we could be flushing again when the event handler calls
3050      * _wait_eos(). We have to ignore this object in that case. */
3051     if (G_UNLIKELY (basesink->flushing))
3052       goto flushing;
3053
3054     if (G_LIKELY (event_res)) {
3055       guint32 seqnum;
3056
3057       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
3058       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
3059
3060       switch (type) {
3061         case GST_EVENT_EOS:
3062         {
3063           GstMessage *message;
3064
3065           /* the EOS event is completely handled so we mark
3066            * ourselves as being in the EOS state. eos is also
3067            * protected by the object lock so we can read it when
3068            * answering the POSITION query. */
3069           GST_OBJECT_LOCK (basesink);
3070           basesink->eos = TRUE;
3071           GST_OBJECT_UNLOCK (basesink);
3072
3073           /* ok, now we can post the message */
3074           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
3075
3076           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
3077           gst_message_set_seqnum (message, seqnum);
3078           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
3079           break;
3080         }
3081         case GST_EVENT_NEWSEGMENT:
3082           /* configure the segment */
3083           gst_base_sink_configure_segment (basesink, pad, event,
3084               &basesink->segment);
3085           break;
3086         case GST_EVENT_SINK_MESSAGE:{
3087           GstMessage *msg = NULL;
3088
3089           gst_event_parse_sink_message (event, &msg);
3090
3091           if (msg)
3092             gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
3093         }
3094         default:
3095           break;
3096       }
3097     }
3098   } else {
3099     g_return_val_if_reached (GST_FLOW_ERROR);
3100   }
3101
3102 done:
3103   if (step_end) {
3104     /* the step ended, check if we need to activate a new step */
3105     GST_DEBUG_OBJECT (basesink, "step ended");
3106     stop_stepping (basesink, &basesink->segment, &priv->current_step,
3107         priv->current_rstart, priv->current_rstop, basesink->eos);
3108     goto again;
3109   }
3110
3111   gst_base_sink_perform_qos (basesink, late);
3112
3113   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
3114   gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3115   return ret;
3116
3117   /* ERRORS */
3118 sync_failed:
3119   {
3120     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
3121     goto done;
3122   }
3123 dropped:
3124   {
3125     priv->dropped++;
3126     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
3127
3128     if (g_atomic_int_get (&priv->qos_enabled)) {
3129       GstMessage *qos_msg;
3130       GstClockTime timestamp, duration;
3131
3132       timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (sync_obj));
3133       duration = GST_BUFFER_DURATION (GST_BUFFER_CAST (sync_obj));
3134
3135       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3136           "qos: dropped buffer rt %" GST_TIME_FORMAT ", st %" GST_TIME_FORMAT
3137           ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT,
3138           GST_TIME_ARGS (priv->current_rstart),
3139           GST_TIME_ARGS (priv->current_sstart), GST_TIME_ARGS (timestamp),
3140           GST_TIME_ARGS (duration));
3141       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3142           "qos: rendered %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
3143           priv->rendered, priv->dropped);
3144
3145       qos_msg =
3146           gst_message_new_qos (GST_OBJECT_CAST (basesink), basesink->sync,
3147           priv->current_rstart, priv->current_sstart, timestamp, duration);
3148       gst_message_set_qos_values (qos_msg, priv->current_jitter, priv->avg_rate,
3149           1000000);
3150       gst_message_set_qos_stats (qos_msg, GST_FORMAT_BUFFERS, priv->rendered,
3151           priv->dropped);
3152       gst_element_post_message (GST_ELEMENT_CAST (basesink), qos_msg);
3153     }
3154     goto done;
3155   }
3156 flushing:
3157   {
3158     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
3159     gst_mini_object_unref (obj);
3160     return GST_FLOW_WRONG_STATE;
3161   }
3162 }
3163
3164 /* with STREAM_LOCK, PREROLL_LOCK
3165  *
3166  * Perform preroll on the given object. For buffers this means
3167  * calling the preroll subclass method.
3168  * If that succeeds, the state will be commited.
3169  *
3170  * function does not take ownership of obj.
3171  */
3172 static GstFlowReturn
3173 gst_base_sink_preroll_object (GstBaseSink * basesink, guint8 obj_type,
3174     GstMiniObject * obj)
3175 {
3176   GstFlowReturn ret;
3177
3178   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
3179
3180   /* if it's a buffer, we need to call the preroll method */
3181   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type) && basesink->priv->call_preroll)) {
3182     GstBaseSinkClass *bclass;
3183     GstBuffer *buf;
3184     GstClockTime timestamp;
3185
3186     if (OBJ_IS_BUFFERLIST (obj_type)) {
3187       buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3188       g_assert (NULL != buf);
3189     } else {
3190       buf = GST_BUFFER_CAST (obj);
3191     }
3192
3193     timestamp = GST_BUFFER_TIMESTAMP (buf);
3194
3195     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
3196         GST_TIME_ARGS (timestamp));
3197
3198     /*
3199      * For buffer lists do not set last buffer. Creating buffer
3200      * with meaningful data can be done only with memcpy which will
3201      * significantly affect performance
3202      */
3203     if (!OBJ_IS_BUFFERLIST (obj_type)) {
3204       gst_base_sink_set_last_buffer (basesink, buf);
3205     }
3206
3207     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3208     if (bclass->preroll)
3209       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
3210         goto preroll_failed;
3211
3212     basesink->priv->call_preroll = FALSE;
3213   }
3214
3215   /* commit state */
3216   if (G_LIKELY (basesink->playing_async)) {
3217     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
3218       goto stopping;
3219   }
3220
3221   return GST_FLOW_OK;
3222
3223   /* ERRORS */
3224 preroll_failed:
3225   {
3226     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
3227     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
3228     return ret;
3229   }
3230 stopping:
3231   {
3232     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
3233     return GST_FLOW_WRONG_STATE;
3234   }
3235 }
3236
3237 /* with STREAM_LOCK, PREROLL_LOCK
3238  *
3239  * Queue an object for rendering.
3240  * The first prerollable object queued will complete the preroll. If the
3241  * preroll queue is filled, we render all the objects in the queue.
3242  *
3243  * This function takes ownership of the object.
3244  */
3245 static GstFlowReturn
3246 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
3247     guint8 obj_type, gpointer obj, gboolean prerollable)
3248 {
3249   GstFlowReturn ret = GST_FLOW_OK;
3250   gint length;
3251   GQueue *q;
3252
3253   if (G_UNLIKELY (basesink->need_preroll)) {
3254     if (G_LIKELY (prerollable))
3255       basesink->preroll_queued++;
3256
3257     length = basesink->preroll_queued;
3258
3259     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
3260
3261     /* first prerollable item needs to finish the preroll */
3262     if (length == 1) {
3263       ret = gst_base_sink_preroll_object (basesink, obj_type, obj);
3264       if (G_UNLIKELY (ret != GST_FLOW_OK))
3265         goto preroll_failed;
3266     }
3267     /* need to recheck if we need preroll, commit state during preroll
3268      * could have made us not need more preroll. */
3269     if (G_UNLIKELY (basesink->need_preroll)) {
3270       /* see if we can render now, if we can't add the object to the preroll
3271        * queue. */
3272       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
3273         goto more_preroll;
3274     }
3275   }
3276   /* we can start rendering (or blocking) the queued object
3277    * if any. */
3278   q = basesink->preroll_queue;
3279   while (G_UNLIKELY (!g_queue_is_empty (q))) {
3280     GstMiniObject *o;
3281     guint8 ot;
3282
3283     o = g_queue_pop_head (q);
3284     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
3285
3286     ot = get_object_type (o);
3287
3288     /* do something with the return value */
3289     ret = gst_base_sink_render_object (basesink, pad, ot, o);
3290     if (ret != GST_FLOW_OK)
3291       goto dequeue_failed;
3292   }
3293
3294   /* now render the object */
3295   ret = gst_base_sink_render_object (basesink, pad, obj_type, obj);
3296   basesink->preroll_queued = 0;
3297
3298   return ret;
3299
3300   /* special cases */
3301 preroll_failed:
3302   {
3303     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
3304         gst_flow_get_name (ret));
3305     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3306     return ret;
3307   }
3308 more_preroll:
3309   {
3310     /* add object to the queue and return */
3311     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
3312         length, basesink->preroll_queue_max_len);
3313     g_queue_push_tail (basesink->preroll_queue, obj);
3314     return GST_FLOW_OK;
3315   }
3316 dequeue_failed:
3317   {
3318     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
3319         gst_flow_get_name (ret));
3320     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3321     return ret;
3322   }
3323 }
3324
3325 /* with STREAM_LOCK
3326  *
3327  * This function grabs the PREROLL_LOCK and adds the object to
3328  * the queue.
3329  *
3330  * This function takes ownership of obj.
3331  *
3332  * Note: Only GstEvent seem to be passed to this private method
3333  */
3334 static GstFlowReturn
3335 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
3336     GstMiniObject * obj, gboolean prerollable)
3337 {
3338   GstFlowReturn ret;
3339
3340   GST_PAD_PREROLL_LOCK (pad);
3341   if (G_UNLIKELY (basesink->flushing))
3342     goto flushing;
3343
3344   if (G_UNLIKELY (basesink->priv->received_eos))
3345     goto was_eos;
3346
3347   ret =
3348       gst_base_sink_queue_object_unlocked (basesink, pad, _PR_IS_EVENT, obj,
3349       prerollable);
3350   GST_PAD_PREROLL_UNLOCK (pad);
3351
3352   return ret;
3353
3354   /* ERRORS */
3355 flushing:
3356   {
3357     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3358     GST_PAD_PREROLL_UNLOCK (pad);
3359     gst_mini_object_unref (obj);
3360     return GST_FLOW_WRONG_STATE;
3361   }
3362 was_eos:
3363   {
3364     GST_DEBUG_OBJECT (basesink,
3365         "we are EOS, dropping object, return UNEXPECTED");
3366     GST_PAD_PREROLL_UNLOCK (pad);
3367     gst_mini_object_unref (obj);
3368     return GST_FLOW_UNEXPECTED;
3369   }
3370 }
3371
3372 static void
3373 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
3374 {
3375   /* make sure we are not blocked on the clock also clear any pending
3376    * eos state. */
3377   gst_base_sink_set_flushing (basesink, pad, TRUE);
3378
3379   /* we grab the stream lock but that is not needed since setting the
3380    * sink to flushing would make sure no state commit is being done
3381    * anymore */
3382   GST_PAD_STREAM_LOCK (pad);
3383   gst_base_sink_reset_qos (basesink);
3384   /* and we need to commit our state again on the next
3385    * prerolled buffer */
3386   basesink->playing_async = TRUE;
3387   if (basesink->priv->async_enabled) {
3388     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
3389   } else {
3390     /* start time reset in above case as well;
3391      * arranges for a.o. proper position reporting when flushing in PAUSED */
3392     gst_element_set_start_time (GST_ELEMENT_CAST (basesink), 0);
3393     basesink->priv->have_latency = TRUE;
3394   }
3395   gst_base_sink_set_last_buffer (basesink, NULL);
3396   GST_PAD_STREAM_UNLOCK (pad);
3397 }
3398
3399 static void
3400 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
3401 {
3402   /* unset flushing so we can accept new data, this also flushes out any EOS
3403    * event. */
3404   gst_base_sink_set_flushing (basesink, pad, FALSE);
3405
3406   /* for position reporting */
3407   GST_OBJECT_LOCK (basesink);
3408   basesink->priv->current_sstart = GST_CLOCK_TIME_NONE;
3409   basesink->priv->current_sstop = GST_CLOCK_TIME_NONE;
3410   basesink->priv->eos_rtime = GST_CLOCK_TIME_NONE;
3411   basesink->priv->call_preroll = TRUE;
3412   basesink->priv->current_step.valid = FALSE;
3413   basesink->priv->pending_step.valid = FALSE;
3414   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
3415     /* we need new segment info after the flush. */
3416     basesink->have_newsegment = FALSE;
3417     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3418     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
3419   }
3420   GST_OBJECT_UNLOCK (basesink);
3421 }
3422
3423 static gboolean
3424 gst_base_sink_event (GstPad * pad, GstEvent * event)
3425 {
3426   GstBaseSink *basesink;
3427   gboolean result = TRUE;
3428   GstBaseSinkClass *bclass;
3429
3430   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3431   if (G_UNLIKELY (basesink == NULL)) {
3432     gst_event_unref (event);
3433     return FALSE;
3434   }
3435
3436   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3437
3438   GST_DEBUG_OBJECT (basesink, "received event %p %" GST_PTR_FORMAT, event,
3439       event);
3440
3441   switch (GST_EVENT_TYPE (event)) {
3442     case GST_EVENT_EOS:
3443     {
3444       GstFlowReturn ret;
3445
3446       GST_PAD_PREROLL_LOCK (pad);
3447       if (G_UNLIKELY (basesink->flushing))
3448         goto flushing;
3449
3450       if (G_UNLIKELY (basesink->priv->received_eos)) {
3451         /* we can't accept anything when we are EOS */
3452         result = FALSE;
3453         gst_event_unref (event);
3454       } else {
3455         /* we set the received EOS flag here so that we can use it when testing if
3456          * we are prerolled and to refuse more buffers. */
3457         basesink->priv->received_eos = TRUE;
3458
3459         /* EOS is a prerollable object, we call the unlocked version because it
3460          * does not check the received_eos flag. */
3461         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
3462             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), TRUE);
3463         if (G_UNLIKELY (ret != GST_FLOW_OK))
3464           result = FALSE;
3465       }
3466       GST_PAD_PREROLL_UNLOCK (pad);
3467       break;
3468     }
3469     case GST_EVENT_NEWSEGMENT:
3470     {
3471       GstFlowReturn ret;
3472       gboolean update;
3473
3474       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
3475
3476       GST_PAD_PREROLL_LOCK (pad);
3477       if (G_UNLIKELY (basesink->flushing))
3478         goto flushing;
3479
3480       gst_event_parse_new_segment_full (event, &update, NULL, NULL, NULL, NULL,
3481           NULL, NULL);
3482
3483       if (G_UNLIKELY (basesink->priv->received_eos && !update)) {
3484         /* we can't accept anything when we are EOS */
3485         result = FALSE;
3486         gst_event_unref (event);
3487       } else {
3488         /* the new segment is a non prerollable item and does not block anything,
3489          * we need to configure the current clipping segment and insert the event
3490          * in the queue to serialize it with the buffers for rendering. */
3491         gst_base_sink_configure_segment (basesink, pad, event,
3492             basesink->abidata.ABI.clip_segment);
3493
3494         ret =
3495             gst_base_sink_queue_object_unlocked (basesink, pad,
3496             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), FALSE);
3497         if (G_UNLIKELY (ret != GST_FLOW_OK))
3498           result = FALSE;
3499         else {
3500           GST_OBJECT_LOCK (basesink);
3501           basesink->have_newsegment = TRUE;
3502           GST_OBJECT_UNLOCK (basesink);
3503         }
3504       }
3505       GST_PAD_PREROLL_UNLOCK (pad);
3506       break;
3507     }
3508     case GST_EVENT_FLUSH_START:
3509       if (bclass->event)
3510         bclass->event (basesink, event);
3511
3512       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3513
3514       gst_base_sink_flush_start (basesink, pad);
3515
3516       gst_event_unref (event);
3517       break;
3518     case GST_EVENT_FLUSH_STOP:
3519       if (bclass->event)
3520         bclass->event (basesink, event);
3521
3522       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3523
3524       gst_base_sink_flush_stop (basesink, pad);
3525
3526       gst_event_unref (event);
3527       break;
3528     default:
3529       /* other events are sent to queue or subclass depending on if they
3530        * are serialized. */
3531       if (GST_EVENT_IS_SERIALIZED (event)) {
3532         gst_base_sink_queue_object (basesink, pad,
3533             GST_MINI_OBJECT_CAST (event), FALSE);
3534       } else {
3535         if (bclass->event)
3536           bclass->event (basesink, event);
3537         gst_event_unref (event);
3538       }
3539       break;
3540   }
3541 done:
3542   gst_object_unref (basesink);
3543
3544   return result;
3545
3546   /* ERRORS */
3547 flushing:
3548   {
3549     GST_DEBUG_OBJECT (basesink, "we are flushing");
3550     GST_PAD_PREROLL_UNLOCK (pad);
3551     result = FALSE;
3552     gst_event_unref (event);
3553     goto done;
3554   }
3555 }
3556
3557 /* default implementation to calculate the start and end
3558  * timestamps on a buffer, subclasses can override
3559  */
3560 static void
3561 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3562     GstClockTime * start, GstClockTime * end)
3563 {
3564   GstClockTime timestamp, duration;
3565
3566   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3567   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3568
3569     /* get duration to calculate end time */
3570     duration = GST_BUFFER_DURATION (buffer);
3571     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3572       *end = timestamp + duration;
3573     }
3574     *start = timestamp;
3575   }
3576 }
3577
3578 /* must be called with PREROLL_LOCK */
3579 static gboolean
3580 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3581 {
3582   gboolean is_prerolled, res;
3583
3584   /* we have 2 cases where the PREROLL_LOCK is released:
3585    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3586    *  2) we are syncing on the clock
3587    */
3588   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3589   res = !is_prerolled;
3590
3591   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3592       basesink->have_preroll, basesink->priv->received_eos, res);
3593
3594   return res;
3595 }
3596
3597 /* with STREAM_LOCK, PREROLL_LOCK
3598  *
3599  * Takes a buffer and compare the timestamps with the last segment.
3600  * If the buffer falls outside of the segment boundaries, drop it.
3601  * Else queue the buffer for preroll and rendering.
3602  *
3603  * This function takes ownership of the buffer.
3604  */
3605 static GstFlowReturn
3606 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3607     guint8 obj_type, gpointer obj)
3608 {
3609   GstBaseSinkClass *bclass;
3610   GstFlowReturn result;
3611   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3612   GstSegment *clip_segment;
3613   GstBuffer *time_buf;
3614
3615   if (G_UNLIKELY (basesink->flushing))
3616     goto flushing;
3617
3618   if (G_UNLIKELY (basesink->priv->received_eos))
3619     goto was_eos;
3620
3621   if (OBJ_IS_BUFFERLIST (obj_type)) {
3622     time_buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3623     g_assert (NULL != time_buf);
3624   } else {
3625     time_buf = GST_BUFFER_CAST (obj);
3626   }
3627
3628   /* for code clarity */
3629   clip_segment = basesink->abidata.ABI.clip_segment;
3630
3631   if (G_UNLIKELY (!basesink->have_newsegment)) {
3632     gboolean sync;
3633
3634     sync = gst_base_sink_get_sync (basesink);
3635     if (sync) {
3636       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3637           (_("Internal data flow problem.")),
3638           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3639     }
3640
3641     /* this means this sink will assume timestamps start from 0 */
3642     GST_OBJECT_LOCK (basesink);
3643     clip_segment->start = 0;
3644     clip_segment->stop = -1;
3645     basesink->segment.start = 0;
3646     basesink->segment.stop = -1;
3647     basesink->have_newsegment = TRUE;
3648     GST_OBJECT_UNLOCK (basesink);
3649   }
3650
3651   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3652
3653   /* check if the buffer needs to be dropped, we first ask the subclass for the
3654    * start and end */
3655   if (bclass->get_times)
3656     bclass->get_times (basesink, time_buf, &start, &end);
3657
3658   if (!GST_CLOCK_TIME_IS_VALID (start)) {
3659     /* if the subclass does not want sync, we use our own values so that we at
3660      * least clip the buffer to the segment */
3661     gst_base_sink_get_times (basesink, time_buf, &start, &end);
3662   }
3663
3664   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3665       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3666
3667   /* a dropped buffer does not participate in anything */
3668   if (GST_CLOCK_TIME_IS_VALID (start) &&
3669       (clip_segment->format == GST_FORMAT_TIME)) {
3670     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3671                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3672       goto out_of_segment;
3673   }
3674
3675   /* now we can process the buffer in the queue, this function takes ownership
3676    * of the buffer */
3677   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3678       obj_type, obj, TRUE);
3679   return result;
3680
3681   /* ERRORS */
3682 flushing:
3683   {
3684     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3685     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3686     return GST_FLOW_WRONG_STATE;
3687   }
3688 was_eos:
3689   {
3690     GST_DEBUG_OBJECT (basesink,
3691         "we are EOS, dropping object, return UNEXPECTED");
3692     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3693     return GST_FLOW_UNEXPECTED;
3694   }
3695 out_of_segment:
3696   {
3697     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3698     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3699     return GST_FLOW_OK;
3700   }
3701 }
3702
3703 /* with STREAM_LOCK
3704  */
3705 static GstFlowReturn
3706 gst_base_sink_chain_main (GstBaseSink * basesink, GstPad * pad,
3707     guint8 obj_type, gpointer obj)
3708 {
3709   GstFlowReturn result;
3710
3711   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3712     goto wrong_mode;
3713
3714   GST_PAD_PREROLL_LOCK (pad);
3715   result = gst_base_sink_chain_unlocked (basesink, pad, obj_type, obj);
3716   GST_PAD_PREROLL_UNLOCK (pad);
3717
3718 done:
3719   return result;
3720
3721   /* ERRORS */
3722 wrong_mode:
3723   {
3724     GST_OBJECT_LOCK (pad);
3725     GST_WARNING_OBJECT (basesink,
3726         "Push on pad %s:%s, but it was not activated in push mode",
3727         GST_DEBUG_PAD_NAME (pad));
3728     GST_OBJECT_UNLOCK (pad);
3729     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3730     /* we don't post an error message this will signal to the peer
3731      * pushing that EOS is reached. */
3732     result = GST_FLOW_UNEXPECTED;
3733     goto done;
3734   }
3735 }
3736
3737 static GstFlowReturn
3738 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3739 {
3740   GstBaseSink *basesink;
3741
3742   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3743
3744   return gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, buf);
3745 }
3746
3747 static GstFlowReturn
3748 gst_base_sink_chain_list (GstPad * pad, GstBufferList * list)
3749 {
3750   GstBaseSink *basesink;
3751   GstBaseSinkClass *bclass;
3752   GstFlowReturn result;
3753
3754   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3755   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3756
3757   if (G_LIKELY (bclass->render_list)) {
3758     result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFERLIST, list);
3759   } else {
3760     GstBufferListIterator *it;
3761     GstBuffer *group;
3762
3763     GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
3764
3765     it = gst_buffer_list_iterate (list);
3766
3767     if (gst_buffer_list_iterator_next_group (it)) {
3768       do {
3769         group = gst_buffer_list_iterator_merge_group (it);
3770         if (group == NULL) {
3771           group = gst_buffer_new ();
3772           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3773         } else {
3774           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining group");
3775         }
3776         result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, group);
3777       } while (result == GST_FLOW_OK
3778           && gst_buffer_list_iterator_next_group (it));
3779     } else {
3780       GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3781       result =
3782           gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER,
3783           gst_buffer_new ());
3784     }
3785     gst_buffer_list_iterator_free (it);
3786     gst_buffer_list_unref (list);
3787   }
3788   return result;
3789 }
3790
3791
3792 static gboolean
3793 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3794 {
3795   gboolean res = TRUE;
3796
3797   /* update our offset if the start/stop position was updated */
3798   if (segment->format == GST_FORMAT_BYTES) {
3799     segment->time = segment->start;
3800   } else if (segment->start == 0) {
3801     /* seek to start, we can implement a default for this. */
3802     segment->time = 0;
3803   } else {
3804     res = FALSE;
3805     GST_INFO_OBJECT (sink, "Can't do a default seek");
3806   }
3807
3808   return res;
3809 }
3810
3811 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3812
3813 static gboolean
3814 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3815     GstEvent * event, GstSegment * segment)
3816 {
3817   /* By default, we try one of 2 things:
3818    *   - For absolute seek positions, convert the requested position to our
3819    *     configured processing format and place it in the output segment \
3820    *   - For relative seek positions, convert our current (input) values to the
3821    *     seek format, adjust by the relative seek offset and then convert back to
3822    *     the processing format
3823    */
3824   GstSeekType cur_type, stop_type;
3825   gint64 cur, stop;
3826   GstSeekFlags flags;
3827   GstFormat seek_format, dest_format;
3828   gdouble rate;
3829   gboolean update;
3830   gboolean res = TRUE;
3831
3832   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3833       &cur_type, &cur, &stop_type, &stop);
3834   dest_format = segment->format;
3835
3836   if (seek_format == dest_format) {
3837     gst_segment_set_seek (segment, rate, seek_format, flags,
3838         cur_type, cur, stop_type, stop, &update);
3839     return TRUE;
3840   }
3841
3842   if (cur_type != GST_SEEK_TYPE_NONE) {
3843     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3844     res =
3845         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3846         &cur);
3847     cur_type = GST_SEEK_TYPE_SET;
3848   }
3849
3850   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3851     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3852     res =
3853         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3854         &stop);
3855     stop_type = GST_SEEK_TYPE_SET;
3856   }
3857
3858   /* And finally, configure our output segment in the desired format */
3859   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3860       stop_type, stop, &update);
3861
3862   if (!res)
3863     goto no_format;
3864
3865   return res;
3866
3867 no_format:
3868   {
3869     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3870     return FALSE;
3871   }
3872 }
3873
3874 /* perform a seek, only executed in pull mode */
3875 static gboolean
3876 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3877 {
3878   gboolean flush;
3879   gdouble rate;
3880   GstFormat seek_format, dest_format;
3881   GstSeekFlags flags;
3882   GstSeekType cur_type, stop_type;
3883   gboolean seekseg_configured = FALSE;
3884   gint64 cur, stop;
3885   gboolean update, res = TRUE;
3886   GstSegment seeksegment;
3887
3888   dest_format = sink->segment.format;
3889
3890   if (event) {
3891     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3892     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3893         &cur_type, &cur, &stop_type, &stop);
3894
3895     flush = flags & GST_SEEK_FLAG_FLUSH;
3896   } else {
3897     GST_DEBUG_OBJECT (sink, "performing seek without event");
3898     flush = FALSE;
3899   }
3900
3901   if (flush) {
3902     GST_DEBUG_OBJECT (sink, "flushing upstream");
3903     gst_pad_push_event (pad, gst_event_new_flush_start ());
3904     gst_base_sink_flush_start (sink, pad);
3905   } else {
3906     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3907   }
3908
3909   GST_PAD_STREAM_LOCK (pad);
3910
3911   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3912    * copy the current segment info into the temp segment that we can actually
3913    * attempt the seek with. We only update the real segment if the seek succeeds. */
3914   if (!seekseg_configured) {
3915     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3916
3917     /* now configure the final seek segment */
3918     if (event) {
3919       if (sink->segment.format != seek_format) {
3920         /* OK, here's where we give the subclass a chance to convert the relative
3921          * seek into an absolute one in the processing format. We set up any
3922          * absolute seek above, before taking the stream lock. */
3923         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3924                 &seeksegment)) {
3925           GST_DEBUG_OBJECT (sink,
3926               "Preparing the seek failed after flushing. " "Aborting seek");
3927           res = FALSE;
3928         }
3929       } else {
3930         /* The seek format matches our processing format, no need to ask the
3931          * the subclass to configure the segment. */
3932         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3933             cur_type, cur, stop_type, stop, &update);
3934       }
3935     }
3936     /* Else, no seek event passed, so we're just (re)starting the
3937        current segment. */
3938   }
3939
3940   if (res) {
3941     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3942         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3943         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3944
3945     /* do the seek, segment.last_stop contains the new position. */
3946     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3947   }
3948
3949
3950   if (flush) {
3951     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3952     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3953     gst_base_sink_flush_stop (sink, pad);
3954   } else if (res && sink->abidata.ABI.running) {
3955     /* we are running the current segment and doing a non-flushing seek,
3956      * close the segment first based on the last_stop. */
3957     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3958         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3959   }
3960
3961   /* The subclass must have converted the segment to the processing format
3962    * by now */
3963   if (res && seeksegment.format != dest_format) {
3964     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3965         "in the correct format. Aborting seek.");
3966     res = FALSE;
3967   }
3968
3969   /* if successful seek, we update our real segment and push
3970    * out the new segment. */
3971   if (res) {
3972     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3973
3974     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3975       gst_element_post_message (GST_ELEMENT (sink),
3976           gst_message_new_segment_start (GST_OBJECT (sink),
3977               sink->segment.format, sink->segment.last_stop));
3978     }
3979   }
3980
3981   sink->priv->discont = TRUE;
3982   sink->abidata.ABI.running = TRUE;
3983
3984   GST_PAD_STREAM_UNLOCK (pad);
3985
3986   return res;
3987 }
3988
3989 static void
3990 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
3991     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
3992     gboolean flush, gboolean intermediate)
3993 {
3994   GST_OBJECT_LOCK (sink);
3995   pending->seqnum = seqnum;
3996   pending->format = format;
3997   pending->amount = amount;
3998   pending->position = 0;
3999   pending->rate = rate;
4000   pending->flush = flush;
4001   pending->intermediate = intermediate;
4002   pending->valid = TRUE;
4003   /* flush invalidates the current stepping segment */
4004   if (flush)
4005     current->valid = FALSE;
4006   GST_OBJECT_UNLOCK (sink);
4007 }
4008
4009 static gboolean
4010 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
4011 {
4012   GstBaseSinkPrivate *priv;
4013   GstBaseSinkClass *bclass;
4014   gboolean flush, intermediate;
4015   gdouble rate;
4016   GstFormat format;
4017   guint64 amount;
4018   guint seqnum;
4019   GstStepInfo *pending, *current;
4020   GstMessage *message;
4021
4022   bclass = GST_BASE_SINK_GET_CLASS (sink);
4023   priv = sink->priv;
4024
4025   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
4026
4027   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
4028   seqnum = gst_event_get_seqnum (event);
4029
4030   pending = &priv->pending_step;
4031   current = &priv->current_step;
4032
4033   /* post message first */
4034   message = gst_message_new_step_start (GST_OBJECT (sink), FALSE, format,
4035       amount, rate, flush, intermediate);
4036   gst_message_set_seqnum (message, seqnum);
4037   gst_element_post_message (GST_ELEMENT (sink), message);
4038
4039   if (flush) {
4040     /* we need to call ::unlock before locking PREROLL_LOCK
4041      * since we lock it before going into ::render */
4042     if (bclass->unlock)
4043       bclass->unlock (sink);
4044
4045     GST_PAD_PREROLL_LOCK (sink->sinkpad);
4046     /* now that we have the PREROLL lock, clear our unlock request */
4047     if (bclass->unlock_stop)
4048       bclass->unlock_stop (sink);
4049
4050     /* update the stepinfo and make it valid */
4051     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4052         intermediate);
4053
4054     if (sink->priv->async_enabled) {
4055       /* and we need to commit our state again on the next
4056        * prerolled buffer */
4057       sink->playing_async = TRUE;
4058       priv->pending_step.need_preroll = TRUE;
4059       sink->need_preroll = FALSE;
4060       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
4061     } else {
4062       sink->priv->have_latency = TRUE;
4063       sink->need_preroll = FALSE;
4064     }
4065     priv->current_sstart = GST_CLOCK_TIME_NONE;
4066     priv->current_sstop = GST_CLOCK_TIME_NONE;
4067     priv->eos_rtime = GST_CLOCK_TIME_NONE;
4068     priv->call_preroll = TRUE;
4069     gst_base_sink_set_last_buffer (sink, NULL);
4070     gst_base_sink_reset_qos (sink);
4071
4072     if (sink->clock_id) {
4073       gst_clock_id_unschedule (sink->clock_id);
4074     }
4075
4076     if (sink->have_preroll) {
4077       GST_DEBUG_OBJECT (sink, "signal waiter");
4078       priv->step_unlock = TRUE;
4079       GST_PAD_PREROLL_SIGNAL (sink->sinkpad);
4080     }
4081     GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
4082   } else {
4083     /* update the stepinfo and make it valid */
4084     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4085         intermediate);
4086   }
4087
4088   return TRUE;
4089 }
4090
4091 /* with STREAM_LOCK
4092  */
4093 static void
4094 gst_base_sink_loop (GstPad * pad)
4095 {
4096   GstBaseSink *basesink;
4097   GstBuffer *buf = NULL;
4098   GstFlowReturn result;
4099   guint blocksize;
4100   guint64 offset;
4101
4102   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
4103
4104   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
4105
4106   if ((blocksize = basesink->priv->blocksize) == 0)
4107     blocksize = -1;
4108
4109   offset = basesink->segment.last_stop;
4110
4111   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
4112       offset, blocksize);
4113
4114   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
4115   if (G_UNLIKELY (result != GST_FLOW_OK))
4116     goto paused;
4117
4118   if (G_UNLIKELY (buf == NULL))
4119     goto no_buffer;
4120
4121   offset += GST_BUFFER_SIZE (buf);
4122
4123   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
4124
4125   GST_PAD_PREROLL_LOCK (pad);
4126   result = gst_base_sink_chain_unlocked (basesink, pad, _PR_IS_BUFFER, buf);
4127   GST_PAD_PREROLL_UNLOCK (pad);
4128   if (G_UNLIKELY (result != GST_FLOW_OK))
4129     goto paused;
4130
4131   return;
4132
4133   /* ERRORS */
4134 paused:
4135   {
4136     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
4137         gst_flow_get_name (result));
4138     gst_pad_pause_task (pad);
4139     if (result == GST_FLOW_UNEXPECTED) {
4140       /* perform EOS logic */
4141       if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
4142         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4143             gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
4144                 basesink->segment.format, basesink->segment.last_stop));
4145       } else {
4146         gst_base_sink_event (pad, gst_event_new_eos ());
4147       }
4148     } else if (result == GST_FLOW_NOT_LINKED || result <= GST_FLOW_UNEXPECTED) {
4149       /* for fatal errors we post an error message, post the error
4150        * first so the app knows about the error first. 
4151        * wrong-state is not a fatal error because it happens due to
4152        * flushing and posting an error message in that case is the
4153        * wrong thing to do, e.g. when basesrc is doing a flushing
4154        * seek. */
4155       GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4156           (_("Internal data stream error.")),
4157           ("stream stopped, reason %s", gst_flow_get_name (result)));
4158       gst_base_sink_event (pad, gst_event_new_eos ());
4159     }
4160     return;
4161   }
4162 no_buffer:
4163   {
4164     GST_LOG_OBJECT (basesink, "no buffer, pausing");
4165     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4166         (_("Internal data flow error.")), ("element returned NULL buffer"));
4167     result = GST_FLOW_ERROR;
4168     goto paused;
4169   }
4170 }
4171
4172 static gboolean
4173 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
4174     gboolean flushing)
4175 {
4176   GstBaseSinkClass *bclass;
4177
4178   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4179
4180   if (flushing) {
4181     /* unlock any subclasses, we need to do this before grabbing the
4182      * PREROLL_LOCK since we hold this lock before going into ::render. */
4183     if (bclass->unlock)
4184       bclass->unlock (basesink);
4185   }
4186
4187   GST_PAD_PREROLL_LOCK (pad);
4188   basesink->flushing = flushing;
4189   if (flushing) {
4190     /* step 1, now that we have the PREROLL lock, clear our unlock request */
4191     if (bclass->unlock_stop)
4192       bclass->unlock_stop (basesink);
4193
4194     /* set need_preroll before we unblock the clock. If the clock is unblocked
4195      * before timing out, we can reuse the buffer for preroll. */
4196     basesink->need_preroll = TRUE;
4197
4198     /* step 2, unblock clock sync (if any) or any other blocking thing */
4199     if (basesink->clock_id) {
4200       gst_clock_id_unschedule (basesink->clock_id);
4201     }
4202
4203     /* flush out the data thread if it's locked in finish_preroll, this will
4204      * also flush out the EOS state */
4205     GST_DEBUG_OBJECT (basesink,
4206         "flushing out data thread, need preroll to TRUE");
4207     gst_base_sink_preroll_queue_flush (basesink, pad);
4208   }
4209   GST_PAD_PREROLL_UNLOCK (pad);
4210
4211   return TRUE;
4212 }
4213
4214 static gboolean
4215 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
4216 {
4217   gboolean result;
4218
4219   if (active) {
4220     /* start task */
4221     result = gst_pad_start_task (basesink->sinkpad,
4222         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
4223   } else {
4224     /* step 2, make sure streaming finishes */
4225     result = gst_pad_stop_task (basesink->sinkpad);
4226   }
4227
4228   return result;
4229 }
4230
4231 static gboolean
4232 gst_base_sink_pad_activate (GstPad * pad)
4233 {
4234   gboolean result = FALSE;
4235   GstBaseSink *basesink;
4236
4237   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4238
4239   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
4240
4241   gst_base_sink_set_flushing (basesink, pad, FALSE);
4242
4243   /* we need to have the pull mode enabled */
4244   if (!basesink->can_activate_pull) {
4245     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
4246     goto fallback;
4247   }
4248
4249   /* check if downstreams supports pull mode at all */
4250   if (!gst_pad_check_pull_range (pad)) {
4251     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
4252     goto fallback;
4253   }
4254
4255   /* set the pad mode before starting the task so that it's in the
4256    * correct state for the new thread. also the sink set_caps and get_caps
4257    * function checks this */
4258   basesink->pad_mode = GST_ACTIVATE_PULL;
4259
4260   /* we first try to negotiate a format so that when we try to activate
4261    * downstream, it knows about our format */
4262   if (!gst_base_sink_negotiate_pull (basesink)) {
4263     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
4264     goto fallback;
4265   }
4266
4267   /* ok activate now */
4268   if (!gst_pad_activate_pull (pad, TRUE)) {
4269     /* clear any pending caps */
4270     GST_OBJECT_LOCK (basesink);
4271     gst_caps_replace (&basesink->priv->pull_caps, NULL);
4272     GST_OBJECT_UNLOCK (basesink);
4273     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
4274     goto fallback;
4275   }
4276
4277   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
4278   result = TRUE;
4279   goto done;
4280
4281   /* push mode fallback */
4282 fallback:
4283   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
4284   if ((result = gst_pad_activate_push (pad, TRUE))) {
4285     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
4286   }
4287
4288 done:
4289   if (!result) {
4290     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
4291     gst_base_sink_set_flushing (basesink, pad, TRUE);
4292   }
4293
4294   gst_object_unref (basesink);
4295
4296   return result;
4297 }
4298
4299 static gboolean
4300 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
4301 {
4302   gboolean result;
4303   GstBaseSink *basesink;
4304
4305   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4306
4307   if (active) {
4308     if (!basesink->can_activate_push) {
4309       result = FALSE;
4310       basesink->pad_mode = GST_ACTIVATE_NONE;
4311     } else {
4312       result = TRUE;
4313       basesink->pad_mode = GST_ACTIVATE_PUSH;
4314     }
4315   } else {
4316     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
4317       g_warning ("Internal GStreamer activation error!!!");
4318       result = FALSE;
4319     } else {
4320       gst_base_sink_set_flushing (basesink, pad, TRUE);
4321       result = TRUE;
4322       basesink->pad_mode = GST_ACTIVATE_NONE;
4323     }
4324   }
4325
4326   gst_object_unref (basesink);
4327
4328   return result;
4329 }
4330
4331 static gboolean
4332 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
4333 {
4334   GstCaps *caps;
4335   gboolean result;
4336
4337   result = FALSE;
4338
4339   /* this returns the intersection between our caps and the peer caps. If there
4340    * is no peer, it returns NULL and we can't operate in pull mode so we can
4341    * fail the negotiation. */
4342   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
4343   if (caps == NULL || gst_caps_is_empty (caps))
4344     goto no_caps_possible;
4345
4346   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
4347
4348   caps = gst_caps_make_writable (caps);
4349   /* get the first (preferred) format */
4350   gst_caps_truncate (caps);
4351   /* try to fixate */
4352   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
4353
4354   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
4355
4356   if (gst_caps_is_any (caps)) {
4357     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
4358         "allowing pull()");
4359     /* neither side has template caps in this case, so they are prepared for
4360        pull() without setcaps() */
4361     result = TRUE;
4362   } else if (gst_caps_is_fixed (caps)) {
4363     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
4364       goto could_not_set_caps;
4365
4366     GST_OBJECT_LOCK (basesink);
4367     gst_caps_replace (&basesink->priv->pull_caps, caps);
4368     GST_OBJECT_UNLOCK (basesink);
4369
4370     result = TRUE;
4371   }
4372
4373   gst_caps_unref (caps);
4374
4375   return result;
4376
4377 no_caps_possible:
4378   {
4379     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
4380     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
4381     if (caps)
4382       gst_caps_unref (caps);
4383     return FALSE;
4384   }
4385 could_not_set_caps:
4386   {
4387     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
4388     gst_caps_unref (caps);
4389     return FALSE;
4390   }
4391 }
4392
4393 /* this won't get called until we implement an activate function */
4394 static gboolean
4395 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
4396 {
4397   gboolean result = FALSE;
4398   GstBaseSink *basesink;
4399   GstBaseSinkClass *bclass;
4400
4401   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4402   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4403
4404   if (active) {
4405     GstFormat format;
4406     gint64 duration;
4407
4408     /* we mark we have a newsegment here because pull based
4409      * mode works just fine without having a newsegment before the
4410      * first buffer */
4411     format = GST_FORMAT_BYTES;
4412
4413     gst_segment_init (&basesink->segment, format);
4414     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
4415     GST_OBJECT_LOCK (basesink);
4416     basesink->have_newsegment = TRUE;
4417     GST_OBJECT_UNLOCK (basesink);
4418
4419     /* get the peer duration in bytes */
4420     result = gst_pad_query_peer_duration (pad, &format, &duration);
4421     if (result) {
4422       GST_DEBUG_OBJECT (basesink,
4423           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
4424       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
4425           duration);
4426       gst_segment_set_duration (&basesink->segment, format, duration);
4427     } else {
4428       GST_DEBUG_OBJECT (basesink, "unknown duration");
4429     }
4430
4431     if (bclass->activate_pull)
4432       result = bclass->activate_pull (basesink, TRUE);
4433     else
4434       result = FALSE;
4435
4436     if (!result)
4437       goto activate_failed;
4438
4439   } else {
4440     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
4441       g_warning ("Internal GStreamer activation error!!!");
4442       result = FALSE;
4443     } else {
4444       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
4445       if (bclass->activate_pull)
4446         result &= bclass->activate_pull (basesink, FALSE);
4447       basesink->pad_mode = GST_ACTIVATE_NONE;
4448       /* clear any pending caps */
4449       GST_OBJECT_LOCK (basesink);
4450       gst_caps_replace (&basesink->priv->pull_caps, NULL);
4451       GST_OBJECT_UNLOCK (basesink);
4452     }
4453   }
4454   gst_object_unref (basesink);
4455
4456   return result;
4457
4458   /* ERRORS */
4459 activate_failed:
4460   {
4461     /* reset, as starting the thread failed */
4462     basesink->pad_mode = GST_ACTIVATE_NONE;
4463
4464     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
4465     return FALSE;
4466   }
4467 }
4468
4469 /* send an event to our sinkpad peer. */
4470 static gboolean
4471 gst_base_sink_send_event (GstElement * element, GstEvent * event)
4472 {
4473   GstPad *pad;
4474   GstBaseSink *basesink = GST_BASE_SINK (element);
4475   gboolean forward, result = TRUE;
4476   GstActivateMode mode;
4477
4478   GST_OBJECT_LOCK (element);
4479   /* get the pad and the scheduling mode */
4480   pad = gst_object_ref (basesink->sinkpad);
4481   mode = basesink->pad_mode;
4482   GST_OBJECT_UNLOCK (element);
4483
4484   /* only push UPSTREAM events upstream */
4485   forward = GST_EVENT_IS_UPSTREAM (event);
4486
4487   GST_DEBUG_OBJECT (basesink, "handling event %p %" GST_PTR_FORMAT, event,
4488       event);
4489
4490   switch (GST_EVENT_TYPE (event)) {
4491     case GST_EVENT_LATENCY:
4492     {
4493       GstClockTime latency;
4494
4495       gst_event_parse_latency (event, &latency);
4496
4497       /* store the latency. We use this to adjust the running_time before syncing
4498        * it to the clock. */
4499       GST_OBJECT_LOCK (element);
4500       basesink->priv->latency = latency;
4501       if (!basesink->priv->have_latency)
4502         forward = FALSE;
4503       GST_OBJECT_UNLOCK (element);
4504       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
4505           GST_TIME_ARGS (latency));
4506
4507       /* We forward this event so that all elements know about the global pipeline
4508        * latency. This is interesting for an element when it wants to figure out
4509        * when a particular piece of data will be rendered. */
4510       break;
4511     }
4512     case GST_EVENT_SEEK:
4513       /* in pull mode we will execute the seek */
4514       if (mode == GST_ACTIVATE_PULL)
4515         result = gst_base_sink_perform_seek (basesink, pad, event);
4516       break;
4517     case GST_EVENT_STEP:
4518       result = gst_base_sink_perform_step (basesink, pad, event);
4519       forward = FALSE;
4520       break;
4521     default:
4522       break;
4523   }
4524
4525   if (forward) {
4526     result = gst_pad_push_event (pad, event);
4527   } else {
4528     /* not forwarded, unref the event */
4529     gst_event_unref (event);
4530   }
4531
4532   gst_object_unref (pad);
4533
4534   GST_DEBUG_OBJECT (basesink, "handled event %p %" GST_PTR_FORMAT ": %d", event,
4535       event, result);
4536
4537   return result;
4538 }
4539
4540 static gboolean
4541 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4542     gint64 * cur, gboolean * upstream)
4543 {
4544   GstClock *clock = NULL;
4545   gboolean res = FALSE;
4546   GstFormat oformat, tformat;
4547   GstSegment *segment;
4548   GstClockTime now, latency;
4549   GstClockTimeDiff base;
4550   gint64 time, accum, duration;
4551   gdouble rate;
4552   gint64 last;
4553   gboolean last_seen, with_clock, in_paused;
4554
4555   GST_OBJECT_LOCK (basesink);
4556   /* we can only get the segment when we are not NULL or READY */
4557   if (!basesink->have_newsegment)
4558     goto wrong_state;
4559
4560   in_paused = FALSE;
4561   /* when not in PLAYING or when we're busy with a state change, we
4562    * cannot read from the clock so we report time based on the
4563    * last seen timestamp. */
4564   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4565       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING) {
4566     in_paused = TRUE;
4567   }
4568
4569   /* we don't use the clip segment in pull mode, when seeking we update the
4570    * main segment directly with the new segment values without it having to be
4571    * activated by the rendering after preroll */
4572   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4573     segment = basesink->abidata.ABI.clip_segment;
4574   else
4575     segment = &basesink->segment;
4576
4577   /* our intermediate time format */
4578   tformat = GST_FORMAT_TIME;
4579   /* get the format in the segment */
4580   oformat = segment->format;
4581
4582   /* report with last seen position when EOS */
4583   last_seen = basesink->eos;
4584
4585   /* assume we will use the clock for getting the current position */
4586   with_clock = TRUE;
4587   if (basesink->sync == FALSE)
4588     with_clock = FALSE;
4589
4590   /* and we need a clock */
4591   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4592     with_clock = FALSE;
4593   else
4594     gst_object_ref (clock);
4595
4596   /* mainloop might be querying position when going to playing async,
4597    * while (audio) rendering might be quickly advancing stream position,
4598    * so use clock asap rather than last reported position */
4599   if (in_paused && with_clock && g_atomic_int_get (&basesink->priv->to_playing)) {
4600     GST_DEBUG_OBJECT (basesink, "going to PLAYING, so not PAUSED");
4601     in_paused = FALSE;
4602   }
4603
4604   /* collect all data we need holding the lock */
4605   if (GST_CLOCK_TIME_IS_VALID (segment->time))
4606     time = segment->time;
4607   else
4608     time = 0;
4609
4610   if (GST_CLOCK_TIME_IS_VALID (segment->stop))
4611     duration = segment->stop - segment->start;
4612   else
4613     duration = 0;
4614
4615   accum = segment->accum;
4616   rate = segment->rate * segment->applied_rate;
4617   latency = basesink->priv->latency;
4618
4619   if (oformat == GST_FORMAT_TIME) {
4620     gint64 start, stop;
4621
4622     start = basesink->priv->current_sstart;
4623     stop = basesink->priv->current_sstop;
4624
4625     if (in_paused) {
4626       /* in paused we use the last position as a lower bound */
4627       if (stop == -1 || segment->rate > 0.0)
4628         last = start;
4629       else
4630         last = stop;
4631     } else {
4632       /* in playing, use last stop time as upper bound */
4633       if (start == -1 || segment->rate > 0.0)
4634         last = stop;
4635       else
4636         last = start;
4637     }
4638   } else {
4639     /* convert last stop to stream time */
4640     last = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4641   }
4642
4643   if (in_paused) {
4644     /* in paused, use start_time */
4645     base = GST_ELEMENT_START_TIME (basesink);
4646     GST_DEBUG_OBJECT (basesink, "in paused, using start time %" GST_TIME_FORMAT,
4647         GST_TIME_ARGS (base));
4648   } else if (with_clock) {
4649     /* else use clock when needed */
4650     base = GST_ELEMENT_CAST (basesink)->base_time;
4651     GST_DEBUG_OBJECT (basesink, "using clock and base time %" GST_TIME_FORMAT,
4652         GST_TIME_ARGS (base));
4653   } else {
4654     /* else, no sync or clock -> no base time */
4655     GST_DEBUG_OBJECT (basesink, "no sync or no clock");
4656     base = -1;
4657   }
4658
4659   /* no base, we can't calculate running_time, use last seem timestamp to report
4660    * time */
4661   if (base == -1)
4662     last_seen = TRUE;
4663
4664   /* need to release the object lock before we can get the time,
4665    * a clock might take the LOCK of the provider, which could be
4666    * a basesink subclass. */
4667   GST_OBJECT_UNLOCK (basesink);
4668
4669   if (last_seen) {
4670     /* in EOS or when no valid stream_time, report the value of last seen
4671      * timestamp */
4672     if (last == -1) {
4673       /* no timestamp, we need to ask upstream */
4674       GST_DEBUG_OBJECT (basesink, "no last seen timestamp, asking upstream");
4675       res = FALSE;
4676       *upstream = TRUE;
4677       goto done;
4678     }
4679     GST_DEBUG_OBJECT (basesink, "using last seen timestamp %" GST_TIME_FORMAT,
4680         GST_TIME_ARGS (last));
4681     *cur = last;
4682   } else {
4683     if (oformat != tformat) {
4684       /* convert accum, time and duration to time */
4685       if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4686               &accum))
4687         goto convert_failed;
4688       if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration,
4689               &tformat, &duration))
4690         goto convert_failed;
4691       if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4692               &time))
4693         goto convert_failed;
4694       if (!gst_pad_query_convert (basesink->sinkpad, oformat, last, &tformat,
4695               &last))
4696         goto convert_failed;
4697
4698       /* assume time format from now on */
4699       oformat = tformat;
4700     }
4701
4702     if (!in_paused && with_clock) {
4703       now = gst_clock_get_time (clock);
4704     } else {
4705       now = base;
4706       base = 0;
4707     }
4708
4709     /* subtract base time and accumulated time from the clock time.
4710      * Make sure we don't go negative. This is the current time in
4711      * the segment which we need to scale with the combined
4712      * rate and applied rate. */
4713     base += accum;
4714     base += latency;
4715     if (GST_CLOCK_DIFF (base, now) < 0)
4716       base = now;
4717
4718     /* for negative rates we need to count back from the segment
4719      * duration. */
4720     if (rate < 0.0)
4721       time += duration;
4722
4723     *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4724
4725     if (in_paused) {
4726       /* never report less than segment values in paused */
4727       if (last != -1)
4728         *cur = MAX (last, *cur);
4729     } else {
4730       /* never report more than last seen position in playing */
4731       if (last != -1)
4732         *cur = MIN (last, *cur);
4733     }
4734
4735     GST_DEBUG_OBJECT (basesink,
4736         "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4737         GST_TIME_FORMAT " + time %" GST_TIME_FORMAT "  last %" GST_TIME_FORMAT,
4738         GST_TIME_ARGS (now), GST_TIME_ARGS (base), GST_TIME_ARGS (accum),
4739         GST_TIME_ARGS (time), GST_TIME_ARGS (last));
4740   }
4741
4742   if (oformat != format) {
4743     /* convert to final format */
4744     if (!gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur))
4745       goto convert_failed;
4746   }
4747
4748   res = TRUE;
4749
4750 done:
4751   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4752       res, GST_TIME_ARGS (*cur));
4753
4754   if (clock)
4755     gst_object_unref (clock);
4756
4757   return res;
4758
4759   /* special cases */
4760 wrong_state:
4761   {
4762     /* in NULL or READY we always return FALSE and -1 */
4763     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4764     res = FALSE;
4765     *cur = -1;
4766     GST_OBJECT_UNLOCK (basesink);
4767     goto done;
4768   }
4769 convert_failed:
4770   {
4771     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4772     *upstream = TRUE;
4773     res = FALSE;
4774     goto done;
4775   }
4776 }
4777
4778 static gboolean
4779 gst_base_sink_get_duration (GstBaseSink * basesink, GstFormat format,
4780     gint64 * dur, gboolean * upstream)
4781 {
4782   gboolean res = FALSE;
4783
4784   if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4785     GstFormat uformat = GST_FORMAT_BYTES;
4786     gint64 uduration;
4787
4788     /* get the duration in bytes, in pull mode that's all we are sure to
4789      * know. We have to explicitly get this value from upstream instead of
4790      * using our cached value because it might change. Duration caching
4791      * should be done at a higher level. */
4792     res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat, &uduration);
4793     if (res) {
4794       gst_segment_set_duration (&basesink->segment, uformat, uduration);
4795       if (format != uformat) {
4796         /* convert to the requested format */
4797         res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4798             &format, dur);
4799       } else {
4800         *dur = uduration;
4801       }
4802     }
4803     *upstream = FALSE;
4804   } else {
4805     *upstream = TRUE;
4806   }
4807
4808   return res;
4809 }
4810
4811 static const GstQueryType *
4812 gst_base_sink_get_query_types (GstElement * element)
4813 {
4814   static const GstQueryType query_types[] = {
4815     GST_QUERY_DURATION,
4816     GST_QUERY_POSITION,
4817     GST_QUERY_SEGMENT,
4818     GST_QUERY_LATENCY,
4819     0
4820   };
4821
4822   return query_types;
4823 }
4824
4825 static gboolean
4826 default_element_query (GstElement * element, GstQuery * query)
4827 {
4828   gboolean res = FALSE;
4829
4830   GstBaseSink *basesink = GST_BASE_SINK (element);
4831
4832   switch (GST_QUERY_TYPE (query)) {
4833     case GST_QUERY_POSITION:
4834     {
4835       gint64 cur = 0;
4836       GstFormat format;
4837       gboolean upstream = FALSE;
4838
4839       gst_query_parse_position (query, &format, NULL);
4840
4841       GST_DEBUG_OBJECT (basesink, "position query in format %s",
4842           gst_format_get_name (format));
4843
4844       /* first try to get the position based on the clock */
4845       if ((res =
4846               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4847         gst_query_set_position (query, format, cur);
4848       } else if (upstream) {
4849         /* fallback to peer query */
4850         res = gst_pad_peer_query (basesink->sinkpad, query);
4851       }
4852       if (!res) {
4853         /* we can handle a few things if upstream failed */
4854         if (format == GST_FORMAT_PERCENT) {
4855           gint64 dur = 0;
4856           GstFormat uformat = GST_FORMAT_TIME;
4857
4858           res = gst_base_sink_get_position (basesink, GST_FORMAT_TIME, &cur,
4859               &upstream);
4860           if (!res && upstream) {
4861             res = gst_pad_query_peer_position (basesink->sinkpad, &uformat,
4862                 &cur);
4863           }
4864           if (res) {
4865             res = gst_base_sink_get_duration (basesink, GST_FORMAT_TIME, &dur,
4866                 &upstream);
4867             if (!res && upstream) {
4868               res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4869                   &dur);
4870             }
4871           }
4872           if (res) {
4873             gint64 pos;
4874
4875             pos = gst_util_uint64_scale (100 * GST_FORMAT_PERCENT_SCALE, cur,
4876                 dur);
4877             gst_query_set_position (query, GST_FORMAT_PERCENT, pos);
4878           }
4879         }
4880       }
4881       break;
4882     }
4883     case GST_QUERY_DURATION:
4884     {
4885       gint64 dur = 0;
4886       GstFormat format;
4887       gboolean upstream = FALSE;
4888
4889       gst_query_parse_duration (query, &format, NULL);
4890
4891       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4892           gst_format_get_name (format));
4893
4894       if ((res =
4895               gst_base_sink_get_duration (basesink, format, &dur, &upstream))) {
4896         gst_query_set_duration (query, format, dur);
4897       } else if (upstream) {
4898         /* fallback to peer query */
4899         res = gst_pad_peer_query (basesink->sinkpad, query);
4900       }
4901       if (!res) {
4902         /* we can handle a few things if upstream failed */
4903         if (format == GST_FORMAT_PERCENT) {
4904           gst_query_set_duration (query, GST_FORMAT_PERCENT,
4905               GST_FORMAT_PERCENT_MAX);
4906           res = TRUE;
4907         }
4908       }
4909       break;
4910     }
4911     case GST_QUERY_LATENCY:
4912     {
4913       gboolean live, us_live;
4914       GstClockTime min, max;
4915
4916       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4917                   &max))) {
4918         gst_query_set_latency (query, live, min, max);
4919       }
4920       break;
4921     }
4922     case GST_QUERY_JITTER:
4923       break;
4924     case GST_QUERY_RATE:
4925       /* gst_query_set_rate (query, basesink->segment_rate); */
4926       res = TRUE;
4927       break;
4928     case GST_QUERY_SEGMENT:
4929     {
4930       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4931         gst_query_set_segment (query, basesink->segment.rate,
4932             GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4933         res = TRUE;
4934       } else {
4935         res = gst_pad_peer_query (basesink->sinkpad, query);
4936       }
4937       break;
4938     }
4939     case GST_QUERY_SEEKING:
4940     case GST_QUERY_CONVERT:
4941     case GST_QUERY_FORMATS:
4942     default:
4943       res = gst_pad_peer_query (basesink->sinkpad, query);
4944       break;
4945   }
4946   GST_DEBUG_OBJECT (basesink, "query %s returns %d",
4947       GST_QUERY_TYPE_NAME (query), res);
4948   return res;
4949 }
4950
4951 static gboolean
4952 default_sink_query (GstBaseSink * basesink, GstQuery * query)
4953 {
4954   return gst_pad_query_default (basesink->sinkpad, query);
4955 }
4956
4957 static gboolean
4958 gst_base_sink_sink_query (GstPad * pad, GstQuery * query)
4959 {
4960   GstBaseSink *basesink;
4961   GstBaseSinkClass *bclass;
4962   gboolean res;
4963
4964   basesink = GST_BASE_SINK_CAST (gst_pad_get_parent (pad));
4965   if (G_UNLIKELY (basesink == NULL)) {
4966     gst_query_unref (query);
4967     return FALSE;
4968   }
4969
4970   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4971
4972   if (bclass->query)
4973     res = bclass->query (basesink, query);
4974   else
4975     res = FALSE;
4976
4977   gst_object_unref (basesink);
4978
4979   return res;
4980 }
4981
4982 static GstStateChangeReturn
4983 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4984 {
4985   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4986   GstBaseSink *basesink = GST_BASE_SINK (element);
4987   GstBaseSinkClass *bclass;
4988   GstBaseSinkPrivate *priv;
4989
4990   priv = basesink->priv;
4991
4992   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4993
4994   switch (transition) {
4995     case GST_STATE_CHANGE_NULL_TO_READY:
4996       if (bclass->start)
4997         if (!bclass->start (basesink))
4998           goto start_failed;
4999       break;
5000     case GST_STATE_CHANGE_READY_TO_PAUSED:
5001       /* need to complete preroll before this state change completes, there
5002        * is no data flow in READY so we can safely assume we need to preroll. */
5003       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5004       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
5005       basesink->have_newsegment = FALSE;
5006       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
5007       gst_segment_init (basesink->abidata.ABI.clip_segment,
5008           GST_FORMAT_UNDEFINED);
5009       basesink->offset = 0;
5010       basesink->have_preroll = FALSE;
5011       priv->step_unlock = FALSE;
5012       basesink->need_preroll = TRUE;
5013       basesink->playing_async = TRUE;
5014       priv->current_sstart = GST_CLOCK_TIME_NONE;
5015       priv->current_sstop = GST_CLOCK_TIME_NONE;
5016       priv->eos_rtime = GST_CLOCK_TIME_NONE;
5017       priv->latency = 0;
5018       basesink->eos = FALSE;
5019       priv->received_eos = FALSE;
5020       gst_base_sink_reset_qos (basesink);
5021       priv->commited = FALSE;
5022       priv->call_preroll = TRUE;
5023       priv->current_step.valid = FALSE;
5024       priv->pending_step.valid = FALSE;
5025       if (priv->async_enabled) {
5026         GST_DEBUG_OBJECT (basesink, "doing async state change");
5027         /* when async enabled, post async-start message and return ASYNC from
5028          * the state change function */
5029         ret = GST_STATE_CHANGE_ASYNC;
5030         gst_element_post_message (GST_ELEMENT_CAST (basesink),
5031             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
5032       } else {
5033         priv->have_latency = TRUE;
5034       }
5035       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5036       break;
5037     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
5038       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5039       g_atomic_int_set (&basesink->priv->to_playing, TRUE);
5040       if (!gst_base_sink_needs_preroll (basesink)) {
5041         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
5042         /* no preroll needed anymore now. */
5043         basesink->playing_async = FALSE;
5044         basesink->need_preroll = FALSE;
5045         if (basesink->eos) {
5046           GstMessage *message;
5047
5048           /* need to post EOS message here */
5049           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
5050           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
5051           gst_message_set_seqnum (message, basesink->priv->seqnum);
5052           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
5053         } else {
5054           GST_DEBUG_OBJECT (basesink, "signal preroll");
5055           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
5056         }
5057       } else {
5058         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
5059         basesink->need_preroll = TRUE;
5060         basesink->playing_async = TRUE;
5061         priv->call_preroll = TRUE;
5062         priv->commited = FALSE;
5063         if (priv->async_enabled) {
5064           GST_DEBUG_OBJECT (basesink, "doing async state change");
5065           ret = GST_STATE_CHANGE_ASYNC;
5066           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5067               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
5068         }
5069       }
5070       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5071       break;
5072     default:
5073       break;
5074   }
5075
5076   {
5077     GstStateChangeReturn bret;
5078
5079     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
5080     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
5081       goto activate_failed;
5082   }
5083
5084   switch (transition) {
5085     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
5086       /* completed transition, so need not be marked any longer
5087        * And it should be unmarked, since e.g. losing our position upon flush
5088        * does not really change state to PAUSED ... */
5089       g_atomic_int_set (&basesink->priv->to_playing, FALSE);
5090       break;
5091     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
5092       g_atomic_int_set (&basesink->priv->to_playing, FALSE);
5093       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
5094       /* FIXME, make sure we cannot enter _render first */
5095
5096       /* we need to call ::unlock before locking PREROLL_LOCK
5097        * since we lock it before going into ::render */
5098       if (bclass->unlock)
5099         bclass->unlock (basesink);
5100
5101       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5102       GST_DEBUG_OBJECT (basesink, "got preroll lock");
5103       /* now that we have the PREROLL lock, clear our unlock request */
5104       if (bclass->unlock_stop)
5105         bclass->unlock_stop (basesink);
5106
5107       /* we need preroll again and we set the flag before unlocking the clockid
5108        * because if the clockid is unlocked before a current buffer expired, we
5109        * can use that buffer to preroll with */
5110       basesink->need_preroll = TRUE;
5111
5112       if (basesink->clock_id) {
5113         GST_DEBUG_OBJECT (basesink, "unschedule clock");
5114         gst_clock_id_unschedule (basesink->clock_id);
5115       }
5116
5117       /* if we don't have a preroll buffer we need to wait for a preroll and
5118        * return ASYNC. */
5119       if (!gst_base_sink_needs_preroll (basesink)) {
5120         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
5121         basesink->playing_async = FALSE;
5122       } else {
5123         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
5124           GST_DEBUG_OBJECT (basesink, "element is <= READY");
5125           ret = GST_STATE_CHANGE_SUCCESS;
5126         } else {
5127           GST_DEBUG_OBJECT (basesink,
5128               "PLAYING to PAUSED, we are not prerolled");
5129           basesink->playing_async = TRUE;
5130           priv->commited = FALSE;
5131           priv->call_preroll = TRUE;
5132           if (priv->async_enabled) {
5133             GST_DEBUG_OBJECT (basesink, "doing async state change");
5134             ret = GST_STATE_CHANGE_ASYNC;
5135             gst_element_post_message (GST_ELEMENT_CAST (basesink),
5136                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
5137                     FALSE));
5138           }
5139         }
5140       }
5141       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
5142           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
5143
5144       gst_base_sink_reset_qos (basesink);
5145       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5146       break;
5147     case GST_STATE_CHANGE_PAUSED_TO_READY:
5148       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5149       /* start by resetting our position state with the object lock so that the
5150        * position query gets the right idea. We do this before we post the
5151        * messages so that the message handlers pick this up. */
5152       GST_OBJECT_LOCK (basesink);
5153       basesink->have_newsegment = FALSE;
5154       priv->current_sstart = GST_CLOCK_TIME_NONE;
5155       priv->current_sstop = GST_CLOCK_TIME_NONE;
5156       priv->have_latency = FALSE;
5157       if (priv->cached_clock_id) {
5158         gst_clock_id_unref (priv->cached_clock_id);
5159         priv->cached_clock_id = NULL;
5160       }
5161       GST_OBJECT_UNLOCK (basesink);
5162
5163       gst_base_sink_set_last_buffer (basesink, NULL);
5164       priv->call_preroll = FALSE;
5165
5166       if (!priv->commited) {
5167         if (priv->async_enabled) {
5168           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
5169
5170           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5171               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
5172                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
5173
5174           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5175               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
5176         }
5177         priv->commited = TRUE;
5178       } else {
5179         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
5180       }
5181       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5182       break;
5183     case GST_STATE_CHANGE_READY_TO_NULL:
5184       if (bclass->stop) {
5185         if (!bclass->stop (basesink)) {
5186           GST_WARNING_OBJECT (basesink, "failed to stop");
5187         }
5188       }
5189       gst_base_sink_set_last_buffer (basesink, NULL);
5190       priv->call_preroll = FALSE;
5191       break;
5192     default:
5193       break;
5194   }
5195
5196   return ret;
5197
5198   /* ERRORS */
5199 start_failed:
5200   {
5201     GST_DEBUG_OBJECT (basesink, "failed to start");
5202     return GST_STATE_CHANGE_FAILURE;
5203   }
5204 activate_failed:
5205   {
5206     GST_DEBUG_OBJECT (basesink,
5207         "element failed to change states -- activation problem?");
5208     return GST_STATE_CHANGE_FAILURE;
5209   }
5210 }