Suppress deprecation warnings in selected files, for g_static_rec_mutex_* mostly
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSrc
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * |[
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * ]|
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSinkClass.preroll() vmethod with this preroll buffer and will then
59  * commit the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from #GstBaseSinkClass.get_times(). If this
63  * function returns #GST_CLOCK_TIME_NONE for the start time, no synchronisation
64  * will be done. Synchronisation can be disabled entirely by setting the object
65  * #GstBaseSink:sync property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSinkClass.render() will be
68  * called. Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the
71  * #GstBaseSinkClass.render() method are supported as well. These classes
72  * typically receive a buffer in the render method and can then potentially
73  * block on the clock while rendering. A typical example is an audiosink.
74  * Since 0.10.11 these subclasses can use gst_base_sink_wait_preroll() to
75  * perform the blocking wait.
76  *
77  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
78  * for the clock to reach the time indicated by the stop time of the last
79  * #GstBaseSinkClass.get_times() call before posting an EOS message. When the
80  * element receives EOS in PAUSED, preroll completes, the event is queued and an
81  * EOS message is posted when going to PLAYING.
82  *
83  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
84  * synchronisation and clipping of buffers. Buffers that fall completely outside
85  * of the current segment are dropped. Buffers that fall partially in the
86  * segment are rendered (and prerolled). Subclasses should do any subbuffer
87  * clipping themselves when needed.
88  *
89  * #GstBaseSink will by default report the current playback position in
90  * #GST_FORMAT_TIME based on the current clock time and segment information.
91  * If no clock has been set on the element, the query will be forwarded
92  * upstream.
93  *
94  * The #GstBaseSinkClass.set_caps() function will be called when the subclass
95  * should configure itself to process a specific media type.
96  *
97  * The #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() virtual methods
98  * will be called when resources should be allocated. Any 
99  * #GstBaseSinkClass.preroll(), #GstBaseSinkClass.render() and
100  * #GstBaseSinkClass.set_caps() function will be called between the
101  * #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() calls.
102  *
103  * The #GstBaseSinkClass.event() virtual method will be called when an event is
104  * received by #GstBaseSink. Normally this method should only be overriden by
105  * very specific elements (such as file sinks) which need to handle the
106  * newsegment event specially.
107  *
108  * #GstBaseSink provides an overridable #GstBaseSinkClass.buffer_alloc()
109  * function that can be used by sinks that want to do reverse negotiation or to
110  * provide custom buffers (hardware buffers for example) to upstream elements.
111  *
112  * The #GstBaseSinkClass.unlock() method is called when the elements should
113  * unblock any blocking operations they perform in the
114  * #GstBaseSinkClass.render() method. This is mostly useful when the
115  * #GstBaseSinkClass.render() method performs a blocking write on a file
116  * descriptor, for example.
117  *
118  * The #GstBaseSink:max-lateness property affects how the sink deals with
119  * buffers that arrive too late in the sink. A buffer arrives too late in the
120  * sink when the presentation time (as a combination of the last segment, buffer
121  * timestamp and element base_time) plus the duration is before the current
122  * time of the clock.
123  * If the frame is later than max-lateness, the sink will drop the buffer
124  * without calling the render method.
125  * This feature is disabled if sync is disabled, the
126  * #GstBaseSinkClass.get_times() method does not return a valid start time or
127  * max-lateness is set to -1 (the default).
128  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
129  * max-lateness value.
130  *
131  * The #GstBaseSink:qos property will enable the quality-of-service features of
132  * the basesink which gather statistics about the real-time performance of the
133  * clock synchronisation. For each buffer received in the sink, statistics are
134  * gathered and a QOS event is sent upstream with these numbers. This
135  * information can then be used by upstream elements to reduce their processing
136  * rate, for example.
137  *
138  * Since 0.10.15 the #GstBaseSink:async property can be used to instruct the
139  * sink to never perform an ASYNC state change. This feature is mostly usable
140  * when dealing with non-synchronized streams or sparse streams.
141  *
142  * Last reviewed on 2007-08-29 (0.10.15)
143  */
144
145 #ifdef HAVE_CONFIG_H
146 #  include "config.h"
147 #endif
148
149 /* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
150  * with newer GLib versions (>= 2.31.0) */
151 #define GLIB_DISABLE_DEPRECATION_WARNINGS
152 #include <gst/gst_private.h>
153
154 #include "gstbasesink.h"
155 #include <gst/gstmarshal.h>
156 #include <gst/gst-i18n-lib.h>
157
158 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
159 #define GST_CAT_DEFAULT gst_base_sink_debug
160
161 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
162    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
163
164 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
165
166 typedef struct
167 {
168   gboolean valid;               /* if this info is valid */
169   guint32 seqnum;               /* the seqnum of the STEP event */
170   GstFormat format;             /* the format of the amount */
171   guint64 amount;               /* the total amount of data to skip */
172   guint64 position;             /* the position in the stepped data */
173   guint64 duration;             /* the duration in time of the skipped data */
174   guint64 start;                /* running_time of the start */
175   gdouble rate;                 /* rate of skipping */
176   gdouble start_rate;           /* rate before skipping */
177   guint64 start_start;          /* start position skipping */
178   guint64 start_stop;           /* stop position skipping */
179   gboolean flush;               /* if this was a flushing step */
180   gboolean intermediate;        /* if this is an intermediate step */
181   gboolean need_preroll;        /* if we need preroll after this step */
182 } GstStepInfo;
183
184 /* FIXME, some stuff in ABI.data and other in Private...
185  * Make up your mind please.
186  */
187 struct _GstBaseSinkPrivate
188 {
189   gint qos_enabled;             /* ATOMIC */
190   gboolean async_enabled;
191   GstClockTimeDiff ts_offset;
192   GstClockTime render_delay;
193
194   /* start, stop of current buffer, stream time, used to report position */
195   GstClockTime current_sstart;
196   GstClockTime current_sstop;
197
198   /* start, stop and jitter of current buffer, running time */
199   GstClockTime current_rstart;
200   GstClockTime current_rstop;
201   GstClockTimeDiff current_jitter;
202   /* the running time of the previous buffer */
203   GstClockTime prev_rstart;
204
205   /* EOS sync time in running time */
206   GstClockTime eos_rtime;
207
208   /* last buffer that arrived in time, running time */
209   GstClockTime last_render_time;
210   /* when the last buffer left the sink, running time */
211   GstClockTime last_left;
212
213   /* running averages go here these are done on running time */
214   GstClockTime avg_pt;
215   GstClockTime avg_duration;
216   gdouble avg_rate;
217   GstClockTime avg_in_diff;
218
219   /* these are done on system time. avg_jitter and avg_render are
220    * compared to eachother to see if the rendering time takes a
221    * huge amount of the processing, If so we are flooded with
222    * buffers. */
223   GstClockTime last_left_systime;
224   GstClockTime avg_jitter;
225   GstClockTime start, stop;
226   GstClockTime avg_render;
227
228   /* number of rendered and dropped frames */
229   guint64 rendered;
230   guint64 dropped;
231
232   /* latency stuff */
233   GstClockTime latency;
234
235   /* if we already commited the state */
236   gboolean commited;
237   /* state change to playing ongoing */
238   gboolean to_playing;
239
240   /* when we received EOS */
241   gboolean received_eos;
242
243   /* when we are prerolled and able to report latency */
244   gboolean have_latency;
245
246   /* the last buffer we prerolled or rendered. Useful for making snapshots */
247   gint enable_last_buffer;      /* atomic */
248   GstBuffer *last_buffer;
249
250   /* caps for pull based scheduling */
251   GstCaps *pull_caps;
252
253   /* blocksize for pulling */
254   guint blocksize;
255
256   gboolean discont;
257
258   /* seqnum of the stream */
259   guint32 seqnum;
260
261   gboolean call_preroll;
262   gboolean step_unlock;
263
264   /* we have a pending and a current step operation */
265   GstStepInfo current_step;
266   GstStepInfo pending_step;
267
268   /* Cached GstClockID */
269   GstClockID cached_clock_id;
270
271   /* for throttling and QoS */
272   GstClockTime earliest_in_time;
273   GstClockTime throttle_time;
274 };
275
276 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
277
278 /* generic running average, this has a neutral window size */
279 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
280
281 /* the windows for these running averages are experimentally obtained.
282  * possitive values get averaged more while negative values use a small
283  * window so we can react faster to badness. */
284 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
285 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
286
287 enum
288 {
289   _PR_IS_NOTHING = 1 << 0,
290   _PR_IS_BUFFER = 1 << 1,
291   _PR_IS_BUFFERLIST = 1 << 2,
292   _PR_IS_EVENT = 1 << 3
293 } PrivateObjectType;
294
295 #define OBJ_IS_BUFFER(a) ((a) & _PR_IS_BUFFER)
296 #define OBJ_IS_BUFFERLIST(a) ((a) & _PR_IS_BUFFERLIST)
297 #define OBJ_IS_EVENT(a) ((a) & _PR_IS_EVENT)
298 #define OBJ_IS_BUFFERFULL(a) ((a) & (_PR_IS_BUFFER | _PR_IS_BUFFERLIST))
299
300 /* BaseSink properties */
301
302 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
303 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
304
305 #define DEFAULT_PREROLL_QUEUE_LEN   0
306 #define DEFAULT_SYNC                TRUE
307 #define DEFAULT_MAX_LATENESS        -1
308 #define DEFAULT_QOS                 FALSE
309 #define DEFAULT_ASYNC               TRUE
310 #define DEFAULT_TS_OFFSET           0
311 #define DEFAULT_BLOCKSIZE           4096
312 #define DEFAULT_RENDER_DELAY        0
313 #define DEFAULT_ENABLE_LAST_BUFFER  TRUE
314 #define DEFAULT_THROTTLE_TIME       0
315
316 enum
317 {
318   PROP_0,
319   PROP_PREROLL_QUEUE_LEN,
320   PROP_SYNC,
321   PROP_MAX_LATENESS,
322   PROP_QOS,
323   PROP_ASYNC,
324   PROP_TS_OFFSET,
325   PROP_ENABLE_LAST_BUFFER,
326   PROP_LAST_BUFFER,
327   PROP_BLOCKSIZE,
328   PROP_RENDER_DELAY,
329   PROP_THROTTLE_TIME,
330   PROP_LAST
331 };
332
333 static GstElementClass *parent_class = NULL;
334
335 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
336 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
337 static void gst_base_sink_finalize (GObject * object);
338
339 GType
340 gst_base_sink_get_type (void)
341 {
342   static volatile gsize base_sink_type = 0;
343
344   if (g_once_init_enter (&base_sink_type)) {
345     GType _type;
346     static const GTypeInfo base_sink_info = {
347       sizeof (GstBaseSinkClass),
348       NULL,
349       NULL,
350       (GClassInitFunc) gst_base_sink_class_init,
351       NULL,
352       NULL,
353       sizeof (GstBaseSink),
354       0,
355       (GInstanceInitFunc) gst_base_sink_init,
356     };
357
358     _type = g_type_register_static (GST_TYPE_ELEMENT,
359         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
360     g_once_init_leave (&base_sink_type, _type);
361   }
362   return base_sink_type;
363 }
364
365 static void gst_base_sink_set_property (GObject * object, guint prop_id,
366     const GValue * value, GParamSpec * pspec);
367 static void gst_base_sink_get_property (GObject * object, guint prop_id,
368     GValue * value, GParamSpec * pspec);
369
370 static gboolean gst_base_sink_send_event (GstElement * element,
371     GstEvent * event);
372 static gboolean default_element_query (GstElement * element, GstQuery * query);
373 static const GstQueryType *gst_base_sink_get_query_types (GstElement * element);
374
375 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
376 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
377 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
378     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
379 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
380     GstClockTime * start, GstClockTime * end);
381 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
382     GstPad * pad, gboolean flushing);
383 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
384     gboolean active);
385 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
386     GstSegment * segment);
387 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
388     GstEvent * event, GstSegment * segment);
389
390 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
391     GstStateChange transition);
392
393 static gboolean gst_base_sink_sink_query (GstPad * pad, GstQuery * query);
394 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
395 static GstFlowReturn gst_base_sink_chain_list (GstPad * pad,
396     GstBufferList * list);
397
398 static void gst_base_sink_loop (GstPad * pad);
399 static gboolean gst_base_sink_pad_activate (GstPad * pad);
400 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
401 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
402 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
403
404 static gboolean default_sink_query (GstBaseSink * sink, GstQuery * query);
405
406 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
407 static GstCaps *gst_base_sink_pad_getcaps (GstPad * pad);
408 static gboolean gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps);
409 static void gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps);
410 static GstFlowReturn gst_base_sink_pad_buffer_alloc (GstPad * pad,
411     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
412
413
414 /* check if an object was too late */
415 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
416     GstMiniObject * obj, GstClockTime rstart, GstClockTime rstop,
417     GstClockReturn status, GstClockTimeDiff jitter);
418 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
419     guint8 obj_type, GstMiniObject * obj);
420
421 static void
422 gst_base_sink_class_init (GstBaseSinkClass * klass)
423 {
424   GObjectClass *gobject_class;
425   GstElementClass *gstelement_class;
426
427   gobject_class = G_OBJECT_CLASS (klass);
428   gstelement_class = GST_ELEMENT_CLASS (klass);
429
430   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
431       "basesink element");
432
433   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
434
435   parent_class = g_type_class_peek_parent (klass);
436
437   gobject_class->finalize = gst_base_sink_finalize;
438   gobject_class->set_property = gst_base_sink_set_property;
439   gobject_class->get_property = gst_base_sink_get_property;
440
441   /* FIXME, this next value should be configured using an event from the
442    * upstream element, ie, the BUFFER_SIZE event. */
443   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
444       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
445           "Number of buffers to queue during preroll", 0, G_MAXUINT,
446           DEFAULT_PREROLL_QUEUE_LEN,
447           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
448
449   g_object_class_install_property (gobject_class, PROP_SYNC,
450       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
451           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452
453   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
454       g_param_spec_int64 ("max-lateness", "Max Lateness",
455           "Maximum number of nanoseconds that a buffer can be late before it "
456           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
457           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
458
459   g_object_class_install_property (gobject_class, PROP_QOS,
460       g_param_spec_boolean ("qos", "Qos",
461           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
462           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
463   /**
464    * GstBaseSink:async
465    *
466    * If set to #TRUE, the basesink will perform asynchronous state changes.
467    * When set to #FALSE, the sink will not signal the parent when it prerolls.
468    * Use this option when dealing with sparse streams or when synchronisation is
469    * not required.
470    *
471    * Since: 0.10.15
472    */
473   g_object_class_install_property (gobject_class, PROP_ASYNC,
474       g_param_spec_boolean ("async", "Async",
475           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
476           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
477   /**
478    * GstBaseSink:ts-offset
479    *
480    * Controls the final synchronisation, a negative value will render the buffer
481    * earlier while a positive value delays playback. This property can be
482    * used to fix synchronisation in bad files.
483    *
484    * Since: 0.10.15
485    */
486   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
487       g_param_spec_int64 ("ts-offset", "TS Offset",
488           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
489           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
490
491   /**
492    * GstBaseSink:enable-last-buffer
493    *
494    * Enable the last-buffer property. If FALSE, basesink doesn't keep a
495    * reference to the last buffer arrived and the last-buffer property is always
496    * set to NULL. This can be useful if you need buffers to be released as soon
497    * as possible, eg. if you're using a buffer pool.
498    *
499    * Since: 0.10.30
500    */
501   g_object_class_install_property (gobject_class, PROP_ENABLE_LAST_BUFFER,
502       g_param_spec_boolean ("enable-last-buffer", "Enable Last Buffer",
503           "Enable the last-buffer property", DEFAULT_ENABLE_LAST_BUFFER,
504           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
505
506   /**
507    * GstBaseSink:last-buffer
508    *
509    * The last buffer that arrived in the sink and was used for preroll or for
510    * rendering. This property can be used to generate thumbnails. This property
511    * can be NULL when the sink has not yet received a bufer.
512    *
513    * Since: 0.10.15
514    */
515   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
516       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
517           "The last buffer received in the sink", GST_TYPE_BUFFER,
518           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
519   /**
520    * GstBaseSink:blocksize
521    *
522    * The amount of bytes to pull when operating in pull mode.
523    *
524    * Since: 0.10.22
525    */
526   /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
527   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
528       g_param_spec_uint ("blocksize", "Block size",
529           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
530           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
531   /**
532    * GstBaseSink:render-delay
533    *
534    * The additional delay between synchronisation and actual rendering of the
535    * media. This property will add additional latency to the device in order to
536    * make other sinks compensate for the delay.
537    *
538    * Since: 0.10.22
539    */
540   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
541       g_param_spec_uint64 ("render-delay", "Render Delay",
542           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
543           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
544   /**
545    * GstBaseSink:throttle-time
546    *
547    * The time to insert between buffers. This property can be used to control
548    * the maximum amount of buffers per second to render. Setting this property
549    * to a value bigger than 0 will make the sink create THROTTLE QoS events.
550    *
551    * Since: 0.10.33
552    */
553   g_object_class_install_property (gobject_class, PROP_THROTTLE_TIME,
554       g_param_spec_uint64 ("throttle-time", "Throttle time",
555           "The time to keep between rendered buffers (unused)", 0, G_MAXUINT64,
556           DEFAULT_THROTTLE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
557
558   gstelement_class->change_state =
559       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
560   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
561   gstelement_class->query = GST_DEBUG_FUNCPTR (default_element_query);
562   gstelement_class->get_query_types =
563       GST_DEBUG_FUNCPTR (gst_base_sink_get_query_types);
564
565   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
566   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
567   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
568   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
569   klass->activate_pull =
570       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
571   klass->query = GST_DEBUG_FUNCPTR (default_sink_query);
572
573   /* Registering debug symbols for function pointers */
574   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_getcaps);
575   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_setcaps);
576   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_fixate);
577   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_buffer_alloc);
578   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate);
579   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_push);
580   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_pull);
581   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_event);
582   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain);
583   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain_list);
584   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_sink_query);
585 }
586
587 static GstCaps *
588 gst_base_sink_pad_getcaps (GstPad * pad)
589 {
590   GstBaseSinkClass *bclass;
591   GstBaseSink *bsink;
592   GstCaps *caps = NULL;
593
594   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
595   bclass = GST_BASE_SINK_GET_CLASS (bsink);
596
597   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
598     /* if we are operating in pull mode we only accept the negotiated caps */
599     GST_OBJECT_LOCK (pad);
600     if ((caps = GST_PAD_CAPS (pad)))
601       gst_caps_ref (caps);
602     GST_OBJECT_UNLOCK (pad);
603   }
604   if (caps == NULL) {
605     if (bclass->get_caps)
606       caps = bclass->get_caps (bsink);
607
608     if (caps == NULL) {
609       GstPadTemplate *pad_template;
610
611       pad_template =
612           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
613           "sink");
614       if (pad_template != NULL) {
615         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
616       }
617     }
618   }
619   gst_object_unref (bsink);
620
621   return caps;
622 }
623
624 static gboolean
625 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
626 {
627   GstBaseSinkClass *bclass;
628   GstBaseSink *bsink;
629   gboolean res = TRUE;
630
631   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
632   bclass = GST_BASE_SINK_GET_CLASS (bsink);
633
634   if (res && bclass->set_caps)
635     res = bclass->set_caps (bsink, caps);
636
637   gst_object_unref (bsink);
638
639   return res;
640 }
641
642 static void
643 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
644 {
645   GstBaseSinkClass *bclass;
646   GstBaseSink *bsink;
647
648   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
649   bclass = GST_BASE_SINK_GET_CLASS (bsink);
650
651   if (bclass->fixate)
652     bclass->fixate (bsink, caps);
653
654   gst_object_unref (bsink);
655 }
656
657 static GstFlowReturn
658 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
659     GstCaps * caps, GstBuffer ** buf)
660 {
661   GstBaseSinkClass *bclass;
662   GstBaseSink *bsink;
663   GstFlowReturn result = GST_FLOW_OK;
664
665   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
666   if (G_UNLIKELY (bsink == NULL))
667     return GST_FLOW_WRONG_STATE;
668   bclass = GST_BASE_SINK_GET_CLASS (bsink);
669
670   if (bclass->buffer_alloc)
671     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
672   else
673     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
674
675   gst_object_unref (bsink);
676
677   return result;
678 }
679
680 static void
681 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
682 {
683   GstPadTemplate *pad_template;
684   GstBaseSinkPrivate *priv;
685
686   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
687
688   pad_template =
689       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
690   g_return_if_fail (pad_template != NULL);
691
692   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
693
694   gst_pad_set_getcaps_function (basesink->sinkpad, gst_base_sink_pad_getcaps);
695   gst_pad_set_setcaps_function (basesink->sinkpad, gst_base_sink_pad_setcaps);
696   gst_pad_set_fixatecaps_function (basesink->sinkpad, gst_base_sink_pad_fixate);
697   gst_pad_set_bufferalloc_function (basesink->sinkpad,
698       gst_base_sink_pad_buffer_alloc);
699   gst_pad_set_activate_function (basesink->sinkpad, gst_base_sink_pad_activate);
700   gst_pad_set_activatepush_function (basesink->sinkpad,
701       gst_base_sink_pad_activate_push);
702   gst_pad_set_activatepull_function (basesink->sinkpad,
703       gst_base_sink_pad_activate_pull);
704   gst_pad_set_query_function (basesink->sinkpad, gst_base_sink_sink_query);
705   gst_pad_set_event_function (basesink->sinkpad, gst_base_sink_event);
706   gst_pad_set_chain_function (basesink->sinkpad, gst_base_sink_chain);
707   gst_pad_set_chain_list_function (basesink->sinkpad, gst_base_sink_chain_list);
708   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
709
710   basesink->pad_mode = GST_ACTIVATE_NONE;
711   basesink->preroll_queue = g_queue_new ();
712   basesink->abidata.ABI.clip_segment = gst_segment_new ();
713   priv->have_latency = FALSE;
714
715   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
716   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
717
718   basesink->sync = DEFAULT_SYNC;
719   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
720   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
721   priv->async_enabled = DEFAULT_ASYNC;
722   priv->ts_offset = DEFAULT_TS_OFFSET;
723   priv->render_delay = DEFAULT_RENDER_DELAY;
724   priv->blocksize = DEFAULT_BLOCKSIZE;
725   priv->cached_clock_id = NULL;
726   g_atomic_int_set (&priv->enable_last_buffer, DEFAULT_ENABLE_LAST_BUFFER);
727   priv->throttle_time = DEFAULT_THROTTLE_TIME;
728
729   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
730 }
731
732 static void
733 gst_base_sink_finalize (GObject * object)
734 {
735   GstBaseSink *basesink;
736
737   basesink = GST_BASE_SINK (object);
738
739   g_queue_free (basesink->preroll_queue);
740   gst_segment_free (basesink->abidata.ABI.clip_segment);
741
742   G_OBJECT_CLASS (parent_class)->finalize (object);
743 }
744
745 /**
746  * gst_base_sink_set_sync:
747  * @sink: the sink
748  * @sync: the new sync value.
749  *
750  * Configures @sink to synchronize on the clock or not. When
751  * @sync is FALSE, incoming samples will be played as fast as
752  * possible. If @sync is TRUE, the timestamps of the incomming
753  * buffers will be used to schedule the exact render time of its
754  * contents.
755  *
756  * Since: 0.10.4
757  */
758 void
759 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
760 {
761   g_return_if_fail (GST_IS_BASE_SINK (sink));
762
763   GST_OBJECT_LOCK (sink);
764   sink->sync = sync;
765   GST_OBJECT_UNLOCK (sink);
766 }
767
768 /**
769  * gst_base_sink_get_sync:
770  * @sink: the sink
771  *
772  * Checks if @sink is currently configured to synchronize against the
773  * clock.
774  *
775  * Returns: TRUE if the sink is configured to synchronize against the clock.
776  *
777  * Since: 0.10.4
778  */
779 gboolean
780 gst_base_sink_get_sync (GstBaseSink * sink)
781 {
782   gboolean res;
783
784   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
785
786   GST_OBJECT_LOCK (sink);
787   res = sink->sync;
788   GST_OBJECT_UNLOCK (sink);
789
790   return res;
791 }
792
793 /**
794  * gst_base_sink_set_max_lateness:
795  * @sink: the sink
796  * @max_lateness: the new max lateness value.
797  *
798  * Sets the new max lateness value to @max_lateness. This value is
799  * used to decide if a buffer should be dropped or not based on the
800  * buffer timestamp and the current clock time. A value of -1 means
801  * an unlimited time.
802  *
803  * Since: 0.10.4
804  */
805 void
806 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
807 {
808   g_return_if_fail (GST_IS_BASE_SINK (sink));
809
810   GST_OBJECT_LOCK (sink);
811   sink->abidata.ABI.max_lateness = max_lateness;
812   GST_OBJECT_UNLOCK (sink);
813 }
814
815 /**
816  * gst_base_sink_get_max_lateness:
817  * @sink: the sink
818  *
819  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
820  * more details.
821  *
822  * Returns: The maximum time in nanoseconds that a buffer can be late
823  * before it is dropped and not rendered. A value of -1 means an
824  * unlimited time.
825  *
826  * Since: 0.10.4
827  */
828 gint64
829 gst_base_sink_get_max_lateness (GstBaseSink * sink)
830 {
831   gint64 res;
832
833   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
834
835   GST_OBJECT_LOCK (sink);
836   res = sink->abidata.ABI.max_lateness;
837   GST_OBJECT_UNLOCK (sink);
838
839   return res;
840 }
841
842 /**
843  * gst_base_sink_set_qos_enabled:
844  * @sink: the sink
845  * @enabled: the new qos value.
846  *
847  * Configures @sink to send Quality-of-Service events upstream.
848  *
849  * Since: 0.10.5
850  */
851 void
852 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
853 {
854   g_return_if_fail (GST_IS_BASE_SINK (sink));
855
856   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
857 }
858
859 /**
860  * gst_base_sink_is_qos_enabled:
861  * @sink: the sink
862  *
863  * Checks if @sink is currently configured to send Quality-of-Service events
864  * upstream.
865  *
866  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
867  *
868  * Since: 0.10.5
869  */
870 gboolean
871 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
872 {
873   gboolean res;
874
875   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
876
877   res = g_atomic_int_get (&sink->priv->qos_enabled);
878
879   return res;
880 }
881
882 /**
883  * gst_base_sink_set_async_enabled:
884  * @sink: the sink
885  * @enabled: the new async value.
886  *
887  * Configures @sink to perform all state changes asynchronusly. When async is
888  * disabled, the sink will immediately go to PAUSED instead of waiting for a
889  * preroll buffer. This feature is useful if the sink does not synchronize
890  * against the clock or when it is dealing with sparse streams.
891  *
892  * Since: 0.10.15
893  */
894 void
895 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
896 {
897   g_return_if_fail (GST_IS_BASE_SINK (sink));
898
899   GST_PAD_PREROLL_LOCK (sink->sinkpad);
900   g_atomic_int_set (&sink->priv->async_enabled, enabled);
901   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
902   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
903 }
904
905 /**
906  * gst_base_sink_is_async_enabled:
907  * @sink: the sink
908  *
909  * Checks if @sink is currently configured to perform asynchronous state
910  * changes to PAUSED.
911  *
912  * Returns: TRUE if the sink is configured to perform asynchronous state
913  * changes.
914  *
915  * Since: 0.10.15
916  */
917 gboolean
918 gst_base_sink_is_async_enabled (GstBaseSink * sink)
919 {
920   gboolean res;
921
922   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
923
924   res = g_atomic_int_get (&sink->priv->async_enabled);
925
926   return res;
927 }
928
929 /**
930  * gst_base_sink_set_ts_offset:
931  * @sink: the sink
932  * @offset: the new offset
933  *
934  * Adjust the synchronisation of @sink with @offset. A negative value will
935  * render buffers earlier than their timestamp. A positive value will delay
936  * rendering. This function can be used to fix playback of badly timestamped
937  * buffers.
938  *
939  * Since: 0.10.15
940  */
941 void
942 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
943 {
944   g_return_if_fail (GST_IS_BASE_SINK (sink));
945
946   GST_OBJECT_LOCK (sink);
947   sink->priv->ts_offset = offset;
948   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
949   GST_OBJECT_UNLOCK (sink);
950 }
951
952 /**
953  * gst_base_sink_get_ts_offset:
954  * @sink: the sink
955  *
956  * Get the synchronisation offset of @sink.
957  *
958  * Returns: The synchronisation offset.
959  *
960  * Since: 0.10.15
961  */
962 GstClockTimeDiff
963 gst_base_sink_get_ts_offset (GstBaseSink * sink)
964 {
965   GstClockTimeDiff res;
966
967   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
968
969   GST_OBJECT_LOCK (sink);
970   res = sink->priv->ts_offset;
971   GST_OBJECT_UNLOCK (sink);
972
973   return res;
974 }
975
976 /**
977  * gst_base_sink_get_last_buffer:
978  * @sink: the sink
979  *
980  * Get the last buffer that arrived in the sink and was used for preroll or for
981  * rendering. This property can be used to generate thumbnails.
982  *
983  * The #GstCaps on the buffer can be used to determine the type of the buffer.
984  *
985  * Free-function: gst_buffer_unref
986  *
987  * Returns: (transfer full): a #GstBuffer. gst_buffer_unref() after usage.
988  *     This function returns NULL when no buffer has arrived in the sink yet
989  *     or when the sink is not in PAUSED or PLAYING.
990  *
991  * Since: 0.10.15
992  */
993 GstBuffer *
994 gst_base_sink_get_last_buffer (GstBaseSink * sink)
995 {
996   GstBuffer *res;
997
998   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
999
1000   GST_OBJECT_LOCK (sink);
1001   if ((res = sink->priv->last_buffer))
1002     gst_buffer_ref (res);
1003   GST_OBJECT_UNLOCK (sink);
1004
1005   return res;
1006 }
1007
1008 /* with OBJECT_LOCK */
1009 static void
1010 gst_base_sink_set_last_buffer_unlocked (GstBaseSink * sink, GstBuffer * buffer)
1011 {
1012   GstBuffer *old;
1013
1014   old = sink->priv->last_buffer;
1015   if (G_LIKELY (old != buffer)) {
1016     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
1017     if (G_LIKELY (buffer))
1018       gst_buffer_ref (buffer);
1019     sink->priv->last_buffer = buffer;
1020   } else {
1021     old = NULL;
1022   }
1023   /* avoid unreffing with the lock because cleanup code might want to take the
1024    * lock too */
1025   if (G_LIKELY (old)) {
1026     GST_OBJECT_UNLOCK (sink);
1027     gst_buffer_unref (old);
1028     GST_OBJECT_LOCK (sink);
1029   }
1030 }
1031
1032 static void
1033 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
1034 {
1035   if (!g_atomic_int_get (&sink->priv->enable_last_buffer))
1036     return;
1037
1038   GST_OBJECT_LOCK (sink);
1039   gst_base_sink_set_last_buffer_unlocked (sink, buffer);
1040   GST_OBJECT_UNLOCK (sink);
1041 }
1042
1043 /**
1044  * gst_base_sink_set_last_buffer_enabled:
1045  * @sink: the sink
1046  * @enabled: the new enable-last-buffer value.
1047  *
1048  * Configures @sink to store the last received buffer in the last-buffer
1049  * property.
1050  *
1051  * Since: 0.10.30
1052  */
1053 void
1054 gst_base_sink_set_last_buffer_enabled (GstBaseSink * sink, gboolean enabled)
1055 {
1056   g_return_if_fail (GST_IS_BASE_SINK (sink));
1057
1058   /* Only take lock if we change the value */
1059   if (g_atomic_int_compare_and_exchange (&sink->priv->enable_last_buffer,
1060           !enabled, enabled) && !enabled) {
1061     GST_OBJECT_LOCK (sink);
1062     gst_base_sink_set_last_buffer_unlocked (sink, NULL);
1063     GST_OBJECT_UNLOCK (sink);
1064   }
1065 }
1066
1067 /**
1068  * gst_base_sink_is_last_buffer_enabled:
1069  * @sink: the sink
1070  *
1071  * Checks if @sink is currently configured to store the last received buffer in
1072  * the last-buffer property.
1073  *
1074  * Returns: TRUE if the sink is configured to store the last received buffer.
1075  *
1076  * Since: 0.10.30
1077  */
1078 gboolean
1079 gst_base_sink_is_last_buffer_enabled (GstBaseSink * sink)
1080 {
1081   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
1082
1083   return g_atomic_int_get (&sink->priv->enable_last_buffer);
1084 }
1085
1086 /**
1087  * gst_base_sink_get_latency:
1088  * @sink: the sink
1089  *
1090  * Get the currently configured latency.
1091  *
1092  * Returns: The configured latency.
1093  *
1094  * Since: 0.10.12
1095  */
1096 GstClockTime
1097 gst_base_sink_get_latency (GstBaseSink * sink)
1098 {
1099   GstClockTime res;
1100
1101   GST_OBJECT_LOCK (sink);
1102   res = sink->priv->latency;
1103   GST_OBJECT_UNLOCK (sink);
1104
1105   return res;
1106 }
1107
1108 /**
1109  * gst_base_sink_query_latency:
1110  * @sink: the sink
1111  * @live: (out) (allow-none): if the sink is live
1112  * @upstream_live: (out) (allow-none): if an upstream element is live
1113  * @min_latency: (out) (allow-none): the min latency of the upstream elements
1114  * @max_latency: (out) (allow-none): the max latency of the upstream elements
1115  *
1116  * Query the sink for the latency parameters. The latency will be queried from
1117  * the upstream elements. @live will be TRUE if @sink is configured to
1118  * synchronize against the clock. @upstream_live will be TRUE if an upstream
1119  * element is live.
1120  *
1121  * If both @live and @upstream_live are TRUE, the sink will want to compensate
1122  * for the latency introduced by the upstream elements by setting the
1123  * @min_latency to a strictly possitive value.
1124  *
1125  * This function is mostly used by subclasses.
1126  *
1127  * Returns: TRUE if the query succeeded.
1128  *
1129  * Since: 0.10.12
1130  */
1131 gboolean
1132 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
1133     gboolean * upstream_live, GstClockTime * min_latency,
1134     GstClockTime * max_latency)
1135 {
1136   gboolean l, us_live, res, have_latency;
1137   GstClockTime min, max, render_delay;
1138   GstQuery *query;
1139   GstClockTime us_min, us_max;
1140
1141   /* we are live when we sync to the clock */
1142   GST_OBJECT_LOCK (sink);
1143   l = sink->sync;
1144   have_latency = sink->priv->have_latency;
1145   render_delay = sink->priv->render_delay;
1146   GST_OBJECT_UNLOCK (sink);
1147
1148   /* assume no latency */
1149   min = 0;
1150   max = -1;
1151   us_live = FALSE;
1152
1153   if (have_latency) {
1154     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1155     /* we are ready for a latency query this is when we preroll or when we are
1156      * not async. */
1157     query = gst_query_new_latency ();
1158
1159     /* ask the peer for the latency */
1160     if ((res = gst_pad_peer_query (sink->sinkpad, query))) {
1161       /* get upstream min and max latency */
1162       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1163
1164       if (us_live) {
1165         /* upstream live, use its latency, subclasses should use these
1166          * values to create the complete latency. */
1167         min = us_min;
1168         max = us_max;
1169       }
1170       if (l) {
1171         /* we need to add the render delay if we are live */
1172         if (min != -1)
1173           min += render_delay;
1174         if (max != -1)
1175           max += render_delay;
1176       }
1177     }
1178     gst_query_unref (query);
1179   } else {
1180     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1181     res = FALSE;
1182   }
1183
1184   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1185   if (!res) {
1186     if (!l) {
1187       res = TRUE;
1188       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1189     } else {
1190       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1191     }
1192   }
1193
1194   if (res) {
1195     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1196         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1197         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1198
1199     if (live)
1200       *live = l;
1201     if (upstream_live)
1202       *upstream_live = us_live;
1203     if (min_latency)
1204       *min_latency = min;
1205     if (max_latency)
1206       *max_latency = max;
1207   }
1208   return res;
1209 }
1210
1211 /**
1212  * gst_base_sink_set_render_delay:
1213  * @sink: a #GstBaseSink
1214  * @delay: the new delay
1215  *
1216  * Set the render delay in @sink to @delay. The render delay is the time
1217  * between actual rendering of a buffer and its synchronisation time. Some
1218  * devices might delay media rendering which can be compensated for with this
1219  * function.
1220  *
1221  * After calling this function, this sink will report additional latency and
1222  * other sinks will adjust their latency to delay the rendering of their media.
1223  *
1224  * This function is usually called by subclasses.
1225  *
1226  * Since: 0.10.21
1227  */
1228 void
1229 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1230 {
1231   GstClockTime old_render_delay;
1232
1233   g_return_if_fail (GST_IS_BASE_SINK (sink));
1234
1235   GST_OBJECT_LOCK (sink);
1236   old_render_delay = sink->priv->render_delay;
1237   sink->priv->render_delay = delay;
1238   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1239       GST_TIME_ARGS (delay));
1240   GST_OBJECT_UNLOCK (sink);
1241
1242   if (delay != old_render_delay) {
1243     GST_DEBUG_OBJECT (sink, "posting latency changed");
1244     gst_element_post_message (GST_ELEMENT_CAST (sink),
1245         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1246   }
1247 }
1248
1249 /**
1250  * gst_base_sink_get_render_delay:
1251  * @sink: a #GstBaseSink
1252  *
1253  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1254  * information about the render delay.
1255  *
1256  * Returns: the render delay of @sink.
1257  *
1258  * Since: 0.10.21
1259  */
1260 GstClockTime
1261 gst_base_sink_get_render_delay (GstBaseSink * sink)
1262 {
1263   GstClockTimeDiff res;
1264
1265   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1266
1267   GST_OBJECT_LOCK (sink);
1268   res = sink->priv->render_delay;
1269   GST_OBJECT_UNLOCK (sink);
1270
1271   return res;
1272 }
1273
1274 /**
1275  * gst_base_sink_set_blocksize:
1276  * @sink: a #GstBaseSink
1277  * @blocksize: the blocksize in bytes
1278  *
1279  * Set the number of bytes that the sink will pull when it is operating in pull
1280  * mode.
1281  *
1282  * Since: 0.10.22
1283  */
1284 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1285 void
1286 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1287 {
1288   g_return_if_fail (GST_IS_BASE_SINK (sink));
1289
1290   GST_OBJECT_LOCK (sink);
1291   sink->priv->blocksize = blocksize;
1292   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1293   GST_OBJECT_UNLOCK (sink);
1294 }
1295
1296 /**
1297  * gst_base_sink_get_blocksize:
1298  * @sink: a #GstBaseSink
1299  *
1300  * Get the number of bytes that the sink will pull when it is operating in pull
1301  * mode.
1302  *
1303  * Returns: the number of bytes @sink will pull in pull mode.
1304  *
1305  * Since: 0.10.22
1306  */
1307 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1308 guint
1309 gst_base_sink_get_blocksize (GstBaseSink * sink)
1310 {
1311   guint res;
1312
1313   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1314
1315   GST_OBJECT_LOCK (sink);
1316   res = sink->priv->blocksize;
1317   GST_OBJECT_UNLOCK (sink);
1318
1319   return res;
1320 }
1321
1322 /**
1323  * gst_base_sink_set_throttle_time:
1324  * @sink: a #GstBaseSink
1325  * @throttle: the throttle time in nanoseconds
1326  *
1327  * Set the time that will be inserted between rendered buffers. This
1328  * can be used to control the maximum buffers per second that the sink
1329  * will render. 
1330  *
1331  * Since: 0.10.33
1332  */
1333 void
1334 gst_base_sink_set_throttle_time (GstBaseSink * sink, guint64 throttle)
1335 {
1336   g_return_if_fail (GST_IS_BASE_SINK (sink));
1337
1338   GST_OBJECT_LOCK (sink);
1339   sink->priv->throttle_time = throttle;
1340   GST_LOG_OBJECT (sink, "set throttle_time to %" G_GUINT64_FORMAT, throttle);
1341   GST_OBJECT_UNLOCK (sink);
1342 }
1343
1344 /**
1345  * gst_base_sink_get_throttle_time:
1346  * @sink: a #GstBaseSink
1347  *
1348  * Get the time that will be inserted between frames to control the 
1349  * maximum buffers per second.
1350  *
1351  * Returns: the number of nanoseconds @sink will put between frames.
1352  *
1353  * Since: 0.10.33
1354  */
1355 guint64
1356 gst_base_sink_get_throttle_time (GstBaseSink * sink)
1357 {
1358   guint64 res;
1359
1360   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1361
1362   GST_OBJECT_LOCK (sink);
1363   res = sink->priv->throttle_time;
1364   GST_OBJECT_UNLOCK (sink);
1365
1366   return res;
1367 }
1368
1369 static void
1370 gst_base_sink_set_property (GObject * object, guint prop_id,
1371     const GValue * value, GParamSpec * pspec)
1372 {
1373   GstBaseSink *sink = GST_BASE_SINK (object);
1374
1375   switch (prop_id) {
1376     case PROP_PREROLL_QUEUE_LEN:
1377       /* preroll lock necessary to serialize with finish_preroll */
1378       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1379       g_atomic_int_set (&sink->preroll_queue_max_len, g_value_get_uint (value));
1380       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1381       break;
1382     case PROP_SYNC:
1383       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1384       break;
1385     case PROP_MAX_LATENESS:
1386       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1387       break;
1388     case PROP_QOS:
1389       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1390       break;
1391     case PROP_ASYNC:
1392       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1393       break;
1394     case PROP_TS_OFFSET:
1395       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1396       break;
1397     case PROP_BLOCKSIZE:
1398       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1399       break;
1400     case PROP_RENDER_DELAY:
1401       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1402       break;
1403     case PROP_ENABLE_LAST_BUFFER:
1404       gst_base_sink_set_last_buffer_enabled (sink, g_value_get_boolean (value));
1405       break;
1406     case PROP_THROTTLE_TIME:
1407       gst_base_sink_set_throttle_time (sink, g_value_get_uint64 (value));
1408       break;
1409     default:
1410       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1411       break;
1412   }
1413 }
1414
1415 static void
1416 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1417     GParamSpec * pspec)
1418 {
1419   GstBaseSink *sink = GST_BASE_SINK (object);
1420
1421   switch (prop_id) {
1422     case PROP_PREROLL_QUEUE_LEN:
1423       g_value_set_uint (value, g_atomic_int_get (&sink->preroll_queue_max_len));
1424       break;
1425     case PROP_SYNC:
1426       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1427       break;
1428     case PROP_MAX_LATENESS:
1429       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1430       break;
1431     case PROP_QOS:
1432       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1433       break;
1434     case PROP_ASYNC:
1435       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1436       break;
1437     case PROP_TS_OFFSET:
1438       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1439       break;
1440     case PROP_LAST_BUFFER:
1441       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1442       break;
1443     case PROP_ENABLE_LAST_BUFFER:
1444       g_value_set_boolean (value, gst_base_sink_is_last_buffer_enabled (sink));
1445       break;
1446     case PROP_BLOCKSIZE:
1447       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1448       break;
1449     case PROP_RENDER_DELAY:
1450       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1451       break;
1452     case PROP_THROTTLE_TIME:
1453       g_value_set_uint64 (value, gst_base_sink_get_throttle_time (sink));
1454       break;
1455     default:
1456       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1457       break;
1458   }
1459 }
1460
1461
1462 static GstCaps *
1463 gst_base_sink_get_caps (GstBaseSink * sink)
1464 {
1465   return NULL;
1466 }
1467
1468 static gboolean
1469 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1470 {
1471   return TRUE;
1472 }
1473
1474 static GstFlowReturn
1475 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1476     GstCaps * caps, GstBuffer ** buf)
1477 {
1478   *buf = NULL;
1479   return GST_FLOW_OK;
1480 }
1481
1482 /* with PREROLL_LOCK, STREAM_LOCK */
1483 static void
1484 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1485 {
1486   GstMiniObject *obj;
1487
1488   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1489   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1490     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1491     gst_mini_object_unref (obj);
1492   }
1493   /* we can't have EOS anymore now */
1494   basesink->eos = FALSE;
1495   basesink->priv->received_eos = FALSE;
1496   basesink->have_preroll = FALSE;
1497   basesink->priv->step_unlock = FALSE;
1498   basesink->eos_queued = FALSE;
1499   basesink->preroll_queued = 0;
1500   basesink->buffers_queued = 0;
1501   basesink->events_queued = 0;
1502   /* can't report latency anymore until we preroll again */
1503   if (basesink->priv->async_enabled) {
1504     GST_OBJECT_LOCK (basesink);
1505     basesink->priv->have_latency = FALSE;
1506     GST_OBJECT_UNLOCK (basesink);
1507   }
1508   /* and signal any waiters now */
1509   GST_PAD_PREROLL_SIGNAL (pad);
1510 }
1511
1512 /* with STREAM_LOCK, configures given segment with the event information. */
1513 static void
1514 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1515     GstEvent * event, GstSegment * segment)
1516 {
1517   gboolean update;
1518   gdouble rate, arate;
1519   GstFormat format;
1520   gint64 start;
1521   gint64 stop;
1522   gint64 time;
1523
1524   /* the newsegment event is needed to bring the buffer timestamps to the
1525    * stream time and to drop samples outside of the playback segment. */
1526   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1527       &start, &stop, &time);
1528
1529   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1530    * We protect with the OBJECT_LOCK so that we can use the values to
1531    * safely answer a POSITION query. */
1532   GST_OBJECT_LOCK (basesink);
1533   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1534       stop, time);
1535
1536   if (format == GST_FORMAT_TIME) {
1537     GST_DEBUG_OBJECT (basesink,
1538         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1539         "format GST_FORMAT_TIME, "
1540         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1541         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1542         update, rate, arate, GST_TIME_ARGS (segment->start),
1543         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1544         GST_TIME_ARGS (segment->accum));
1545   } else {
1546     GST_DEBUG_OBJECT (basesink,
1547         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1548         "format %d, "
1549         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1550         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1551         segment->format, segment->start, segment->stop, segment->time,
1552         segment->accum);
1553   }
1554   GST_OBJECT_UNLOCK (basesink);
1555 }
1556
1557 /* with PREROLL_LOCK, STREAM_LOCK */
1558 static gboolean
1559 gst_base_sink_commit_state (GstBaseSink * basesink)
1560 {
1561   /* commit state and proceed to next pending state */
1562   GstState current, next, pending, post_pending;
1563   gboolean post_paused = FALSE;
1564   gboolean post_async_done = FALSE;
1565   gboolean post_playing = FALSE;
1566
1567   /* we are certainly not playing async anymore now */
1568   basesink->playing_async = FALSE;
1569
1570   GST_OBJECT_LOCK (basesink);
1571   current = GST_STATE (basesink);
1572   next = GST_STATE_NEXT (basesink);
1573   pending = GST_STATE_PENDING (basesink);
1574   post_pending = pending;
1575
1576   switch (pending) {
1577     case GST_STATE_PLAYING:
1578     {
1579       GstBaseSinkClass *bclass;
1580       GstStateChangeReturn ret;
1581
1582       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1583
1584       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1585
1586       basesink->need_preroll = FALSE;
1587       post_async_done = TRUE;
1588       basesink->priv->commited = TRUE;
1589       post_playing = TRUE;
1590       /* post PAUSED too when we were READY */
1591       if (current == GST_STATE_READY) {
1592         post_paused = TRUE;
1593       }
1594
1595       /* make sure we notify the subclass of async playing */
1596       if (bclass->async_play) {
1597         GST_WARNING_OBJECT (basesink, "deprecated async_play");
1598         ret = bclass->async_play (basesink);
1599         if (ret == GST_STATE_CHANGE_FAILURE)
1600           goto async_failed;
1601       }
1602       break;
1603     }
1604     case GST_STATE_PAUSED:
1605       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1606       post_paused = TRUE;
1607       post_async_done = TRUE;
1608       basesink->priv->commited = TRUE;
1609       post_pending = GST_STATE_VOID_PENDING;
1610       break;
1611     case GST_STATE_READY:
1612     case GST_STATE_NULL:
1613       goto stopping;
1614     case GST_STATE_VOID_PENDING:
1615       goto nothing_pending;
1616     default:
1617       break;
1618   }
1619
1620   /* we can report latency queries now */
1621   basesink->priv->have_latency = TRUE;
1622
1623   GST_STATE (basesink) = pending;
1624   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1625   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1626   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1627   GST_OBJECT_UNLOCK (basesink);
1628
1629   if (post_paused) {
1630     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1631     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1632         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1633             current, next, post_pending));
1634   }
1635   if (post_async_done) {
1636     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1637     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1638         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1639   }
1640   if (post_playing) {
1641     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1642     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1643         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1644             next, pending, GST_STATE_VOID_PENDING));
1645   }
1646
1647   GST_STATE_BROADCAST (basesink);
1648
1649   return TRUE;
1650
1651 nothing_pending:
1652   {
1653     /* Depending on the state, set our vars. We get in this situation when the
1654      * state change function got a change to update the state vars before the
1655      * streaming thread did. This is fine but we need to make sure that we
1656      * update the need_preroll var since it was TRUE when we got here and might
1657      * become FALSE if we got to PLAYING. */
1658     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1659         gst_element_state_get_name (current));
1660     switch (current) {
1661       case GST_STATE_PLAYING:
1662         basesink->need_preroll = FALSE;
1663         break;
1664       case GST_STATE_PAUSED:
1665         basesink->need_preroll = TRUE;
1666         break;
1667       default:
1668         basesink->need_preroll = FALSE;
1669         basesink->flushing = TRUE;
1670         break;
1671     }
1672     /* we can report latency queries now */
1673     basesink->priv->have_latency = TRUE;
1674     GST_OBJECT_UNLOCK (basesink);
1675     return TRUE;
1676   }
1677 stopping:
1678   {
1679     /* app is going to READY */
1680     GST_DEBUG_OBJECT (basesink, "stopping");
1681     basesink->need_preroll = FALSE;
1682     basesink->flushing = TRUE;
1683     GST_OBJECT_UNLOCK (basesink);
1684     return FALSE;
1685   }
1686 async_failed:
1687   {
1688     GST_DEBUG_OBJECT (basesink, "async commit failed");
1689     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1690     GST_OBJECT_UNLOCK (basesink);
1691     return FALSE;
1692   }
1693 }
1694
1695 static void
1696 start_stepping (GstBaseSink * sink, GstSegment * segment,
1697     GstStepInfo * pending, GstStepInfo * current)
1698 {
1699   gint64 end;
1700   GstMessage *message;
1701
1702   GST_DEBUG_OBJECT (sink, "update pending step");
1703
1704   GST_OBJECT_LOCK (sink);
1705   memcpy (current, pending, sizeof (GstStepInfo));
1706   pending->valid = FALSE;
1707   GST_OBJECT_UNLOCK (sink);
1708
1709   /* post message first */
1710   message =
1711       gst_message_new_step_start (GST_OBJECT (sink), TRUE, current->format,
1712       current->amount, current->rate, current->flush, current->intermediate);
1713   gst_message_set_seqnum (message, current->seqnum);
1714   gst_element_post_message (GST_ELEMENT (sink), message);
1715
1716   /* get the running time of where we paused and remember it */
1717   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1718   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1719
1720   /* set the new rate for the remainder of the segment */
1721   current->start_rate = segment->rate;
1722   segment->rate *= current->rate;
1723   segment->abs_rate = ABS (segment->rate);
1724
1725   /* save values */
1726   if (segment->rate > 0.0)
1727     current->start_stop = segment->stop;
1728   else
1729     current->start_start = segment->start;
1730
1731   if (current->format == GST_FORMAT_TIME) {
1732     end = current->start + current->amount;
1733     if (!current->flush) {
1734       /* update the segment clipping regions for non-flushing seeks */
1735       if (segment->rate > 0.0) {
1736         segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1737         segment->last_stop = segment->stop;
1738       } else {
1739         gint64 position;
1740
1741         position = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1742         segment->time = position;
1743         segment->start = position;
1744         segment->last_stop = position;
1745       }
1746     }
1747   }
1748
1749   GST_DEBUG_OBJECT (sink,
1750       "segment now rate %lf, applied rate %lf, "
1751       "format GST_FORMAT_TIME, "
1752       "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1753       ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1754       segment->rate, segment->applied_rate, GST_TIME_ARGS (segment->start),
1755       GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1756       GST_TIME_ARGS (segment->accum));
1757
1758   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1759       GST_TIME_ARGS (current->start));
1760
1761   if (current->amount == -1) {
1762     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1763     current->valid = FALSE;
1764   } else {
1765     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1766         "rate: %f", current->amount, gst_format_get_name (current->format),
1767         current->rate);
1768   }
1769 }
1770
1771 static void
1772 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1773     GstStepInfo * current, gint64 rstart, gint64 rstop, gboolean eos)
1774 {
1775   gint64 stop, position;
1776   GstMessage *message;
1777
1778   GST_DEBUG_OBJECT (sink, "step complete");
1779
1780   if (segment->rate > 0.0)
1781     stop = rstart;
1782   else
1783     stop = rstop;
1784
1785   GST_DEBUG_OBJECT (sink,
1786       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1787
1788   if (stop == -1)
1789     current->duration = current->position;
1790   else
1791     current->duration = stop - current->start;
1792
1793   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1794       GST_TIME_ARGS (current->duration));
1795
1796   position = current->start + current->duration;
1797
1798   /* now move the segment to the new running time */
1799   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1800
1801   if (current->flush) {
1802     /* and remove the accumulated time we flushed, start time did not change */
1803     segment->accum = current->start;
1804   } else {
1805     /* start time is now the stepped position */
1806     gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1807   }
1808
1809   /* restore the previous rate */
1810   segment->rate = current->start_rate;
1811   segment->abs_rate = ABS (segment->rate);
1812
1813   if (segment->rate > 0.0)
1814     segment->stop = current->start_stop;
1815   else
1816     segment->start = current->start_start;
1817
1818   /* the clip segment is used for position report in paused... */
1819   memcpy (sink->abidata.ABI.clip_segment, segment, sizeof (GstSegment));
1820
1821   /* post the step done when we know the stepped duration in TIME */
1822   message =
1823       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1824       current->amount, current->rate, current->flush, current->intermediate,
1825       current->duration, eos);
1826   gst_message_set_seqnum (message, current->seqnum);
1827   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1828
1829   if (!current->intermediate)
1830     sink->need_preroll = current->need_preroll;
1831
1832   /* and the current step info finished and becomes invalid */
1833   current->valid = FALSE;
1834 }
1835
1836 static gboolean
1837 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1838     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1839     gint64 * rstop)
1840 {
1841   gboolean step_end = FALSE;
1842
1843   /* see if we need to skip this buffer because of stepping */
1844   switch (current->format) {
1845     case GST_FORMAT_TIME:
1846     {
1847       guint64 end;
1848       gint64 first, last;
1849
1850       if (segment->rate > 0.0) {
1851         if (segment->stop == *cstop)
1852           *rstop = *rstart + current->amount;
1853
1854         first = *rstart;
1855         last = *rstop;
1856       } else {
1857         if (segment->start == *cstart)
1858           *rstart = *rstop + current->amount;
1859
1860         first = *rstop;
1861         last = *rstart;
1862       }
1863
1864       end = current->start + current->amount;
1865       current->position = first - current->start;
1866
1867       if (G_UNLIKELY (segment->abs_rate != 1.0))
1868         current->position /= segment->abs_rate;
1869
1870       GST_DEBUG_OBJECT (sink,
1871           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1872           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1873       GST_DEBUG_OBJECT (sink,
1874           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1875           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1876           GST_TIME_ARGS (last - current->start),
1877           GST_TIME_ARGS (current->amount));
1878
1879       if ((current->flush && current->position >= current->amount)
1880           || last >= end) {
1881         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1882         step_end = TRUE;
1883         if (segment->rate > 0.0) {
1884           *rstart = end;
1885           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1886         } else {
1887           *rstop = end;
1888           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1889         }
1890       }
1891       GST_DEBUG_OBJECT (sink,
1892           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1893           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1894       GST_DEBUG_OBJECT (sink,
1895           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1896           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1897       break;
1898     }
1899     case GST_FORMAT_BUFFERS:
1900       GST_DEBUG_OBJECT (sink,
1901           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1902           current->position, current->amount);
1903
1904       if (current->position < current->amount) {
1905         current->position++;
1906       } else {
1907         step_end = TRUE;
1908       }
1909       break;
1910     case GST_FORMAT_DEFAULT:
1911     default:
1912       GST_DEBUG_OBJECT (sink,
1913           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1914           current->position, current->amount);
1915       break;
1916   }
1917   return step_end;
1918 }
1919
1920 /* with STREAM_LOCK, PREROLL_LOCK
1921  *
1922  * Returns TRUE if the object needs synchronisation and takes therefore
1923  * part in prerolling.
1924  *
1925  * rsstart/rsstop contain the start/stop in stream time.
1926  * rrstart/rrstop contain the start/stop in running time.
1927  */
1928 static gboolean
1929 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1930     GstClockTime * rsstart, GstClockTime * rsstop,
1931     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1932     gboolean * stepped, GstSegment * segment, GstStepInfo * step,
1933     gboolean * step_end, guint8 obj_type)
1934 {
1935   GstBaseSinkClass *bclass;
1936   GstBuffer *buffer;
1937   GstClockTime start, stop;     /* raw start/stop timestamps */
1938   gint64 cstart, cstop;         /* clipped raw timestamps */
1939   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1940   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1941   GstFormat format;
1942   GstBaseSinkPrivate *priv;
1943   gboolean eos;
1944
1945   priv = basesink->priv;
1946
1947   /* start with nothing */
1948   start = stop = GST_CLOCK_TIME_NONE;
1949
1950   if (G_UNLIKELY (OBJ_IS_EVENT (obj_type))) {
1951     GstEvent *event = GST_EVENT_CAST (obj);
1952
1953     switch (GST_EVENT_TYPE (event)) {
1954         /* EOS event needs syncing */
1955       case GST_EVENT_EOS:
1956       {
1957         if (basesink->segment.rate >= 0.0) {
1958           sstart = sstop = priv->current_sstop;
1959           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1960             /* we have not seen a buffer yet, use the segment values */
1961             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1962                 basesink->segment.format, basesink->segment.stop);
1963           }
1964         } else {
1965           sstart = sstop = priv->current_sstart;
1966           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1967             /* we have not seen a buffer yet, use the segment values */
1968             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1969                 basesink->segment.format, basesink->segment.start);
1970           }
1971         }
1972
1973         rstart = rstop = priv->eos_rtime;
1974         *do_sync = rstart != -1;
1975         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1976             GST_TIME_ARGS (rstart));
1977         /* if we are stepping, we end now */
1978         *step_end = step->valid;
1979         eos = TRUE;
1980         goto eos_done;
1981       }
1982       default:
1983         /* other events do not need syncing */
1984         /* FIXME, maybe NEWSEGMENT might need synchronisation
1985          * since the POSITION query depends on accumulated times and
1986          * we cannot accumulate the current segment before the previous
1987          * one completed.
1988          */
1989         return FALSE;
1990     }
1991   }
1992
1993   eos = FALSE;
1994
1995 again:
1996   /* else do buffer sync code */
1997   buffer = GST_BUFFER_CAST (obj);
1998
1999   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2000
2001   /* just get the times to see if we need syncing, if the start returns -1 we
2002    * don't sync. */
2003   if (bclass->get_times)
2004     bclass->get_times (basesink, buffer, &start, &stop);
2005
2006   if (!GST_CLOCK_TIME_IS_VALID (start)) {
2007     /* we don't need to sync but we still want to get the timestamps for
2008      * tracking the position */
2009     gst_base_sink_get_times (basesink, buffer, &start, &stop);
2010     *do_sync = FALSE;
2011   } else {
2012     *do_sync = TRUE;
2013   }
2014
2015   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
2016       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
2017       GST_TIME_ARGS (stop), *do_sync);
2018
2019   /* collect segment and format for code clarity */
2020   format = segment->format;
2021
2022   /* no timestamp clipping if we did not get a TIME segment format */
2023   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
2024     cstart = start;
2025     cstop = stop;
2026     /* do running and stream time in TIME format */
2027     format = GST_FORMAT_TIME;
2028     GST_LOG_OBJECT (basesink, "not time format, don't clip");
2029     goto do_times;
2030   }
2031
2032   /* clip, only when we know about time */
2033   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
2034               (gint64) start, (gint64) stop, &cstart, &cstop))) {
2035     if (step->valid) {
2036       GST_DEBUG_OBJECT (basesink, "step out of segment");
2037       /* when we are stepping, pretend we're at the end of the segment */
2038       if (segment->rate > 0.0) {
2039         cstart = segment->stop;
2040         cstop = segment->stop;
2041       } else {
2042         cstart = segment->start;
2043         cstop = segment->start;
2044       }
2045       goto do_times;
2046     }
2047     goto out_of_segment;
2048   }
2049
2050   if (G_UNLIKELY (start != cstart || stop != cstop)) {
2051     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
2052         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
2053         GST_TIME_ARGS (cstop));
2054   }
2055
2056   /* set last stop position */
2057   if (G_LIKELY (stop != GST_CLOCK_TIME_NONE && cstop != GST_CLOCK_TIME_NONE))
2058     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
2059   else
2060     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
2061
2062 do_times:
2063   rstart = gst_segment_to_running_time (segment, format, cstart);
2064   rstop = gst_segment_to_running_time (segment, format, cstop);
2065
2066   if (G_UNLIKELY (step->valid)) {
2067     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
2068                 &rstart, &rstop))) {
2069       /* step is still busy, we discard data when we are flushing */
2070       *stepped = step->flush;
2071       GST_DEBUG_OBJECT (basesink, "stepping busy");
2072     }
2073   }
2074   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
2075    * upstream is behaving very badly */
2076   sstart = gst_segment_to_stream_time (segment, format, cstart);
2077   sstop = gst_segment_to_stream_time (segment, format, cstop);
2078
2079 eos_done:
2080   /* eos_done label only called when doing EOS, we also stop stepping then */
2081   if (*step_end && step->flush) {
2082     GST_DEBUG_OBJECT (basesink, "flushing step ended");
2083     stop_stepping (basesink, segment, step, rstart, rstop, eos);
2084     *step_end = FALSE;
2085     /* re-determine running start times for adjusted segment
2086      * (which has a flushed amount of running/accumulated time removed) */
2087     if (!GST_IS_EVENT (obj)) {
2088       GST_DEBUG_OBJECT (basesink, "refresh sync times");
2089       goto again;
2090     }
2091   }
2092
2093   /* save times */
2094   *rsstart = sstart;
2095   *rsstop = sstop;
2096   *rrstart = rstart;
2097   *rrstop = rstop;
2098
2099   /* buffers and EOS always need syncing and preroll */
2100   return TRUE;
2101
2102   /* special cases */
2103 out_of_segment:
2104   {
2105     /* we usually clip in the chain function already but stepping could cause
2106      * the segment to be updated later. we return FALSE so that we don't try
2107      * to sync on it. */
2108     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
2109     return FALSE;
2110   }
2111 }
2112
2113 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
2114  * adjust a timestamp with the latency and timestamp offset. This function does
2115  * not adjust for the render delay. */
2116 static GstClockTime
2117 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
2118 {
2119   GstClockTimeDiff ts_offset;
2120
2121   /* don't do anything funny with invalid timestamps */
2122   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2123     return time;
2124
2125   time += basesink->priv->latency;
2126
2127   /* apply offset, be carefull for underflows */
2128   ts_offset = basesink->priv->ts_offset;
2129   if (ts_offset < 0) {
2130     ts_offset = -ts_offset;
2131     if (ts_offset < time)
2132       time -= ts_offset;
2133     else
2134       time = 0;
2135   } else
2136     time += ts_offset;
2137
2138   /* subtract the render delay again, which was included in the latency */
2139   if (time > basesink->priv->render_delay)
2140     time -= basesink->priv->render_delay;
2141   else
2142     time = 0;
2143
2144   return time;
2145 }
2146
2147 /**
2148  * gst_base_sink_wait_clock:
2149  * @sink: the sink
2150  * @time: the running_time to be reached
2151  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2152  *
2153  * This function will block until @time is reached. It is usually called by
2154  * subclasses that use their own internal synchronisation.
2155  *
2156  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
2157  * returned. Likewise, if synchronisation is disabled in the element or there
2158  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
2159  *
2160  * This function should only be called with the PREROLL_LOCK held, like when
2161  * receiving an EOS event in the #GstBaseSinkClass.event() vmethod or when
2162  * receiving a buffer in
2163  * the #GstBaseSinkClass.render() vmethod.
2164  *
2165  * The @time argument should be the running_time of when this method should
2166  * return and is not adjusted with any latency or offset configured in the
2167  * sink.
2168  *
2169  * Since: 0.10.20
2170  *
2171  * Returns: #GstClockReturn
2172  */
2173 GstClockReturn
2174 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
2175     GstClockTimeDiff * jitter)
2176 {
2177   GstClockReturn ret;
2178   GstClock *clock;
2179   GstClockTime base_time;
2180
2181   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2182     goto invalid_time;
2183
2184   GST_OBJECT_LOCK (sink);
2185   if (G_UNLIKELY (!sink->sync))
2186     goto no_sync;
2187
2188   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
2189     goto no_clock;
2190
2191   base_time = GST_ELEMENT_CAST (sink)->base_time;
2192   GST_LOG_OBJECT (sink,
2193       "time %" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT,
2194       GST_TIME_ARGS (time), GST_TIME_ARGS (base_time));
2195
2196   /* add base_time to running_time to get the time against the clock */
2197   time += base_time;
2198
2199   /* Re-use existing clockid if available */
2200   /* FIXME: Casting to GstClockEntry only works because the types
2201    * are the same */
2202   if (G_LIKELY (sink->priv->cached_clock_id != NULL
2203           && GST_CLOCK_ENTRY_CLOCK ((GstClockEntry *) sink->priv->
2204               cached_clock_id) == clock)) {
2205     if (!gst_clock_single_shot_id_reinit (clock, sink->priv->cached_clock_id,
2206             time)) {
2207       gst_clock_id_unref (sink->priv->cached_clock_id);
2208       sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2209     }
2210   } else {
2211     if (sink->priv->cached_clock_id != NULL)
2212       gst_clock_id_unref (sink->priv->cached_clock_id);
2213     sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2214   }
2215   GST_OBJECT_UNLOCK (sink);
2216
2217   /* A blocking wait is performed on the clock. We save the ClockID
2218    * so we can unlock the entry at any time. While we are blocking, we
2219    * release the PREROLL_LOCK so that other threads can interrupt the
2220    * entry. */
2221   sink->clock_id = sink->priv->cached_clock_id;
2222   /* release the preroll lock while waiting */
2223   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
2224
2225   ret = gst_clock_id_wait (sink->priv->cached_clock_id, jitter);
2226
2227   GST_PAD_PREROLL_LOCK (sink->sinkpad);
2228   sink->clock_id = NULL;
2229
2230   return ret;
2231
2232   /* no syncing needed */
2233 invalid_time:
2234   {
2235     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
2236     return GST_CLOCK_BADTIME;
2237   }
2238 no_sync:
2239   {
2240     GST_DEBUG_OBJECT (sink, "sync disabled");
2241     GST_OBJECT_UNLOCK (sink);
2242     return GST_CLOCK_BADTIME;
2243   }
2244 no_clock:
2245   {
2246     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
2247     GST_OBJECT_UNLOCK (sink);
2248     return GST_CLOCK_BADTIME;
2249   }
2250 }
2251
2252 /**
2253  * gst_base_sink_wait_preroll:
2254  * @sink: the sink
2255  *
2256  * If the #GstBaseSinkClass.render() method performs its own synchronisation
2257  * against the clock it must unblock when going from PLAYING to the PAUSED state
2258  * and call this method before continuing to render the remaining data.
2259  *
2260  * This function will block until a state change to PLAYING happens (in which
2261  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
2262  * to a state change to READY or a FLUSH event (in which case this function
2263  * returns #GST_FLOW_WRONG_STATE).
2264  *
2265  * This function should only be called with the PREROLL_LOCK held, like in the
2266  * render function.
2267  *
2268  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2269  * continue. Any other return value should be returned from the render vmethod.
2270  *
2271  * Since: 0.10.11
2272  */
2273 GstFlowReturn
2274 gst_base_sink_wait_preroll (GstBaseSink * sink)
2275 {
2276   sink->have_preroll = TRUE;
2277   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
2278   /* block until the state changes, or we get a flush, or something */
2279   GST_PAD_PREROLL_WAIT (sink->sinkpad);
2280   sink->have_preroll = FALSE;
2281   if (G_UNLIKELY (sink->flushing))
2282     goto stopping;
2283   if (G_UNLIKELY (sink->priv->step_unlock))
2284     goto step_unlocked;
2285   GST_DEBUG_OBJECT (sink, "continue after preroll");
2286
2287   return GST_FLOW_OK;
2288
2289   /* ERRORS */
2290 stopping:
2291   {
2292     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
2293     return GST_FLOW_WRONG_STATE;
2294   }
2295 step_unlocked:
2296   {
2297     sink->priv->step_unlock = FALSE;
2298     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
2299     return GST_FLOW_STEP;
2300   }
2301 }
2302
2303 static inline guint8
2304 get_object_type (GstMiniObject * obj)
2305 {
2306   guint8 obj_type;
2307
2308   if (G_LIKELY (GST_IS_BUFFER (obj)))
2309     obj_type = _PR_IS_BUFFER;
2310   else if (GST_IS_EVENT (obj))
2311     obj_type = _PR_IS_EVENT;
2312   else if (GST_IS_BUFFER_LIST (obj))
2313     obj_type = _PR_IS_BUFFERLIST;
2314   else
2315     obj_type = _PR_IS_NOTHING;
2316
2317   return obj_type;
2318 }
2319
2320 /**
2321  * gst_base_sink_do_preroll:
2322  * @sink: the sink
2323  * @obj: (transfer none): the mini object that caused the preroll
2324  *
2325  * If the @sink spawns its own thread for pulling buffers from upstream it
2326  * should call this method after it has pulled a buffer. If the element needed
2327  * to preroll, this function will perform the preroll and will then block
2328  * until the element state is changed.
2329  *
2330  * This function should be called with the PREROLL_LOCK held.
2331  *
2332  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2333  * continue. Any other return value should be returned from the render vmethod.
2334  *
2335  * Since: 0.10.22
2336  */
2337 GstFlowReturn
2338 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2339 {
2340   GstFlowReturn ret;
2341
2342   while (G_UNLIKELY (sink->need_preroll)) {
2343     guint8 obj_type;
2344     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2345
2346     obj_type = get_object_type (obj);
2347
2348     ret = gst_base_sink_preroll_object (sink, obj_type, obj);
2349     if (ret != GST_FLOW_OK)
2350       goto preroll_failed;
2351
2352     /* need to recheck here because the commit state could have
2353      * made us not need the preroll anymore */
2354     if (G_LIKELY (sink->need_preroll)) {
2355       /* block until the state changes, or we get a flush, or something */
2356       ret = gst_base_sink_wait_preroll (sink);
2357       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2358         goto preroll_failed;
2359     }
2360   }
2361   return GST_FLOW_OK;
2362
2363   /* ERRORS */
2364 preroll_failed:
2365   {
2366     GST_DEBUG_OBJECT (sink, "preroll failed: %s", gst_flow_get_name (ret));
2367     return ret;
2368   }
2369 }
2370
2371 /**
2372  * gst_base_sink_wait_eos:
2373  * @sink: the sink
2374  * @time: the running_time to be reached
2375  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2376  *
2377  * This function will block until @time is reached. It is usually called by
2378  * subclasses that use their own internal synchronisation but want to let the
2379  * EOS be handled by the base class.
2380  *
2381  * This function should only be called with the PREROLL_LOCK held, like when
2382  * receiving an EOS event in the ::event vmethod.
2383  *
2384  * The @time argument should be the running_time of when the EOS should happen
2385  * and will be adjusted with any latency and offset configured in the sink.
2386  *
2387  * Returns: #GstFlowReturn
2388  *
2389  * Since: 0.10.15
2390  */
2391 GstFlowReturn
2392 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2393     GstClockTimeDiff * jitter)
2394 {
2395   GstClockReturn status;
2396   GstFlowReturn ret;
2397
2398   do {
2399     GstClockTime stime;
2400
2401     GST_DEBUG_OBJECT (sink, "checking preroll");
2402
2403     /* first wait for the playing state before we can continue */
2404     while (G_UNLIKELY (sink->need_preroll)) {
2405       ret = gst_base_sink_wait_preroll (sink);
2406       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2407         goto flushing;
2408     }
2409
2410     /* preroll done, we can sync since we are in PLAYING now. */
2411     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2412         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2413
2414     /* compensate for latency and ts_offset. We don't adjust for render delay
2415      * because we don't interact with the device on EOS normally. */
2416     stime = gst_base_sink_adjust_time (sink, time);
2417
2418     /* wait for the clock, this can be interrupted because we got shut down or
2419      * we PAUSED. */
2420     status = gst_base_sink_wait_clock (sink, stime, jitter);
2421
2422     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2423
2424     /* invalid time, no clock or sync disabled, just continue then */
2425     if (status == GST_CLOCK_BADTIME)
2426       break;
2427
2428     /* waiting could have been interrupted and we can be flushing now */
2429     if (G_UNLIKELY (sink->flushing))
2430       goto flushing;
2431
2432     /* retry if we got unscheduled, which means we did not reach the timeout
2433      * yet. if some other error occures, we continue. */
2434   } while (status == GST_CLOCK_UNSCHEDULED);
2435
2436   GST_DEBUG_OBJECT (sink, "end of stream");
2437
2438   return GST_FLOW_OK;
2439
2440   /* ERRORS */
2441 flushing:
2442   {
2443     GST_DEBUG_OBJECT (sink, "we are flushing");
2444     return GST_FLOW_WRONG_STATE;
2445   }
2446 }
2447
2448 /* with STREAM_LOCK, PREROLL_LOCK
2449  *
2450  * Make sure we are in PLAYING and synchronize an object to the clock.
2451  *
2452  * If we need preroll, we are not in PLAYING. We try to commit the state
2453  * if needed and then block if we still are not PLAYING.
2454  *
2455  * We start waiting on the clock in PLAYING. If we got interrupted, we
2456  * immediately try to re-preroll.
2457  *
2458  * Some objects do not need synchronisation (most events) and so this function
2459  * immediately returns GST_FLOW_OK.
2460  *
2461  * for objects that arrive later than max-lateness to be synchronized to the
2462  * clock have the @late boolean set to TRUE.
2463  *
2464  * This function keeps a running average of the jitter (the diff between the
2465  * clock time and the requested sync time). The jitter is negative for
2466  * objects that arrive in time and positive for late buffers.
2467  *
2468  * does not take ownership of obj.
2469  */
2470 static GstFlowReturn
2471 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2472     GstMiniObject * obj, gboolean * late, gboolean * step_end, guint8 obj_type)
2473 {
2474   GstClockTimeDiff jitter = 0;
2475   gboolean syncable;
2476   GstClockReturn status = GST_CLOCK_OK;
2477   GstClockTime rstart, rstop, sstart, sstop, stime;
2478   gboolean do_sync;
2479   GstBaseSinkPrivate *priv;
2480   GstFlowReturn ret;
2481   GstStepInfo *current, *pending;
2482   gboolean stepped;
2483
2484   priv = basesink->priv;
2485
2486 do_step:
2487   sstart = sstop = rstart = rstop = GST_CLOCK_TIME_NONE;
2488   do_sync = TRUE;
2489   stepped = FALSE;
2490
2491   priv->current_rstart = GST_CLOCK_TIME_NONE;
2492
2493   /* get stepping info */
2494   current = &priv->current_step;
2495   pending = &priv->pending_step;
2496
2497   /* get timing information for this object against the render segment */
2498   syncable = gst_base_sink_get_sync_times (basesink, obj,
2499       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2500       current, step_end, obj_type);
2501
2502   if (G_UNLIKELY (stepped))
2503     goto step_skipped;
2504
2505   /* a syncable object needs to participate in preroll and
2506    * clocking. All buffers and EOS are syncable. */
2507   if (G_UNLIKELY (!syncable))
2508     goto not_syncable;
2509
2510   /* store timing info for current object */
2511   priv->current_rstart = rstart;
2512   priv->current_rstop = (GST_CLOCK_TIME_IS_VALID (rstop) ? rstop : rstart);
2513
2514   /* save sync time for eos when the previous object needed sync */
2515   priv->eos_rtime = (do_sync ? priv->current_rstop : GST_CLOCK_TIME_NONE);
2516
2517   /* calculate inter frame spacing */
2518   if (G_UNLIKELY (priv->prev_rstart != -1 && priv->prev_rstart < rstart)) {
2519     GstClockTime in_diff;
2520
2521     in_diff = rstart - priv->prev_rstart;
2522
2523     if (priv->avg_in_diff == -1)
2524       priv->avg_in_diff = in_diff;
2525     else
2526       priv->avg_in_diff = UPDATE_RUNNING_AVG (priv->avg_in_diff, in_diff);
2527
2528     GST_LOG_OBJECT (basesink, "avg frame diff %" GST_TIME_FORMAT,
2529         GST_TIME_ARGS (priv->avg_in_diff));
2530
2531   }
2532   priv->prev_rstart = rstart;
2533
2534   if (G_UNLIKELY (priv->earliest_in_time != -1
2535           && rstart < priv->earliest_in_time))
2536     goto qos_dropped;
2537
2538 again:
2539   /* first do preroll, this makes sure we commit our state
2540    * to PAUSED and can continue to PLAYING. We cannot perform
2541    * any clock sync in PAUSED because there is no clock. */
2542   ret = gst_base_sink_do_preroll (basesink, obj);
2543   if (G_UNLIKELY (ret != GST_FLOW_OK))
2544     goto preroll_failed;
2545
2546   /* update the segment with a pending step if the current one is invalid and we
2547    * have a new pending one. We only accept new step updates after a preroll */
2548   if (G_UNLIKELY (pending->valid && !current->valid)) {
2549     start_stepping (basesink, &basesink->segment, pending, current);
2550     goto do_step;
2551   }
2552
2553   /* After rendering we store the position of the last buffer so that we can use
2554    * it to report the position. We need to take the lock here. */
2555   GST_OBJECT_LOCK (basesink);
2556   priv->current_sstart = sstart;
2557   priv->current_sstop = (GST_CLOCK_TIME_IS_VALID (sstop) ? sstop : sstart);
2558   GST_OBJECT_UNLOCK (basesink);
2559
2560   if (!do_sync)
2561     goto done;
2562
2563   /* adjust for latency */
2564   stime = gst_base_sink_adjust_time (basesink, rstart);
2565
2566   /* adjust for render-delay, avoid underflows */
2567   if (GST_CLOCK_TIME_IS_VALID (stime)) {
2568     if (stime > priv->render_delay)
2569       stime -= priv->render_delay;
2570     else
2571       stime = 0;
2572   }
2573
2574   /* preroll done, we can sync since we are in PLAYING now. */
2575   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2576       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2577       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2578
2579   /* This function will return immediately if start == -1, no clock
2580    * or sync is disabled with GST_CLOCK_BADTIME. */
2581   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2582
2583   GST_DEBUG_OBJECT (basesink, "clock returned %d, jitter %c%" GST_TIME_FORMAT,
2584       status, (jitter < 0 ? '-' : ' '), GST_TIME_ARGS (ABS (jitter)));
2585
2586   /* invalid time, no clock or sync disabled, just render */
2587   if (status == GST_CLOCK_BADTIME)
2588     goto done;
2589
2590   /* waiting could have been interrupted and we can be flushing now */
2591   if (G_UNLIKELY (basesink->flushing))
2592     goto flushing;
2593
2594   /* check for unlocked by a state change, we are not flushing so
2595    * we can try to preroll on the current buffer. */
2596   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2597     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2598     priv->call_preroll = TRUE;
2599     goto again;
2600   }
2601
2602   /* successful syncing done, record observation */
2603   priv->current_jitter = jitter;
2604
2605   /* check if the object should be dropped */
2606   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2607       status, jitter);
2608
2609 done:
2610   return GST_FLOW_OK;
2611
2612   /* ERRORS */
2613 step_skipped:
2614   {
2615     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2616     *late = TRUE;
2617     return GST_FLOW_OK;
2618   }
2619 not_syncable:
2620   {
2621     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2622     return GST_FLOW_OK;
2623   }
2624 qos_dropped:
2625   {
2626     GST_DEBUG_OBJECT (basesink, "dropped because of QoS %p", obj);
2627     *late = TRUE;
2628     return GST_FLOW_OK;
2629   }
2630 flushing:
2631   {
2632     GST_DEBUG_OBJECT (basesink, "we are flushing");
2633     return GST_FLOW_WRONG_STATE;
2634   }
2635 preroll_failed:
2636   {
2637     GST_DEBUG_OBJECT (basesink, "preroll failed");
2638     *step_end = FALSE;
2639     return ret;
2640   }
2641 }
2642
2643 static gboolean
2644 gst_base_sink_send_qos (GstBaseSink * basesink, GstQOSType type,
2645     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2646 {
2647   GstEvent *event;
2648   gboolean res;
2649
2650   /* generate Quality-of-Service event */
2651   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2652       "qos: type %d, proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2653       GST_TIME_FORMAT, type, proportion, diff, GST_TIME_ARGS (time));
2654
2655   event = gst_event_new_qos_full (type, proportion, diff, time);
2656
2657   /* send upstream */
2658   res = gst_pad_push_event (basesink->sinkpad, event);
2659
2660   return res;
2661 }
2662
2663 static void
2664 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2665 {
2666   GstBaseSinkPrivate *priv;
2667   GstClockTime start, stop;
2668   GstClockTimeDiff jitter;
2669   GstClockTime pt, entered, left;
2670   GstClockTime duration;
2671   gdouble rate;
2672
2673   priv = sink->priv;
2674
2675   start = priv->current_rstart;
2676
2677   if (priv->current_step.valid)
2678     return;
2679
2680   /* if Quality-of-Service disabled, do nothing */
2681   if (!g_atomic_int_get (&priv->qos_enabled) ||
2682       !GST_CLOCK_TIME_IS_VALID (start))
2683     return;
2684
2685   stop = priv->current_rstop;
2686   jitter = priv->current_jitter;
2687
2688   if (jitter < 0) {
2689     /* this is the time the buffer entered the sink */
2690     if (start < -jitter)
2691       entered = 0;
2692     else
2693       entered = start + jitter;
2694     left = start;
2695   } else {
2696     /* this is the time the buffer entered the sink */
2697     entered = start + jitter;
2698     /* this is the time the buffer left the sink */
2699     left = start + jitter;
2700   }
2701
2702   /* calculate duration of the buffer */
2703   if (GST_CLOCK_TIME_IS_VALID (stop) && stop != start)
2704     duration = stop - start;
2705   else
2706     duration = priv->avg_in_diff;
2707
2708   /* if we have the time when the last buffer left us, calculate
2709    * processing time */
2710   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2711     if (entered > priv->last_left) {
2712       pt = entered - priv->last_left;
2713     } else {
2714       pt = 0;
2715     }
2716   } else {
2717     pt = priv->avg_pt;
2718   }
2719
2720   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2721       ", stop %" GST_TIME_FORMAT ", entered %" GST_TIME_FORMAT ", left %"
2722       GST_TIME_FORMAT ", pt: %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
2723       ",jitter %" G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
2724       GST_TIME_ARGS (entered), GST_TIME_ARGS (left), GST_TIME_ARGS (pt),
2725       GST_TIME_ARGS (duration), jitter);
2726
2727   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2728       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2729       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2730       priv->avg_rate);
2731
2732   /* collect running averages. for first observations, we copy the
2733    * values */
2734   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_duration))
2735     priv->avg_duration = duration;
2736   else
2737     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2738
2739   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_pt))
2740     priv->avg_pt = pt;
2741   else
2742     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2743
2744   if (priv->avg_duration != 0)
2745     rate =
2746         gst_guint64_to_gdouble (priv->avg_pt) /
2747         gst_guint64_to_gdouble (priv->avg_duration);
2748   else
2749     rate = 1.0;
2750
2751   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2752     if (dropped || priv->avg_rate < 0.0) {
2753       priv->avg_rate = rate;
2754     } else {
2755       if (rate > 1.0)
2756         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2757       else
2758         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2759     }
2760   }
2761
2762   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2763       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2764       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2765       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2766
2767
2768   if (priv->avg_rate >= 0.0) {
2769     GstQOSType type;
2770     GstClockTimeDiff diff;
2771
2772     /* if we have a valid rate, start sending QoS messages */
2773     if (priv->current_jitter < 0) {
2774       /* make sure we never go below 0 when adding the jitter to the
2775        * timestamp. */
2776       if (priv->current_rstart < -priv->current_jitter)
2777         priv->current_jitter = -priv->current_rstart;
2778     }
2779
2780     if (priv->throttle_time > 0) {
2781       diff = priv->throttle_time;
2782       type = GST_QOS_TYPE_THROTTLE;
2783     } else {
2784       diff = priv->current_jitter;
2785       if (diff <= 0)
2786         type = GST_QOS_TYPE_OVERFLOW;
2787       else
2788         type = GST_QOS_TYPE_UNDERFLOW;
2789     }
2790
2791     gst_base_sink_send_qos (sink, type, priv->avg_rate, priv->current_rstart,
2792         diff);
2793   }
2794
2795   /* record when this buffer will leave us */
2796   priv->last_left = left;
2797 }
2798
2799 /* reset all qos measuring */
2800 static void
2801 gst_base_sink_reset_qos (GstBaseSink * sink)
2802 {
2803   GstBaseSinkPrivate *priv;
2804
2805   priv = sink->priv;
2806
2807   priv->last_render_time = GST_CLOCK_TIME_NONE;
2808   priv->prev_rstart = GST_CLOCK_TIME_NONE;
2809   priv->earliest_in_time = GST_CLOCK_TIME_NONE;
2810   priv->last_left = GST_CLOCK_TIME_NONE;
2811   priv->avg_duration = GST_CLOCK_TIME_NONE;
2812   priv->avg_pt = GST_CLOCK_TIME_NONE;
2813   priv->avg_rate = -1.0;
2814   priv->avg_render = GST_CLOCK_TIME_NONE;
2815   priv->avg_in_diff = GST_CLOCK_TIME_NONE;
2816   priv->rendered = 0;
2817   priv->dropped = 0;
2818
2819 }
2820
2821 /* Checks if the object was scheduled too late.
2822  *
2823  * rstart/rstop contain the running_time start and stop values
2824  * of the object.
2825  *
2826  * status and jitter contain the return values from the clock wait.
2827  *
2828  * returns TRUE if the buffer was too late.
2829  */
2830 static gboolean
2831 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2832     GstClockTime rstart, GstClockTime rstop,
2833     GstClockReturn status, GstClockTimeDiff jitter)
2834 {
2835   gboolean late;
2836   gint64 max_lateness;
2837   GstBaseSinkPrivate *priv;
2838
2839   priv = basesink->priv;
2840
2841   late = FALSE;
2842
2843   /* only for objects that were too late */
2844   if (G_LIKELY (status != GST_CLOCK_EARLY))
2845     goto in_time;
2846
2847   max_lateness = basesink->abidata.ABI.max_lateness;
2848
2849   /* check if frame dropping is enabled */
2850   if (max_lateness == -1)
2851     goto no_drop;
2852
2853   /* only check for buffers */
2854   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2855     goto not_buffer;
2856
2857   /* can't do check if we don't have a timestamp */
2858   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (rstart)))
2859     goto no_timestamp;
2860
2861   /* we can add a valid stop time */
2862   if (GST_CLOCK_TIME_IS_VALID (rstop))
2863     max_lateness += rstop;
2864   else {
2865     max_lateness += rstart;
2866     /* no stop time, use avg frame diff */
2867     if (priv->avg_in_diff != -1)
2868       max_lateness += priv->avg_in_diff;
2869   }
2870
2871   /* if the jitter bigger than duration and lateness we are too late */
2872   if ((late = rstart + jitter > max_lateness)) {
2873     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2874         "buffer is too late %" GST_TIME_FORMAT
2875         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (rstart + jitter),
2876         GST_TIME_ARGS (max_lateness));
2877     /* !!emergency!!, if we did not receive anything valid for more than a
2878      * second, render it anyway so the user sees something */
2879     if (GST_CLOCK_TIME_IS_VALID (priv->last_render_time) &&
2880         rstart - priv->last_render_time > GST_SECOND) {
2881       late = FALSE;
2882       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2883           (_("A lot of buffers are being dropped.")),
2884           ("There may be a timestamping problem, or this computer is too slow."));
2885       GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2886           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2887           GST_TIME_ARGS (priv->last_render_time));
2888     }
2889   }
2890
2891 done:
2892   if (!late || !GST_CLOCK_TIME_IS_VALID (priv->last_render_time)) {
2893     priv->last_render_time = rstart;
2894     /* the next allowed input timestamp */
2895     if (priv->throttle_time > 0)
2896       priv->earliest_in_time = rstart + priv->throttle_time;
2897   }
2898   return late;
2899
2900   /* all is fine */
2901 in_time:
2902   {
2903     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2904     goto done;
2905   }
2906 no_drop:
2907   {
2908     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2909     goto done;
2910   }
2911 not_buffer:
2912   {
2913     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2914     return FALSE;
2915   }
2916 no_timestamp:
2917   {
2918     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2919     return FALSE;
2920   }
2921 }
2922
2923 /* called before and after calling the render vmethod. It keeps track of how
2924  * much time was spent in the render method and is used to check if we are
2925  * flooded */
2926 static void
2927 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2928 {
2929   GstBaseSinkPrivate *priv;
2930
2931   priv = basesink->priv;
2932
2933   if (start) {
2934     priv->start = gst_util_get_timestamp ();
2935   } else {
2936     GstClockTime elapsed;
2937
2938     priv->stop = gst_util_get_timestamp ();
2939
2940     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2941
2942     if (!GST_CLOCK_TIME_IS_VALID (priv->avg_render))
2943       priv->avg_render = elapsed;
2944     else
2945       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2946
2947     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2948         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2949   }
2950 }
2951
2952 /* with STREAM_LOCK, PREROLL_LOCK,
2953  *
2954  * Synchronize the object on the clock and then render it.
2955  *
2956  * takes ownership of obj.
2957  */
2958 static GstFlowReturn
2959 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2960     guint8 obj_type, gpointer obj)
2961 {
2962   GstFlowReturn ret;
2963   GstBaseSinkClass *bclass;
2964   gboolean late, step_end;
2965   gpointer sync_obj;
2966   GstBaseSinkPrivate *priv;
2967
2968   priv = basesink->priv;
2969
2970   if (OBJ_IS_BUFFERLIST (obj_type)) {
2971     /*
2972      * If buffer list, use the first group buffer within the list
2973      * for syncing
2974      */
2975     sync_obj = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
2976     g_assert (NULL != sync_obj);
2977   } else {
2978     sync_obj = obj;
2979   }
2980
2981 again:
2982   late = FALSE;
2983   step_end = FALSE;
2984
2985   /* synchronize this object, non syncable objects return OK
2986    * immediately. */
2987   ret =
2988       gst_base_sink_do_sync (basesink, pad, sync_obj, &late, &step_end,
2989       obj_type);
2990   if (G_UNLIKELY (ret != GST_FLOW_OK))
2991     goto sync_failed;
2992
2993   /* and now render, event or buffer/buffer list. */
2994   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type))) {
2995     /* drop late buffers unconditionally, let's hope it's unlikely */
2996     if (G_UNLIKELY (late))
2997       goto dropped;
2998
2999     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3000
3001     if (G_LIKELY ((OBJ_IS_BUFFERLIST (obj_type) && bclass->render_list) ||
3002             (!OBJ_IS_BUFFERLIST (obj_type) && bclass->render))) {
3003       gint do_qos;
3004
3005       /* read once, to get same value before and after */
3006       do_qos = g_atomic_int_get (&priv->qos_enabled);
3007
3008       GST_DEBUG_OBJECT (basesink, "rendering object %p", obj);
3009
3010       /* record rendering time for QoS and stats */
3011       if (do_qos)
3012         gst_base_sink_do_render_stats (basesink, TRUE);
3013
3014       if (!OBJ_IS_BUFFERLIST (obj_type)) {
3015         GstBuffer *buf;
3016
3017         /* For buffer lists do not set last buffer. Creating buffer
3018          * with meaningful data can be done only with memcpy which will
3019          * significantly affect performance */
3020         buf = GST_BUFFER_CAST (obj);
3021         gst_base_sink_set_last_buffer (basesink, buf);
3022
3023         ret = bclass->render (basesink, buf);
3024       } else {
3025         GstBufferList *buflist;
3026
3027         buflist = GST_BUFFER_LIST_CAST (obj);
3028
3029         ret = bclass->render_list (basesink, buflist);
3030       }
3031
3032       if (do_qos)
3033         gst_base_sink_do_render_stats (basesink, FALSE);
3034
3035       if (ret == GST_FLOW_STEP)
3036         goto again;
3037
3038       if (G_UNLIKELY (basesink->flushing))
3039         goto flushing;
3040
3041       priv->rendered++;
3042     }
3043   } else if (G_LIKELY (OBJ_IS_EVENT (obj_type))) {
3044     GstEvent *event = GST_EVENT_CAST (obj);
3045     gboolean event_res = TRUE;
3046     GstEventType type;
3047
3048     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3049
3050     type = GST_EVENT_TYPE (event);
3051
3052     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
3053         gst_event_type_get_name (type));
3054
3055     if (bclass->event)
3056       event_res = bclass->event (basesink, event);
3057
3058     /* when we get here we could be flushing again when the event handler calls
3059      * _wait_eos(). We have to ignore this object in that case. */
3060     if (G_UNLIKELY (basesink->flushing))
3061       goto flushing;
3062
3063     if (G_LIKELY (event_res)) {
3064       guint32 seqnum;
3065
3066       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
3067       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
3068
3069       switch (type) {
3070         case GST_EVENT_EOS:
3071         {
3072           GstMessage *message;
3073
3074           /* the EOS event is completely handled so we mark
3075            * ourselves as being in the EOS state. eos is also
3076            * protected by the object lock so we can read it when
3077            * answering the POSITION query. */
3078           GST_OBJECT_LOCK (basesink);
3079           basesink->eos = TRUE;
3080           GST_OBJECT_UNLOCK (basesink);
3081
3082           /* ok, now we can post the message */
3083           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
3084
3085           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
3086           gst_message_set_seqnum (message, seqnum);
3087           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
3088           break;
3089         }
3090         case GST_EVENT_NEWSEGMENT:
3091           /* configure the segment */
3092           gst_base_sink_configure_segment (basesink, pad, event,
3093               &basesink->segment);
3094           break;
3095         case GST_EVENT_SINK_MESSAGE:{
3096           GstMessage *msg = NULL;
3097
3098           gst_event_parse_sink_message (event, &msg);
3099
3100           if (msg)
3101             gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
3102         }
3103         default:
3104           break;
3105       }
3106     }
3107   } else {
3108     g_return_val_if_reached (GST_FLOW_ERROR);
3109   }
3110
3111 done:
3112   if (step_end) {
3113     /* the step ended, check if we need to activate a new step */
3114     GST_DEBUG_OBJECT (basesink, "step ended");
3115     stop_stepping (basesink, &basesink->segment, &priv->current_step,
3116         priv->current_rstart, priv->current_rstop, basesink->eos);
3117     goto again;
3118   }
3119
3120   gst_base_sink_perform_qos (basesink, late);
3121
3122   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
3123   gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3124   return ret;
3125
3126   /* ERRORS */
3127 sync_failed:
3128   {
3129     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
3130     goto done;
3131   }
3132 dropped:
3133   {
3134     priv->dropped++;
3135     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
3136
3137     if (g_atomic_int_get (&priv->qos_enabled)) {
3138       GstMessage *qos_msg;
3139       GstClockTime timestamp, duration;
3140
3141       timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (sync_obj));
3142       duration = GST_BUFFER_DURATION (GST_BUFFER_CAST (sync_obj));
3143
3144       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3145           "qos: dropped buffer rt %" GST_TIME_FORMAT ", st %" GST_TIME_FORMAT
3146           ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT,
3147           GST_TIME_ARGS (priv->current_rstart),
3148           GST_TIME_ARGS (priv->current_sstart), GST_TIME_ARGS (timestamp),
3149           GST_TIME_ARGS (duration));
3150       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3151           "qos: rendered %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
3152           priv->rendered, priv->dropped);
3153
3154       qos_msg =
3155           gst_message_new_qos (GST_OBJECT_CAST (basesink), basesink->sync,
3156           priv->current_rstart, priv->current_sstart, timestamp, duration);
3157       gst_message_set_qos_values (qos_msg, priv->current_jitter, priv->avg_rate,
3158           1000000);
3159       gst_message_set_qos_stats (qos_msg, GST_FORMAT_BUFFERS, priv->rendered,
3160           priv->dropped);
3161       gst_element_post_message (GST_ELEMENT_CAST (basesink), qos_msg);
3162     }
3163     goto done;
3164   }
3165 flushing:
3166   {
3167     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
3168     gst_mini_object_unref (obj);
3169     return GST_FLOW_WRONG_STATE;
3170   }
3171 }
3172
3173 /* with STREAM_LOCK, PREROLL_LOCK
3174  *
3175  * Perform preroll on the given object. For buffers this means
3176  * calling the preroll subclass method.
3177  * If that succeeds, the state will be commited.
3178  *
3179  * function does not take ownership of obj.
3180  */
3181 static GstFlowReturn
3182 gst_base_sink_preroll_object (GstBaseSink * basesink, guint8 obj_type,
3183     GstMiniObject * obj)
3184 {
3185   GstFlowReturn ret;
3186
3187   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
3188
3189   /* if it's a buffer, we need to call the preroll method */
3190   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type) && basesink->priv->call_preroll)) {
3191     GstBaseSinkClass *bclass;
3192     GstBuffer *buf;
3193     GstClockTime timestamp;
3194
3195     if (OBJ_IS_BUFFERLIST (obj_type)) {
3196       buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3197       g_assert (NULL != buf);
3198     } else {
3199       buf = GST_BUFFER_CAST (obj);
3200     }
3201
3202     timestamp = GST_BUFFER_TIMESTAMP (buf);
3203
3204     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
3205         GST_TIME_ARGS (timestamp));
3206
3207     /*
3208      * For buffer lists do not set last buffer. Creating buffer
3209      * with meaningful data can be done only with memcpy which will
3210      * significantly affect performance
3211      */
3212     if (!OBJ_IS_BUFFERLIST (obj_type)) {
3213       gst_base_sink_set_last_buffer (basesink, buf);
3214     }
3215
3216     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3217     if (bclass->preroll)
3218       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
3219         goto preroll_failed;
3220
3221     basesink->priv->call_preroll = FALSE;
3222   }
3223
3224   /* commit state */
3225   if (G_LIKELY (basesink->playing_async)) {
3226     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
3227       goto stopping;
3228   }
3229
3230   return GST_FLOW_OK;
3231
3232   /* ERRORS */
3233 preroll_failed:
3234   {
3235     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
3236     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
3237     return ret;
3238   }
3239 stopping:
3240   {
3241     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
3242     return GST_FLOW_WRONG_STATE;
3243   }
3244 }
3245
3246 /* with STREAM_LOCK, PREROLL_LOCK
3247  *
3248  * Queue an object for rendering.
3249  * The first prerollable object queued will complete the preroll. If the
3250  * preroll queue is filled, we render all the objects in the queue.
3251  *
3252  * This function takes ownership of the object.
3253  */
3254 static GstFlowReturn
3255 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
3256     guint8 obj_type, gpointer obj, gboolean prerollable)
3257 {
3258   GstFlowReturn ret = GST_FLOW_OK;
3259   gint length;
3260   GQueue *q;
3261
3262   if (G_UNLIKELY (basesink->need_preroll)) {
3263     if (G_LIKELY (prerollable))
3264       basesink->preroll_queued++;
3265
3266     length = basesink->preroll_queued;
3267
3268     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
3269
3270     /* first prerollable item needs to finish the preroll */
3271     if (length == 1) {
3272       ret = gst_base_sink_preroll_object (basesink, obj_type, obj);
3273       if (G_UNLIKELY (ret != GST_FLOW_OK))
3274         goto preroll_failed;
3275     }
3276     /* need to recheck if we need preroll, commit state during preroll
3277      * could have made us not need more preroll. */
3278     if (G_UNLIKELY (basesink->need_preroll)) {
3279       /* see if we can render now, if we can't add the object to the preroll
3280        * queue. */
3281       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
3282         goto more_preroll;
3283     }
3284   }
3285   /* we can start rendering (or blocking) the queued object
3286    * if any. */
3287   q = basesink->preroll_queue;
3288   while (G_UNLIKELY (!g_queue_is_empty (q))) {
3289     GstMiniObject *o;
3290     guint8 ot;
3291
3292     o = g_queue_pop_head (q);
3293     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
3294
3295     ot = get_object_type (o);
3296
3297     /* do something with the return value */
3298     ret = gst_base_sink_render_object (basesink, pad, ot, o);
3299     if (ret != GST_FLOW_OK)
3300       goto dequeue_failed;
3301   }
3302
3303   /* now render the object */
3304   ret = gst_base_sink_render_object (basesink, pad, obj_type, obj);
3305   basesink->preroll_queued = 0;
3306
3307   return ret;
3308
3309   /* special cases */
3310 preroll_failed:
3311   {
3312     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
3313         gst_flow_get_name (ret));
3314     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3315     return ret;
3316   }
3317 more_preroll:
3318   {
3319     /* add object to the queue and return */
3320     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
3321         length, basesink->preroll_queue_max_len);
3322     g_queue_push_tail (basesink->preroll_queue, obj);
3323     return GST_FLOW_OK;
3324   }
3325 dequeue_failed:
3326   {
3327     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
3328         gst_flow_get_name (ret));
3329     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3330     return ret;
3331   }
3332 }
3333
3334 /* with STREAM_LOCK
3335  *
3336  * This function grabs the PREROLL_LOCK and adds the object to
3337  * the queue.
3338  *
3339  * This function takes ownership of obj.
3340  *
3341  * Note: Only GstEvent seem to be passed to this private method
3342  */
3343 static GstFlowReturn
3344 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
3345     GstMiniObject * obj, gboolean prerollable)
3346 {
3347   GstFlowReturn ret;
3348
3349   GST_PAD_PREROLL_LOCK (pad);
3350   if (G_UNLIKELY (basesink->flushing))
3351     goto flushing;
3352
3353   if (G_UNLIKELY (basesink->priv->received_eos))
3354     goto was_eos;
3355
3356   ret =
3357       gst_base_sink_queue_object_unlocked (basesink, pad, _PR_IS_EVENT, obj,
3358       prerollable);
3359   GST_PAD_PREROLL_UNLOCK (pad);
3360
3361   return ret;
3362
3363   /* ERRORS */
3364 flushing:
3365   {
3366     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3367     GST_PAD_PREROLL_UNLOCK (pad);
3368     gst_mini_object_unref (obj);
3369     return GST_FLOW_WRONG_STATE;
3370   }
3371 was_eos:
3372   {
3373     GST_DEBUG_OBJECT (basesink,
3374         "we are EOS, dropping object, return UNEXPECTED");
3375     GST_PAD_PREROLL_UNLOCK (pad);
3376     gst_mini_object_unref (obj);
3377     return GST_FLOW_UNEXPECTED;
3378   }
3379 }
3380
3381 static void
3382 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
3383 {
3384   /* make sure we are not blocked on the clock also clear any pending
3385    * eos state. */
3386   gst_base_sink_set_flushing (basesink, pad, TRUE);
3387
3388   /* we grab the stream lock but that is not needed since setting the
3389    * sink to flushing would make sure no state commit is being done
3390    * anymore */
3391   GST_PAD_STREAM_LOCK (pad);
3392   gst_base_sink_reset_qos (basesink);
3393   /* and we need to commit our state again on the next
3394    * prerolled buffer */
3395   basesink->playing_async = TRUE;
3396   if (basesink->priv->async_enabled) {
3397     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
3398   } else {
3399     /* start time reset in above case as well;
3400      * arranges for a.o. proper position reporting when flushing in PAUSED */
3401     gst_element_set_start_time (GST_ELEMENT_CAST (basesink), 0);
3402     basesink->priv->have_latency = TRUE;
3403   }
3404   gst_base_sink_set_last_buffer (basesink, NULL);
3405   GST_PAD_STREAM_UNLOCK (pad);
3406 }
3407
3408 static void
3409 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
3410 {
3411   /* unset flushing so we can accept new data, this also flushes out any EOS
3412    * event. */
3413   gst_base_sink_set_flushing (basesink, pad, FALSE);
3414
3415   /* for position reporting */
3416   GST_OBJECT_LOCK (basesink);
3417   basesink->priv->current_sstart = GST_CLOCK_TIME_NONE;
3418   basesink->priv->current_sstop = GST_CLOCK_TIME_NONE;
3419   basesink->priv->eos_rtime = GST_CLOCK_TIME_NONE;
3420   basesink->priv->call_preroll = TRUE;
3421   basesink->priv->current_step.valid = FALSE;
3422   basesink->priv->pending_step.valid = FALSE;
3423   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
3424     /* we need new segment info after the flush. */
3425     basesink->have_newsegment = FALSE;
3426     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3427     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
3428   }
3429   GST_OBJECT_UNLOCK (basesink);
3430 }
3431
3432 static gboolean
3433 gst_base_sink_event (GstPad * pad, GstEvent * event)
3434 {
3435   GstBaseSink *basesink;
3436   gboolean result = TRUE;
3437   GstBaseSinkClass *bclass;
3438
3439   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3440   if (G_UNLIKELY (basesink == NULL)) {
3441     gst_event_unref (event);
3442     return FALSE;
3443   }
3444
3445   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3446
3447   GST_DEBUG_OBJECT (basesink, "received event %p %" GST_PTR_FORMAT, event,
3448       event);
3449
3450   switch (GST_EVENT_TYPE (event)) {
3451     case GST_EVENT_EOS:
3452     {
3453       GstFlowReturn ret;
3454
3455       GST_PAD_PREROLL_LOCK (pad);
3456       if (G_UNLIKELY (basesink->flushing))
3457         goto flushing;
3458
3459       if (G_UNLIKELY (basesink->priv->received_eos)) {
3460         /* we can't accept anything when we are EOS */
3461         result = FALSE;
3462         gst_event_unref (event);
3463       } else {
3464         /* we set the received EOS flag here so that we can use it when testing if
3465          * we are prerolled and to refuse more buffers. */
3466         basesink->priv->received_eos = TRUE;
3467
3468         /* EOS is a prerollable object, we call the unlocked version because it
3469          * does not check the received_eos flag. */
3470         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
3471             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), TRUE);
3472         if (G_UNLIKELY (ret != GST_FLOW_OK))
3473           result = FALSE;
3474       }
3475       GST_PAD_PREROLL_UNLOCK (pad);
3476       break;
3477     }
3478     case GST_EVENT_NEWSEGMENT:
3479     {
3480       GstFlowReturn ret;
3481       gboolean update;
3482
3483       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
3484
3485       GST_PAD_PREROLL_LOCK (pad);
3486       if (G_UNLIKELY (basesink->flushing))
3487         goto flushing;
3488
3489       gst_event_parse_new_segment_full (event, &update, NULL, NULL, NULL, NULL,
3490           NULL, NULL);
3491
3492       if (G_UNLIKELY (basesink->priv->received_eos && !update)) {
3493         /* we can't accept anything when we are EOS */
3494         result = FALSE;
3495         gst_event_unref (event);
3496       } else {
3497         /* the new segment is a non prerollable item and does not block anything,
3498          * we need to configure the current clipping segment and insert the event
3499          * in the queue to serialize it with the buffers for rendering. */
3500         gst_base_sink_configure_segment (basesink, pad, event,
3501             basesink->abidata.ABI.clip_segment);
3502
3503         ret =
3504             gst_base_sink_queue_object_unlocked (basesink, pad,
3505             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), FALSE);
3506         if (G_UNLIKELY (ret != GST_FLOW_OK))
3507           result = FALSE;
3508         else {
3509           GST_OBJECT_LOCK (basesink);
3510           basesink->have_newsegment = TRUE;
3511           GST_OBJECT_UNLOCK (basesink);
3512         }
3513       }
3514       GST_PAD_PREROLL_UNLOCK (pad);
3515       break;
3516     }
3517     case GST_EVENT_FLUSH_START:
3518       if (bclass->event)
3519         bclass->event (basesink, event);
3520
3521       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3522
3523       gst_base_sink_flush_start (basesink, pad);
3524
3525       gst_event_unref (event);
3526       break;
3527     case GST_EVENT_FLUSH_STOP:
3528       if (bclass->event)
3529         bclass->event (basesink, event);
3530
3531       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3532
3533       gst_base_sink_flush_stop (basesink, pad);
3534
3535       gst_event_unref (event);
3536       break;
3537     default:
3538       /* other events are sent to queue or subclass depending on if they
3539        * are serialized. */
3540       if (GST_EVENT_IS_SERIALIZED (event)) {
3541         gst_base_sink_queue_object (basesink, pad,
3542             GST_MINI_OBJECT_CAST (event), FALSE);
3543       } else {
3544         if (bclass->event)
3545           bclass->event (basesink, event);
3546         gst_event_unref (event);
3547       }
3548       break;
3549   }
3550 done:
3551   gst_object_unref (basesink);
3552
3553   return result;
3554
3555   /* ERRORS */
3556 flushing:
3557   {
3558     GST_DEBUG_OBJECT (basesink, "we are flushing");
3559     GST_PAD_PREROLL_UNLOCK (pad);
3560     result = FALSE;
3561     gst_event_unref (event);
3562     goto done;
3563   }
3564 }
3565
3566 /* default implementation to calculate the start and end
3567  * timestamps on a buffer, subclasses can override
3568  */
3569 static void
3570 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3571     GstClockTime * start, GstClockTime * end)
3572 {
3573   GstClockTime timestamp, duration;
3574
3575   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3576   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3577
3578     /* get duration to calculate end time */
3579     duration = GST_BUFFER_DURATION (buffer);
3580     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3581       *end = timestamp + duration;
3582     }
3583     *start = timestamp;
3584   }
3585 }
3586
3587 /* must be called with PREROLL_LOCK */
3588 static gboolean
3589 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3590 {
3591   gboolean is_prerolled, res;
3592
3593   /* we have 2 cases where the PREROLL_LOCK is released:
3594    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3595    *  2) we are syncing on the clock
3596    */
3597   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3598   res = !is_prerolled;
3599
3600   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3601       basesink->have_preroll, basesink->priv->received_eos, res);
3602
3603   return res;
3604 }
3605
3606 /* with STREAM_LOCK, PREROLL_LOCK
3607  *
3608  * Takes a buffer and compare the timestamps with the last segment.
3609  * If the buffer falls outside of the segment boundaries, drop it.
3610  * Else queue the buffer for preroll and rendering.
3611  *
3612  * This function takes ownership of the buffer.
3613  */
3614 static GstFlowReturn
3615 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3616     guint8 obj_type, gpointer obj)
3617 {
3618   GstBaseSinkClass *bclass;
3619   GstFlowReturn result;
3620   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3621   GstSegment *clip_segment;
3622   GstBuffer *time_buf;
3623
3624   if (G_UNLIKELY (basesink->flushing))
3625     goto flushing;
3626
3627   if (G_UNLIKELY (basesink->priv->received_eos))
3628     goto was_eos;
3629
3630   if (OBJ_IS_BUFFERLIST (obj_type)) {
3631     time_buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3632     g_assert (NULL != time_buf);
3633   } else {
3634     time_buf = GST_BUFFER_CAST (obj);
3635   }
3636
3637   /* for code clarity */
3638   clip_segment = basesink->abidata.ABI.clip_segment;
3639
3640   if (G_UNLIKELY (!basesink->have_newsegment)) {
3641     gboolean sync;
3642
3643     sync = gst_base_sink_get_sync (basesink);
3644     if (sync) {
3645       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3646           (_("Internal data flow problem.")),
3647           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3648     }
3649
3650     /* this means this sink will assume timestamps start from 0 */
3651     GST_OBJECT_LOCK (basesink);
3652     clip_segment->start = 0;
3653     clip_segment->stop = -1;
3654     basesink->segment.start = 0;
3655     basesink->segment.stop = -1;
3656     basesink->have_newsegment = TRUE;
3657     GST_OBJECT_UNLOCK (basesink);
3658   }
3659
3660   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3661
3662   /* check if the buffer needs to be dropped, we first ask the subclass for the
3663    * start and end */
3664   if (bclass->get_times)
3665     bclass->get_times (basesink, time_buf, &start, &end);
3666
3667   if (!GST_CLOCK_TIME_IS_VALID (start)) {
3668     /* if the subclass does not want sync, we use our own values so that we at
3669      * least clip the buffer to the segment */
3670     gst_base_sink_get_times (basesink, time_buf, &start, &end);
3671   }
3672
3673   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3674       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3675
3676   /* a dropped buffer does not participate in anything */
3677   if (GST_CLOCK_TIME_IS_VALID (start) &&
3678       (clip_segment->format == GST_FORMAT_TIME)) {
3679     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3680                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3681       goto out_of_segment;
3682   }
3683
3684   /* now we can process the buffer in the queue, this function takes ownership
3685    * of the buffer */
3686   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3687       obj_type, obj, TRUE);
3688   return result;
3689
3690   /* ERRORS */
3691 flushing:
3692   {
3693     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3694     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3695     return GST_FLOW_WRONG_STATE;
3696   }
3697 was_eos:
3698   {
3699     GST_DEBUG_OBJECT (basesink,
3700         "we are EOS, dropping object, return UNEXPECTED");
3701     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3702     return GST_FLOW_UNEXPECTED;
3703   }
3704 out_of_segment:
3705   {
3706     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3707     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3708     return GST_FLOW_OK;
3709   }
3710 }
3711
3712 /* with STREAM_LOCK
3713  */
3714 static GstFlowReturn
3715 gst_base_sink_chain_main (GstBaseSink * basesink, GstPad * pad,
3716     guint8 obj_type, gpointer obj)
3717 {
3718   GstFlowReturn result;
3719
3720   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3721     goto wrong_mode;
3722
3723   GST_PAD_PREROLL_LOCK (pad);
3724   result = gst_base_sink_chain_unlocked (basesink, pad, obj_type, obj);
3725   GST_PAD_PREROLL_UNLOCK (pad);
3726
3727 done:
3728   return result;
3729
3730   /* ERRORS */
3731 wrong_mode:
3732   {
3733     GST_OBJECT_LOCK (pad);
3734     GST_WARNING_OBJECT (basesink,
3735         "Push on pad %s:%s, but it was not activated in push mode",
3736         GST_DEBUG_PAD_NAME (pad));
3737     GST_OBJECT_UNLOCK (pad);
3738     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3739     /* we don't post an error message this will signal to the peer
3740      * pushing that EOS is reached. */
3741     result = GST_FLOW_UNEXPECTED;
3742     goto done;
3743   }
3744 }
3745
3746 static GstFlowReturn
3747 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3748 {
3749   GstBaseSink *basesink;
3750
3751   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3752
3753   return gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, buf);
3754 }
3755
3756 static GstFlowReturn
3757 gst_base_sink_chain_list (GstPad * pad, GstBufferList * list)
3758 {
3759   GstBaseSink *basesink;
3760   GstBaseSinkClass *bclass;
3761   GstFlowReturn result;
3762
3763   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3764   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3765
3766   if (G_LIKELY (bclass->render_list)) {
3767     result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFERLIST, list);
3768   } else {
3769     GstBufferListIterator *it;
3770     GstBuffer *group;
3771
3772     GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
3773
3774     it = gst_buffer_list_iterate (list);
3775
3776     if (gst_buffer_list_iterator_next_group (it)) {
3777       do {
3778         group = gst_buffer_list_iterator_merge_group (it);
3779         if (group == NULL) {
3780           group = gst_buffer_new ();
3781           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3782         } else {
3783           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining group");
3784         }
3785         result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, group);
3786       } while (result == GST_FLOW_OK
3787           && gst_buffer_list_iterator_next_group (it));
3788     } else {
3789       GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3790       result =
3791           gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER,
3792           gst_buffer_new ());
3793     }
3794     gst_buffer_list_iterator_free (it);
3795     gst_buffer_list_unref (list);
3796   }
3797   return result;
3798 }
3799
3800
3801 static gboolean
3802 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3803 {
3804   gboolean res = TRUE;
3805
3806   /* update our offset if the start/stop position was updated */
3807   if (segment->format == GST_FORMAT_BYTES) {
3808     segment->time = segment->start;
3809   } else if (segment->start == 0) {
3810     /* seek to start, we can implement a default for this. */
3811     segment->time = 0;
3812   } else {
3813     res = FALSE;
3814     GST_INFO_OBJECT (sink, "Can't do a default seek");
3815   }
3816
3817   return res;
3818 }
3819
3820 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3821
3822 static gboolean
3823 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3824     GstEvent * event, GstSegment * segment)
3825 {
3826   /* By default, we try one of 2 things:
3827    *   - For absolute seek positions, convert the requested position to our
3828    *     configured processing format and place it in the output segment \
3829    *   - For relative seek positions, convert our current (input) values to the
3830    *     seek format, adjust by the relative seek offset and then convert back to
3831    *     the processing format
3832    */
3833   GstSeekType cur_type, stop_type;
3834   gint64 cur, stop;
3835   GstSeekFlags flags;
3836   GstFormat seek_format, dest_format;
3837   gdouble rate;
3838   gboolean update;
3839   gboolean res = TRUE;
3840
3841   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3842       &cur_type, &cur, &stop_type, &stop);
3843   dest_format = segment->format;
3844
3845   if (seek_format == dest_format) {
3846     gst_segment_set_seek (segment, rate, seek_format, flags,
3847         cur_type, cur, stop_type, stop, &update);
3848     return TRUE;
3849   }
3850
3851   if (cur_type != GST_SEEK_TYPE_NONE) {
3852     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3853     res =
3854         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3855         &cur);
3856     cur_type = GST_SEEK_TYPE_SET;
3857   }
3858
3859   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3860     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3861     res =
3862         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3863         &stop);
3864     stop_type = GST_SEEK_TYPE_SET;
3865   }
3866
3867   /* And finally, configure our output segment in the desired format */
3868   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3869       stop_type, stop, &update);
3870
3871   if (!res)
3872     goto no_format;
3873
3874   return res;
3875
3876 no_format:
3877   {
3878     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3879     return FALSE;
3880   }
3881 }
3882
3883 /* perform a seek, only executed in pull mode */
3884 static gboolean
3885 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3886 {
3887   gboolean flush;
3888   gdouble rate;
3889   GstFormat seek_format, dest_format;
3890   GstSeekFlags flags;
3891   GstSeekType cur_type, stop_type;
3892   gboolean seekseg_configured = FALSE;
3893   gint64 cur, stop;
3894   gboolean update, res = TRUE;
3895   GstSegment seeksegment;
3896
3897   dest_format = sink->segment.format;
3898
3899   if (event) {
3900     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3901     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3902         &cur_type, &cur, &stop_type, &stop);
3903
3904     flush = flags & GST_SEEK_FLAG_FLUSH;
3905   } else {
3906     GST_DEBUG_OBJECT (sink, "performing seek without event");
3907     flush = FALSE;
3908   }
3909
3910   if (flush) {
3911     GST_DEBUG_OBJECT (sink, "flushing upstream");
3912     gst_pad_push_event (pad, gst_event_new_flush_start ());
3913     gst_base_sink_flush_start (sink, pad);
3914   } else {
3915     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3916   }
3917
3918   GST_PAD_STREAM_LOCK (pad);
3919
3920   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3921    * copy the current segment info into the temp segment that we can actually
3922    * attempt the seek with. We only update the real segment if the seek succeeds. */
3923   if (!seekseg_configured) {
3924     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3925
3926     /* now configure the final seek segment */
3927     if (event) {
3928       if (sink->segment.format != seek_format) {
3929         /* OK, here's where we give the subclass a chance to convert the relative
3930          * seek into an absolute one in the processing format. We set up any
3931          * absolute seek above, before taking the stream lock. */
3932         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3933                 &seeksegment)) {
3934           GST_DEBUG_OBJECT (sink,
3935               "Preparing the seek failed after flushing. " "Aborting seek");
3936           res = FALSE;
3937         }
3938       } else {
3939         /* The seek format matches our processing format, no need to ask the
3940          * the subclass to configure the segment. */
3941         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3942             cur_type, cur, stop_type, stop, &update);
3943       }
3944     }
3945     /* Else, no seek event passed, so we're just (re)starting the
3946        current segment. */
3947   }
3948
3949   if (res) {
3950     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3951         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3952         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3953
3954     /* do the seek, segment.last_stop contains the new position. */
3955     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3956   }
3957
3958
3959   if (flush) {
3960     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3961     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3962     gst_base_sink_flush_stop (sink, pad);
3963   } else if (res && sink->abidata.ABI.running) {
3964     /* we are running the current segment and doing a non-flushing seek,
3965      * close the segment first based on the last_stop. */
3966     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3967         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3968   }
3969
3970   /* The subclass must have converted the segment to the processing format
3971    * by now */
3972   if (res && seeksegment.format != dest_format) {
3973     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3974         "in the correct format. Aborting seek.");
3975     res = FALSE;
3976   }
3977
3978   /* if successful seek, we update our real segment and push
3979    * out the new segment. */
3980   if (res) {
3981     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3982
3983     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3984       gst_element_post_message (GST_ELEMENT (sink),
3985           gst_message_new_segment_start (GST_OBJECT (sink),
3986               sink->segment.format, sink->segment.last_stop));
3987     }
3988   }
3989
3990   sink->priv->discont = TRUE;
3991   sink->abidata.ABI.running = TRUE;
3992
3993   GST_PAD_STREAM_UNLOCK (pad);
3994
3995   return res;
3996 }
3997
3998 static void
3999 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
4000     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
4001     gboolean flush, gboolean intermediate)
4002 {
4003   GST_OBJECT_LOCK (sink);
4004   pending->seqnum = seqnum;
4005   pending->format = format;
4006   pending->amount = amount;
4007   pending->position = 0;
4008   pending->rate = rate;
4009   pending->flush = flush;
4010   pending->intermediate = intermediate;
4011   pending->valid = TRUE;
4012   /* flush invalidates the current stepping segment */
4013   if (flush)
4014     current->valid = FALSE;
4015   GST_OBJECT_UNLOCK (sink);
4016 }
4017
4018 static gboolean
4019 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
4020 {
4021   GstBaseSinkPrivate *priv;
4022   GstBaseSinkClass *bclass;
4023   gboolean flush, intermediate;
4024   gdouble rate;
4025   GstFormat format;
4026   guint64 amount;
4027   guint seqnum;
4028   GstStepInfo *pending, *current;
4029   GstMessage *message;
4030
4031   bclass = GST_BASE_SINK_GET_CLASS (sink);
4032   priv = sink->priv;
4033
4034   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
4035
4036   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
4037   seqnum = gst_event_get_seqnum (event);
4038
4039   pending = &priv->pending_step;
4040   current = &priv->current_step;
4041
4042   /* post message first */
4043   message = gst_message_new_step_start (GST_OBJECT (sink), FALSE, format,
4044       amount, rate, flush, intermediate);
4045   gst_message_set_seqnum (message, seqnum);
4046   gst_element_post_message (GST_ELEMENT (sink), message);
4047
4048   if (flush) {
4049     /* we need to call ::unlock before locking PREROLL_LOCK
4050      * since we lock it before going into ::render */
4051     if (bclass->unlock)
4052       bclass->unlock (sink);
4053
4054     GST_PAD_PREROLL_LOCK (sink->sinkpad);
4055     /* now that we have the PREROLL lock, clear our unlock request */
4056     if (bclass->unlock_stop)
4057       bclass->unlock_stop (sink);
4058
4059     /* update the stepinfo and make it valid */
4060     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4061         intermediate);
4062
4063     if (sink->priv->async_enabled) {
4064       /* and we need to commit our state again on the next
4065        * prerolled buffer */
4066       sink->playing_async = TRUE;
4067       priv->pending_step.need_preroll = TRUE;
4068       sink->need_preroll = FALSE;
4069       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
4070     } else {
4071       sink->priv->have_latency = TRUE;
4072       sink->need_preroll = FALSE;
4073     }
4074     priv->current_sstart = GST_CLOCK_TIME_NONE;
4075     priv->current_sstop = GST_CLOCK_TIME_NONE;
4076     priv->eos_rtime = GST_CLOCK_TIME_NONE;
4077     priv->call_preroll = TRUE;
4078     gst_base_sink_set_last_buffer (sink, NULL);
4079     gst_base_sink_reset_qos (sink);
4080
4081     if (sink->clock_id) {
4082       gst_clock_id_unschedule (sink->clock_id);
4083     }
4084
4085     if (sink->have_preroll) {
4086       GST_DEBUG_OBJECT (sink, "signal waiter");
4087       priv->step_unlock = TRUE;
4088       GST_PAD_PREROLL_SIGNAL (sink->sinkpad);
4089     }
4090     GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
4091   } else {
4092     /* update the stepinfo and make it valid */
4093     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4094         intermediate);
4095   }
4096
4097   return TRUE;
4098 }
4099
4100 /* with STREAM_LOCK
4101  */
4102 static void
4103 gst_base_sink_loop (GstPad * pad)
4104 {
4105   GstBaseSink *basesink;
4106   GstBuffer *buf = NULL;
4107   GstFlowReturn result;
4108   guint blocksize;
4109   guint64 offset;
4110
4111   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
4112
4113   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
4114
4115   if ((blocksize = basesink->priv->blocksize) == 0)
4116     blocksize = -1;
4117
4118   offset = basesink->segment.last_stop;
4119
4120   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
4121       offset, blocksize);
4122
4123   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
4124   if (G_UNLIKELY (result != GST_FLOW_OK))
4125     goto paused;
4126
4127   if (G_UNLIKELY (buf == NULL))
4128     goto no_buffer;
4129
4130   offset += GST_BUFFER_SIZE (buf);
4131
4132   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
4133
4134   GST_PAD_PREROLL_LOCK (pad);
4135   result = gst_base_sink_chain_unlocked (basesink, pad, _PR_IS_BUFFER, buf);
4136   GST_PAD_PREROLL_UNLOCK (pad);
4137   if (G_UNLIKELY (result != GST_FLOW_OK))
4138     goto paused;
4139
4140   return;
4141
4142   /* ERRORS */
4143 paused:
4144   {
4145     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
4146         gst_flow_get_name (result));
4147     gst_pad_pause_task (pad);
4148     if (result == GST_FLOW_UNEXPECTED) {
4149       /* perform EOS logic */
4150       if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
4151         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4152             gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
4153                 basesink->segment.format, basesink->segment.last_stop));
4154       } else {
4155         gst_base_sink_event (pad, gst_event_new_eos ());
4156       }
4157     } else if (result == GST_FLOW_NOT_LINKED || result <= GST_FLOW_UNEXPECTED) {
4158       /* for fatal errors we post an error message, post the error
4159        * first so the app knows about the error first. 
4160        * wrong-state is not a fatal error because it happens due to
4161        * flushing and posting an error message in that case is the
4162        * wrong thing to do, e.g. when basesrc is doing a flushing
4163        * seek. */
4164       GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4165           (_("Internal data stream error.")),
4166           ("stream stopped, reason %s", gst_flow_get_name (result)));
4167       gst_base_sink_event (pad, gst_event_new_eos ());
4168     }
4169     return;
4170   }
4171 no_buffer:
4172   {
4173     GST_LOG_OBJECT (basesink, "no buffer, pausing");
4174     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4175         (_("Internal data flow error.")), ("element returned NULL buffer"));
4176     result = GST_FLOW_ERROR;
4177     goto paused;
4178   }
4179 }
4180
4181 static gboolean
4182 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
4183     gboolean flushing)
4184 {
4185   GstBaseSinkClass *bclass;
4186
4187   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4188
4189   if (flushing) {
4190     /* unlock any subclasses, we need to do this before grabbing the
4191      * PREROLL_LOCK since we hold this lock before going into ::render. */
4192     if (bclass->unlock)
4193       bclass->unlock (basesink);
4194   }
4195
4196   GST_PAD_PREROLL_LOCK (pad);
4197   basesink->flushing = flushing;
4198   if (flushing) {
4199     /* step 1, now that we have the PREROLL lock, clear our unlock request */
4200     if (bclass->unlock_stop)
4201       bclass->unlock_stop (basesink);
4202
4203     /* set need_preroll before we unblock the clock. If the clock is unblocked
4204      * before timing out, we can reuse the buffer for preroll. */
4205     basesink->need_preroll = TRUE;
4206
4207     /* step 2, unblock clock sync (if any) or any other blocking thing */
4208     if (basesink->clock_id) {
4209       gst_clock_id_unschedule (basesink->clock_id);
4210     }
4211
4212     /* flush out the data thread if it's locked in finish_preroll, this will
4213      * also flush out the EOS state */
4214     GST_DEBUG_OBJECT (basesink,
4215         "flushing out data thread, need preroll to TRUE");
4216     gst_base_sink_preroll_queue_flush (basesink, pad);
4217   }
4218   GST_PAD_PREROLL_UNLOCK (pad);
4219
4220   return TRUE;
4221 }
4222
4223 static gboolean
4224 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
4225 {
4226   gboolean result;
4227
4228   if (active) {
4229     /* start task */
4230     result = gst_pad_start_task (basesink->sinkpad,
4231         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
4232   } else {
4233     /* step 2, make sure streaming finishes */
4234     result = gst_pad_stop_task (basesink->sinkpad);
4235   }
4236
4237   return result;
4238 }
4239
4240 static gboolean
4241 gst_base_sink_pad_activate (GstPad * pad)
4242 {
4243   gboolean result = FALSE;
4244   GstBaseSink *basesink;
4245
4246   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4247
4248   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
4249
4250   gst_base_sink_set_flushing (basesink, pad, FALSE);
4251
4252   /* we need to have the pull mode enabled */
4253   if (!basesink->can_activate_pull) {
4254     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
4255     goto fallback;
4256   }
4257
4258   /* check if downstreams supports pull mode at all */
4259   if (!gst_pad_check_pull_range (pad)) {
4260     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
4261     goto fallback;
4262   }
4263
4264   /* set the pad mode before starting the task so that it's in the
4265    * correct state for the new thread. also the sink set_caps and get_caps
4266    * function checks this */
4267   basesink->pad_mode = GST_ACTIVATE_PULL;
4268
4269   /* we first try to negotiate a format so that when we try to activate
4270    * downstream, it knows about our format */
4271   if (!gst_base_sink_negotiate_pull (basesink)) {
4272     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
4273     goto fallback;
4274   }
4275
4276   /* ok activate now */
4277   if (!gst_pad_activate_pull (pad, TRUE)) {
4278     /* clear any pending caps */
4279     GST_OBJECT_LOCK (basesink);
4280     gst_caps_replace (&basesink->priv->pull_caps, NULL);
4281     GST_OBJECT_UNLOCK (basesink);
4282     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
4283     goto fallback;
4284   }
4285
4286   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
4287   result = TRUE;
4288   goto done;
4289
4290   /* push mode fallback */
4291 fallback:
4292   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
4293   if ((result = gst_pad_activate_push (pad, TRUE))) {
4294     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
4295   }
4296
4297 done:
4298   if (!result) {
4299     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
4300     gst_base_sink_set_flushing (basesink, pad, TRUE);
4301   }
4302
4303   gst_object_unref (basesink);
4304
4305   return result;
4306 }
4307
4308 static gboolean
4309 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
4310 {
4311   gboolean result;
4312   GstBaseSink *basesink;
4313
4314   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4315
4316   if (active) {
4317     if (!basesink->can_activate_push) {
4318       result = FALSE;
4319       basesink->pad_mode = GST_ACTIVATE_NONE;
4320     } else {
4321       result = TRUE;
4322       basesink->pad_mode = GST_ACTIVATE_PUSH;
4323     }
4324   } else {
4325     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
4326       g_warning ("Internal GStreamer activation error!!!");
4327       result = FALSE;
4328     } else {
4329       gst_base_sink_set_flushing (basesink, pad, TRUE);
4330       result = TRUE;
4331       basesink->pad_mode = GST_ACTIVATE_NONE;
4332     }
4333   }
4334
4335   gst_object_unref (basesink);
4336
4337   return result;
4338 }
4339
4340 static gboolean
4341 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
4342 {
4343   GstCaps *caps;
4344   gboolean result;
4345
4346   result = FALSE;
4347
4348   /* this returns the intersection between our caps and the peer caps. If there
4349    * is no peer, it returns NULL and we can't operate in pull mode so we can
4350    * fail the negotiation. */
4351   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
4352   if (caps == NULL || gst_caps_is_empty (caps))
4353     goto no_caps_possible;
4354
4355   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
4356
4357   caps = gst_caps_make_writable (caps);
4358   /* get the first (preferred) format */
4359   gst_caps_truncate (caps);
4360   /* try to fixate */
4361   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
4362
4363   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
4364
4365   if (gst_caps_is_any (caps)) {
4366     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
4367         "allowing pull()");
4368     /* neither side has template caps in this case, so they are prepared for
4369        pull() without setcaps() */
4370     result = TRUE;
4371   } else if (gst_caps_is_fixed (caps)) {
4372     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
4373       goto could_not_set_caps;
4374
4375     GST_OBJECT_LOCK (basesink);
4376     gst_caps_replace (&basesink->priv->pull_caps, caps);
4377     GST_OBJECT_UNLOCK (basesink);
4378
4379     result = TRUE;
4380   }
4381
4382   gst_caps_unref (caps);
4383
4384   return result;
4385
4386 no_caps_possible:
4387   {
4388     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
4389     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
4390     if (caps)
4391       gst_caps_unref (caps);
4392     return FALSE;
4393   }
4394 could_not_set_caps:
4395   {
4396     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
4397     gst_caps_unref (caps);
4398     return FALSE;
4399   }
4400 }
4401
4402 /* this won't get called until we implement an activate function */
4403 static gboolean
4404 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
4405 {
4406   gboolean result = FALSE;
4407   GstBaseSink *basesink;
4408   GstBaseSinkClass *bclass;
4409
4410   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4411   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4412
4413   if (active) {
4414     GstFormat format;
4415     gint64 duration;
4416
4417     /* we mark we have a newsegment here because pull based
4418      * mode works just fine without having a newsegment before the
4419      * first buffer */
4420     format = GST_FORMAT_BYTES;
4421
4422     gst_segment_init (&basesink->segment, format);
4423     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
4424     GST_OBJECT_LOCK (basesink);
4425     basesink->have_newsegment = TRUE;
4426     GST_OBJECT_UNLOCK (basesink);
4427
4428     /* get the peer duration in bytes */
4429     result = gst_pad_query_peer_duration (pad, &format, &duration);
4430     if (result) {
4431       GST_DEBUG_OBJECT (basesink,
4432           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
4433       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
4434           duration);
4435       gst_segment_set_duration (&basesink->segment, format, duration);
4436     } else {
4437       GST_DEBUG_OBJECT (basesink, "unknown duration");
4438     }
4439
4440     if (bclass->activate_pull)
4441       result = bclass->activate_pull (basesink, TRUE);
4442     else
4443       result = FALSE;
4444
4445     if (!result)
4446       goto activate_failed;
4447
4448   } else {
4449     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
4450       g_warning ("Internal GStreamer activation error!!!");
4451       result = FALSE;
4452     } else {
4453       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
4454       if (bclass->activate_pull)
4455         result &= bclass->activate_pull (basesink, FALSE);
4456       basesink->pad_mode = GST_ACTIVATE_NONE;
4457       /* clear any pending caps */
4458       GST_OBJECT_LOCK (basesink);
4459       gst_caps_replace (&basesink->priv->pull_caps, NULL);
4460       GST_OBJECT_UNLOCK (basesink);
4461     }
4462   }
4463   gst_object_unref (basesink);
4464
4465   return result;
4466
4467   /* ERRORS */
4468 activate_failed:
4469   {
4470     /* reset, as starting the thread failed */
4471     basesink->pad_mode = GST_ACTIVATE_NONE;
4472
4473     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
4474     return FALSE;
4475   }
4476 }
4477
4478 /* send an event to our sinkpad peer. */
4479 static gboolean
4480 gst_base_sink_send_event (GstElement * element, GstEvent * event)
4481 {
4482   GstPad *pad;
4483   GstBaseSink *basesink = GST_BASE_SINK (element);
4484   gboolean forward, result = TRUE;
4485   GstActivateMode mode;
4486
4487   GST_OBJECT_LOCK (element);
4488   /* get the pad and the scheduling mode */
4489   pad = gst_object_ref (basesink->sinkpad);
4490   mode = basesink->pad_mode;
4491   GST_OBJECT_UNLOCK (element);
4492
4493   /* only push UPSTREAM events upstream */
4494   forward = GST_EVENT_IS_UPSTREAM (event);
4495
4496   GST_DEBUG_OBJECT (basesink, "handling event %p %" GST_PTR_FORMAT, event,
4497       event);
4498
4499   switch (GST_EVENT_TYPE (event)) {
4500     case GST_EVENT_LATENCY:
4501     {
4502       GstClockTime latency;
4503
4504       gst_event_parse_latency (event, &latency);
4505
4506       /* store the latency. We use this to adjust the running_time before syncing
4507        * it to the clock. */
4508       GST_OBJECT_LOCK (element);
4509       basesink->priv->latency = latency;
4510       if (!basesink->priv->have_latency)
4511         forward = FALSE;
4512       GST_OBJECT_UNLOCK (element);
4513       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
4514           GST_TIME_ARGS (latency));
4515
4516       /* We forward this event so that all elements know about the global pipeline
4517        * latency. This is interesting for an element when it wants to figure out
4518        * when a particular piece of data will be rendered. */
4519       break;
4520     }
4521     case GST_EVENT_SEEK:
4522       /* in pull mode we will execute the seek */
4523       if (mode == GST_ACTIVATE_PULL)
4524         result = gst_base_sink_perform_seek (basesink, pad, event);
4525       break;
4526     case GST_EVENT_STEP:
4527       result = gst_base_sink_perform_step (basesink, pad, event);
4528       forward = FALSE;
4529       break;
4530     default:
4531       break;
4532   }
4533
4534   if (forward) {
4535     result = gst_pad_push_event (pad, event);
4536   } else {
4537     /* not forwarded, unref the event */
4538     gst_event_unref (event);
4539   }
4540
4541   gst_object_unref (pad);
4542
4543   GST_DEBUG_OBJECT (basesink, "handled event %p %" GST_PTR_FORMAT ": %d", event,
4544       event, result);
4545
4546   return result;
4547 }
4548
4549 static gboolean
4550 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4551     gint64 * cur, gboolean * upstream)
4552 {
4553   GstClock *clock = NULL;
4554   gboolean res = FALSE;
4555   GstFormat oformat, tformat;
4556   GstSegment *segment;
4557   GstClockTime now, latency;
4558   GstClockTimeDiff base;
4559   gint64 time, accum, duration;
4560   gdouble rate;
4561   gint64 last;
4562   gboolean last_seen, with_clock, in_paused;
4563
4564   GST_OBJECT_LOCK (basesink);
4565   /* we can only get the segment when we are not NULL or READY */
4566   if (!basesink->have_newsegment)
4567     goto wrong_state;
4568
4569   in_paused = FALSE;
4570   /* when not in PLAYING or when we're busy with a state change, we
4571    * cannot read from the clock so we report time based on the
4572    * last seen timestamp. */
4573   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4574       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING) {
4575     in_paused = TRUE;
4576   }
4577
4578   /* we don't use the clip segment in pull mode, when seeking we update the
4579    * main segment directly with the new segment values without it having to be
4580    * activated by the rendering after preroll */
4581   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4582     segment = basesink->abidata.ABI.clip_segment;
4583   else
4584     segment = &basesink->segment;
4585
4586   /* our intermediate time format */
4587   tformat = GST_FORMAT_TIME;
4588   /* get the format in the segment */
4589   oformat = segment->format;
4590
4591   /* report with last seen position when EOS */
4592   last_seen = basesink->eos;
4593
4594   /* assume we will use the clock for getting the current position */
4595   with_clock = TRUE;
4596   if (basesink->sync == FALSE)
4597     with_clock = FALSE;
4598
4599   /* and we need a clock */
4600   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4601     with_clock = FALSE;
4602   else
4603     gst_object_ref (clock);
4604
4605   /* mainloop might be querying position when going to playing async,
4606    * while (audio) rendering might be quickly advancing stream position,
4607    * so use clock asap rather than last reported position */
4608   if (in_paused && with_clock && g_atomic_int_get (&basesink->priv->to_playing)) {
4609     GST_DEBUG_OBJECT (basesink, "going to PLAYING, so not PAUSED");
4610     in_paused = FALSE;
4611   }
4612
4613   /* collect all data we need holding the lock */
4614   if (GST_CLOCK_TIME_IS_VALID (segment->time))
4615     time = segment->time;
4616   else
4617     time = 0;
4618
4619   if (GST_CLOCK_TIME_IS_VALID (segment->stop))
4620     duration = segment->stop - segment->start;
4621   else
4622     duration = 0;
4623
4624   accum = segment->accum;
4625   rate = segment->rate * segment->applied_rate;
4626   latency = basesink->priv->latency;
4627
4628   if (oformat == GST_FORMAT_TIME) {
4629     gint64 start, stop;
4630
4631     start = basesink->priv->current_sstart;
4632     stop = basesink->priv->current_sstop;
4633
4634     if (in_paused) {
4635       /* in paused we use the last position as a lower bound */
4636       if (stop == -1 || segment->rate > 0.0)
4637         last = start;
4638       else
4639         last = stop;
4640     } else {
4641       /* in playing, use last stop time as upper bound */
4642       if (start == -1 || segment->rate > 0.0)
4643         last = stop;
4644       else
4645         last = start;
4646     }
4647   } else {
4648     /* convert last stop to stream time */
4649     last = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4650   }
4651
4652   if (in_paused) {
4653     /* in paused, use start_time */
4654     base = GST_ELEMENT_START_TIME (basesink);
4655     GST_DEBUG_OBJECT (basesink, "in paused, using start time %" GST_TIME_FORMAT,
4656         GST_TIME_ARGS (base));
4657   } else if (with_clock) {
4658     /* else use clock when needed */
4659     base = GST_ELEMENT_CAST (basesink)->base_time;
4660     GST_DEBUG_OBJECT (basesink, "using clock and base time %" GST_TIME_FORMAT,
4661         GST_TIME_ARGS (base));
4662   } else {
4663     /* else, no sync or clock -> no base time */
4664     GST_DEBUG_OBJECT (basesink, "no sync or no clock");
4665     base = -1;
4666   }
4667
4668   /* no base, we can't calculate running_time, use last seem timestamp to report
4669    * time */
4670   if (base == -1)
4671     last_seen = TRUE;
4672
4673   /* need to release the object lock before we can get the time,
4674    * a clock might take the LOCK of the provider, which could be
4675    * a basesink subclass. */
4676   GST_OBJECT_UNLOCK (basesink);
4677
4678   if (last_seen) {
4679     /* in EOS or when no valid stream_time, report the value of last seen
4680      * timestamp */
4681     if (last == -1) {
4682       /* no timestamp, we need to ask upstream */
4683       GST_DEBUG_OBJECT (basesink, "no last seen timestamp, asking upstream");
4684       res = FALSE;
4685       *upstream = TRUE;
4686       goto done;
4687     }
4688     GST_DEBUG_OBJECT (basesink, "using last seen timestamp %" GST_TIME_FORMAT,
4689         GST_TIME_ARGS (last));
4690     *cur = last;
4691   } else {
4692     if (oformat != tformat) {
4693       /* convert accum, time and duration to time */
4694       if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4695               &accum))
4696         goto convert_failed;
4697       if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration,
4698               &tformat, &duration))
4699         goto convert_failed;
4700       if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4701               &time))
4702         goto convert_failed;
4703       if (!gst_pad_query_convert (basesink->sinkpad, oformat, last, &tformat,
4704               &last))
4705         goto convert_failed;
4706
4707       /* assume time format from now on */
4708       oformat = tformat;
4709     }
4710
4711     if (!in_paused && with_clock) {
4712       now = gst_clock_get_time (clock);
4713     } else {
4714       now = base;
4715       base = 0;
4716     }
4717
4718     /* subtract base time and accumulated time from the clock time.
4719      * Make sure we don't go negative. This is the current time in
4720      * the segment which we need to scale with the combined
4721      * rate and applied rate. */
4722     base += accum;
4723     base += latency;
4724     if (GST_CLOCK_DIFF (base, now) < 0)
4725       base = now;
4726
4727     /* for negative rates we need to count back from the segment
4728      * duration. */
4729     if (rate < 0.0)
4730       time += duration;
4731
4732     *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4733
4734     if (in_paused) {
4735       /* never report less than segment values in paused */
4736       if (last != -1)
4737         *cur = MAX (last, *cur);
4738     } else {
4739       /* never report more than last seen position in playing */
4740       if (last != -1)
4741         *cur = MIN (last, *cur);
4742     }
4743
4744     GST_DEBUG_OBJECT (basesink,
4745         "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4746         GST_TIME_FORMAT " + time %" GST_TIME_FORMAT "  last %" GST_TIME_FORMAT,
4747         GST_TIME_ARGS (now), GST_TIME_ARGS (base), GST_TIME_ARGS (accum),
4748         GST_TIME_ARGS (time), GST_TIME_ARGS (last));
4749   }
4750
4751   if (oformat != format) {
4752     /* convert to final format */
4753     if (!gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur))
4754       goto convert_failed;
4755   }
4756
4757   res = TRUE;
4758
4759 done:
4760   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4761       res, GST_TIME_ARGS (*cur));
4762
4763   if (clock)
4764     gst_object_unref (clock);
4765
4766   return res;
4767
4768   /* special cases */
4769 wrong_state:
4770   {
4771     /* in NULL or READY we always return FALSE and -1 */
4772     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4773     res = FALSE;
4774     *cur = -1;
4775     GST_OBJECT_UNLOCK (basesink);
4776     goto done;
4777   }
4778 convert_failed:
4779   {
4780     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4781     *upstream = TRUE;
4782     res = FALSE;
4783     goto done;
4784   }
4785 }
4786
4787 static gboolean
4788 gst_base_sink_get_duration (GstBaseSink * basesink, GstFormat format,
4789     gint64 * dur, gboolean * upstream)
4790 {
4791   gboolean res = FALSE;
4792
4793   if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4794     GstFormat uformat = GST_FORMAT_BYTES;
4795     gint64 uduration;
4796
4797     /* get the duration in bytes, in pull mode that's all we are sure to
4798      * know. We have to explicitly get this value from upstream instead of
4799      * using our cached value because it might change. Duration caching
4800      * should be done at a higher level. */
4801     res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat, &uduration);
4802     if (res) {
4803       gst_segment_set_duration (&basesink->segment, uformat, uduration);
4804       if (format != uformat) {
4805         /* convert to the requested format */
4806         res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4807             &format, dur);
4808       } else {
4809         *dur = uduration;
4810       }
4811     }
4812     *upstream = FALSE;
4813   } else {
4814     *upstream = TRUE;
4815   }
4816
4817   return res;
4818 }
4819
4820 static const GstQueryType *
4821 gst_base_sink_get_query_types (GstElement * element)
4822 {
4823   static const GstQueryType query_types[] = {
4824     GST_QUERY_DURATION,
4825     GST_QUERY_POSITION,
4826     GST_QUERY_SEGMENT,
4827     GST_QUERY_LATENCY,
4828     0
4829   };
4830
4831   return query_types;
4832 }
4833
4834 static gboolean
4835 default_element_query (GstElement * element, GstQuery * query)
4836 {
4837   gboolean res = FALSE;
4838
4839   GstBaseSink *basesink = GST_BASE_SINK (element);
4840
4841   switch (GST_QUERY_TYPE (query)) {
4842     case GST_QUERY_POSITION:
4843     {
4844       gint64 cur = 0;
4845       GstFormat format;
4846       gboolean upstream = FALSE;
4847
4848       gst_query_parse_position (query, &format, NULL);
4849
4850       GST_DEBUG_OBJECT (basesink, "position query in format %s",
4851           gst_format_get_name (format));
4852
4853       /* first try to get the position based on the clock */
4854       if ((res =
4855               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4856         gst_query_set_position (query, format, cur);
4857       } else if (upstream) {
4858         /* fallback to peer query */
4859         res = gst_pad_peer_query (basesink->sinkpad, query);
4860       }
4861       if (!res) {
4862         /* we can handle a few things if upstream failed */
4863         if (format == GST_FORMAT_PERCENT) {
4864           gint64 dur = 0;
4865           GstFormat uformat = GST_FORMAT_TIME;
4866
4867           res = gst_base_sink_get_position (basesink, GST_FORMAT_TIME, &cur,
4868               &upstream);
4869           if (!res && upstream) {
4870             res = gst_pad_query_peer_position (basesink->sinkpad, &uformat,
4871                 &cur);
4872           }
4873           if (res) {
4874             res = gst_base_sink_get_duration (basesink, GST_FORMAT_TIME, &dur,
4875                 &upstream);
4876             if (!res && upstream) {
4877               res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4878                   &dur);
4879             }
4880           }
4881           if (res) {
4882             gint64 pos;
4883
4884             pos = gst_util_uint64_scale (100 * GST_FORMAT_PERCENT_SCALE, cur,
4885                 dur);
4886             gst_query_set_position (query, GST_FORMAT_PERCENT, pos);
4887           }
4888         }
4889       }
4890       break;
4891     }
4892     case GST_QUERY_DURATION:
4893     {
4894       gint64 dur = 0;
4895       GstFormat format;
4896       gboolean upstream = FALSE;
4897
4898       gst_query_parse_duration (query, &format, NULL);
4899
4900       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4901           gst_format_get_name (format));
4902
4903       if ((res =
4904               gst_base_sink_get_duration (basesink, format, &dur, &upstream))) {
4905         gst_query_set_duration (query, format, dur);
4906       } else if (upstream) {
4907         /* fallback to peer query */
4908         res = gst_pad_peer_query (basesink->sinkpad, query);
4909       }
4910       if (!res) {
4911         /* we can handle a few things if upstream failed */
4912         if (format == GST_FORMAT_PERCENT) {
4913           gst_query_set_duration (query, GST_FORMAT_PERCENT,
4914               GST_FORMAT_PERCENT_MAX);
4915           res = TRUE;
4916         }
4917       }
4918       break;
4919     }
4920     case GST_QUERY_LATENCY:
4921     {
4922       gboolean live, us_live;
4923       GstClockTime min, max;
4924
4925       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4926                   &max))) {
4927         gst_query_set_latency (query, live, min, max);
4928       }
4929       break;
4930     }
4931     case GST_QUERY_JITTER:
4932       break;
4933     case GST_QUERY_RATE:
4934       /* gst_query_set_rate (query, basesink->segment_rate); */
4935       res = TRUE;
4936       break;
4937     case GST_QUERY_SEGMENT:
4938     {
4939       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4940         gst_query_set_segment (query, basesink->segment.rate,
4941             GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4942         res = TRUE;
4943       } else {
4944         res = gst_pad_peer_query (basesink->sinkpad, query);
4945       }
4946       break;
4947     }
4948     case GST_QUERY_SEEKING:
4949     case GST_QUERY_CONVERT:
4950     case GST_QUERY_FORMATS:
4951     default:
4952       res = gst_pad_peer_query (basesink->sinkpad, query);
4953       break;
4954   }
4955   GST_DEBUG_OBJECT (basesink, "query %s returns %d",
4956       GST_QUERY_TYPE_NAME (query), res);
4957   return res;
4958 }
4959
4960 static gboolean
4961 default_sink_query (GstBaseSink * basesink, GstQuery * query)
4962 {
4963   return gst_pad_query_default (basesink->sinkpad, query);
4964 }
4965
4966 static gboolean
4967 gst_base_sink_sink_query (GstPad * pad, GstQuery * query)
4968 {
4969   GstBaseSink *basesink;
4970   GstBaseSinkClass *bclass;
4971   gboolean res;
4972
4973   basesink = GST_BASE_SINK_CAST (gst_pad_get_parent (pad));
4974   if (G_UNLIKELY (basesink == NULL)) {
4975     gst_query_unref (query);
4976     return FALSE;
4977   }
4978
4979   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4980
4981   if (bclass->query)
4982     res = bclass->query (basesink, query);
4983   else
4984     res = FALSE;
4985
4986   gst_object_unref (basesink);
4987
4988   return res;
4989 }
4990
4991 static GstStateChangeReturn
4992 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4993 {
4994   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4995   GstBaseSink *basesink = GST_BASE_SINK (element);
4996   GstBaseSinkClass *bclass;
4997   GstBaseSinkPrivate *priv;
4998
4999   priv = basesink->priv;
5000
5001   bclass = GST_BASE_SINK_GET_CLASS (basesink);
5002
5003   switch (transition) {
5004     case GST_STATE_CHANGE_NULL_TO_READY:
5005       if (bclass->start)
5006         if (!bclass->start (basesink))
5007           goto start_failed;
5008       break;
5009     case GST_STATE_CHANGE_READY_TO_PAUSED:
5010       /* need to complete preroll before this state change completes, there
5011        * is no data flow in READY so we can safely assume we need to preroll. */
5012       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5013       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
5014       basesink->have_newsegment = FALSE;
5015       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
5016       gst_segment_init (basesink->abidata.ABI.clip_segment,
5017           GST_FORMAT_UNDEFINED);
5018       basesink->offset = 0;
5019       basesink->have_preroll = FALSE;
5020       priv->step_unlock = FALSE;
5021       basesink->need_preroll = TRUE;
5022       basesink->playing_async = TRUE;
5023       priv->current_sstart = GST_CLOCK_TIME_NONE;
5024       priv->current_sstop = GST_CLOCK_TIME_NONE;
5025       priv->eos_rtime = GST_CLOCK_TIME_NONE;
5026       priv->latency = 0;
5027       basesink->eos = FALSE;
5028       priv->received_eos = FALSE;
5029       gst_base_sink_reset_qos (basesink);
5030       priv->commited = FALSE;
5031       priv->call_preroll = TRUE;
5032       priv->current_step.valid = FALSE;
5033       priv->pending_step.valid = FALSE;
5034       if (priv->async_enabled) {
5035         GST_DEBUG_OBJECT (basesink, "doing async state change");
5036         /* when async enabled, post async-start message and return ASYNC from
5037          * the state change function */
5038         ret = GST_STATE_CHANGE_ASYNC;
5039         gst_element_post_message (GST_ELEMENT_CAST (basesink),
5040             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
5041       } else {
5042         priv->have_latency = TRUE;
5043       }
5044       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5045       break;
5046     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
5047       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5048       g_atomic_int_set (&basesink->priv->to_playing, TRUE);
5049       if (!gst_base_sink_needs_preroll (basesink)) {
5050         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
5051         /* no preroll needed anymore now. */
5052         basesink->playing_async = FALSE;
5053         basesink->need_preroll = FALSE;
5054         if (basesink->eos) {
5055           GstMessage *message;
5056
5057           /* need to post EOS message here */
5058           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
5059           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
5060           gst_message_set_seqnum (message, basesink->priv->seqnum);
5061           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
5062         } else {
5063           GST_DEBUG_OBJECT (basesink, "signal preroll");
5064           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
5065         }
5066       } else {
5067         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
5068         basesink->need_preroll = TRUE;
5069         basesink->playing_async = TRUE;
5070         priv->call_preroll = TRUE;
5071         priv->commited = FALSE;
5072         if (priv->async_enabled) {
5073           GST_DEBUG_OBJECT (basesink, "doing async state change");
5074           ret = GST_STATE_CHANGE_ASYNC;
5075           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5076               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
5077         }
5078       }
5079       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5080       break;
5081     default:
5082       break;
5083   }
5084
5085   {
5086     GstStateChangeReturn bret;
5087
5088     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
5089     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
5090       goto activate_failed;
5091   }
5092
5093   switch (transition) {
5094     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
5095       /* completed transition, so need not be marked any longer
5096        * And it should be unmarked, since e.g. losing our position upon flush
5097        * does not really change state to PAUSED ... */
5098       g_atomic_int_set (&basesink->priv->to_playing, FALSE);
5099       break;
5100     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
5101       g_atomic_int_set (&basesink->priv->to_playing, FALSE);
5102       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
5103       /* FIXME, make sure we cannot enter _render first */
5104
5105       /* we need to call ::unlock before locking PREROLL_LOCK
5106        * since we lock it before going into ::render */
5107       if (bclass->unlock)
5108         bclass->unlock (basesink);
5109
5110       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5111       GST_DEBUG_OBJECT (basesink, "got preroll lock");
5112       /* now that we have the PREROLL lock, clear our unlock request */
5113       if (bclass->unlock_stop)
5114         bclass->unlock_stop (basesink);
5115
5116       /* we need preroll again and we set the flag before unlocking the clockid
5117        * because if the clockid is unlocked before a current buffer expired, we
5118        * can use that buffer to preroll with */
5119       basesink->need_preroll = TRUE;
5120
5121       if (basesink->clock_id) {
5122         GST_DEBUG_OBJECT (basesink, "unschedule clock");
5123         gst_clock_id_unschedule (basesink->clock_id);
5124       }
5125
5126       /* if we don't have a preroll buffer we need to wait for a preroll and
5127        * return ASYNC. */
5128       if (!gst_base_sink_needs_preroll (basesink)) {
5129         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
5130         basesink->playing_async = FALSE;
5131       } else {
5132         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
5133           GST_DEBUG_OBJECT (basesink, "element is <= READY");
5134           ret = GST_STATE_CHANGE_SUCCESS;
5135         } else {
5136           GST_DEBUG_OBJECT (basesink,
5137               "PLAYING to PAUSED, we are not prerolled");
5138           basesink->playing_async = TRUE;
5139           priv->commited = FALSE;
5140           priv->call_preroll = TRUE;
5141           if (priv->async_enabled) {
5142             GST_DEBUG_OBJECT (basesink, "doing async state change");
5143             ret = GST_STATE_CHANGE_ASYNC;
5144             gst_element_post_message (GST_ELEMENT_CAST (basesink),
5145                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
5146                     FALSE));
5147           }
5148         }
5149       }
5150       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
5151           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
5152
5153       gst_base_sink_reset_qos (basesink);
5154       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5155       break;
5156     case GST_STATE_CHANGE_PAUSED_TO_READY:
5157       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5158       /* start by resetting our position state with the object lock so that the
5159        * position query gets the right idea. We do this before we post the
5160        * messages so that the message handlers pick this up. */
5161       GST_OBJECT_LOCK (basesink);
5162       basesink->have_newsegment = FALSE;
5163       priv->current_sstart = GST_CLOCK_TIME_NONE;
5164       priv->current_sstop = GST_CLOCK_TIME_NONE;
5165       priv->have_latency = FALSE;
5166       if (priv->cached_clock_id) {
5167         gst_clock_id_unref (priv->cached_clock_id);
5168         priv->cached_clock_id = NULL;
5169       }
5170       GST_OBJECT_UNLOCK (basesink);
5171
5172       gst_base_sink_set_last_buffer (basesink, NULL);
5173       priv->call_preroll = FALSE;
5174
5175       if (!priv->commited) {
5176         if (priv->async_enabled) {
5177           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
5178
5179           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5180               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
5181                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
5182
5183           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5184               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
5185         }
5186         priv->commited = TRUE;
5187       } else {
5188         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
5189       }
5190       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5191       break;
5192     case GST_STATE_CHANGE_READY_TO_NULL:
5193       if (bclass->stop) {
5194         if (!bclass->stop (basesink)) {
5195           GST_WARNING_OBJECT (basesink, "failed to stop");
5196         }
5197       }
5198       gst_base_sink_set_last_buffer (basesink, NULL);
5199       priv->call_preroll = FALSE;
5200       break;
5201     default:
5202       break;
5203   }
5204
5205   return ret;
5206
5207   /* ERRORS */
5208 start_failed:
5209   {
5210     GST_DEBUG_OBJECT (basesink, "failed to start");
5211     return GST_STATE_CHANGE_FAILURE;
5212   }
5213 activate_failed:
5214   {
5215     GST_DEBUG_OBJECT (basesink,
5216         "element failed to change states -- activation problem?");
5217     return GST_STATE_CHANGE_FAILURE;
5218   }
5219 }