basesink: add new enable-last-buffer property.
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSrc
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * |[
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * ]|
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSinkClass.preroll() vmethod with this preroll buffer and will then
59  * commit the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from #GstBaseSinkClass.get_times(). If this
63  * function returns #GST_CLOCK_TIME_NONE for the start time, no synchronisation
64  * will be done. Synchronisation can be disabled entirely by setting the object
65  * #GstBaseSink:sync property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSinkClass.render() will be
68  * called. Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the
71  * #GstBaseSinkClass.render() method are supported as well. These classes
72  * typically receive a buffer in the render method and can then potentially
73  * block on the clock while rendering. A typical example is an audiosink.
74  * Since 0.10.11 these subclasses can use gst_base_sink_wait_preroll() to
75  * perform the blocking wait.
76  *
77  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
78  * for the clock to reach the time indicated by the stop time of the last
79  * #GstBaseSinkClass.get_times() call before posting an EOS message. When the
80  * element receives EOS in PAUSED, preroll completes, the event is queued and an
81  * EOS message is posted when going to PLAYING.
82  *
83  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
84  * synchronisation and clipping of buffers. Buffers that fall completely outside
85  * of the current segment are dropped. Buffers that fall partially in the
86  * segment are rendered (and prerolled). Subclasses should do any subbuffer
87  * clipping themselves when needed.
88  *
89  * #GstBaseSink will by default report the current playback position in
90  * #GST_FORMAT_TIME based on the current clock time and segment information.
91  * If no clock has been set on the element, the query will be forwarded
92  * upstream.
93  *
94  * The #GstBaseSinkClass.set_caps() function will be called when the subclass
95  * should configure itself to process a specific media type.
96  *
97  * The #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() virtual methods
98  * will be called when resources should be allocated. Any 
99  * #GstBaseSinkClass.preroll(), #GstBaseSinkClass.render() and
100  * #GstBaseSinkClass.set_caps() function will be called between the
101  * #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() calls.
102  *
103  * The #GstBaseSinkClass.event() virtual method will be called when an event is
104  * received by #GstBaseSink. Normally this method should only be overriden by
105  * very specific elements (such as file sinks) which need to handle the
106  * newsegment event specially.
107  *
108  * #GstBaseSink provides an overridable #GstBaseSinkClass.buffer_alloc()
109  * function that can be used by sinks that want to do reverse negotiation or to
110  * provide custom buffers (hardware buffers for example) to upstream elements.
111  *
112  * The #GstBaseSinkClass.unlock() method is called when the elements should
113  * unblock any blocking operations they perform in the
114  * #GstBaseSinkClass.render() method. This is mostly useful when the
115  * #GstBaseSinkClass.render() method performs a blocking write on a file
116  * descriptor, for example.
117  *
118  * The #GstBaseSink:max-lateness property affects how the sink deals with
119  * buffers that arrive too late in the sink. A buffer arrives too late in the
120  * sink when the presentation time (as a combination of the last segment, buffer
121  * timestamp and element base_time) plus the duration is before the current
122  * time of the clock.
123  * If the frame is later than max-lateness, the sink will drop the buffer
124  * without calling the render method.
125  * This feature is disabled if sync is disabled, the
126  * #GstBaseSinkClass.get_times() method does not return a valid start time or
127  * max-lateness is set to -1 (the default).
128  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
129  * max-lateness value.
130  *
131  * The #GstBaseSink:qos property will enable the quality-of-service features of
132  * the basesink which gather statistics about the real-time performance of the
133  * clock synchronisation. For each buffer received in the sink, statistics are
134  * gathered and a QOS event is sent upstream with these numbers. This
135  * information can then be used by upstream elements to reduce their processing
136  * rate, for example.
137  *
138  * Since 0.10.15 the #GstBaseSink:async property can be used to instruct the
139  * sink to never perform an ASYNC state change. This feature is mostly usable
140  * when dealing with non-synchronized streams or sparse streams.
141  *
142  * Last reviewed on 2007-08-29 (0.10.15)
143  */
144
145 #ifdef HAVE_CONFIG_H
146 #  include "config.h"
147 #endif
148
149 #include <gst/gst_private.h>
150
151 #include "gstbasesink.h"
152 #include <gst/gstmarshal.h>
153 #include <gst/gst-i18n-lib.h>
154
155 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
156 #define GST_CAT_DEFAULT gst_base_sink_debug
157
158 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
159    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
160
161 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
162
163 typedef struct
164 {
165   gboolean valid;               /* if this info is valid */
166   guint32 seqnum;               /* the seqnum of the STEP event */
167   GstFormat format;             /* the format of the amount */
168   guint64 amount;               /* the total amount of data to skip */
169   guint64 position;             /* the position in the stepped data */
170   guint64 duration;             /* the duration in time of the skipped data */
171   guint64 start;                /* running_time of the start */
172   gdouble rate;                 /* rate of skipping */
173   gdouble start_rate;           /* rate before skipping */
174   guint64 start_start;          /* start position skipping */
175   guint64 start_stop;           /* stop position skipping */
176   gboolean flush;               /* if this was a flushing step */
177   gboolean intermediate;        /* if this is an intermediate step */
178   gboolean need_preroll;        /* if we need preroll after this step */
179 } GstStepInfo;
180
181 /* FIXME, some stuff in ABI.data and other in Private...
182  * Make up your mind please.
183  */
184 struct _GstBaseSinkPrivate
185 {
186   gint qos_enabled;             /* ATOMIC */
187   gboolean async_enabled;
188   GstClockTimeDiff ts_offset;
189   GstClockTime render_delay;
190
191   /* start, stop of current buffer, stream time, used to report position */
192   GstClockTime current_sstart;
193   GstClockTime current_sstop;
194
195   /* start, stop and jitter of current buffer, running time */
196   GstClockTime current_rstart;
197   GstClockTime current_rstop;
198   GstClockTimeDiff current_jitter;
199
200   /* EOS sync time in running time */
201   GstClockTime eos_rtime;
202
203   /* last buffer that arrived in time, running time */
204   GstClockTime last_in_time;
205   /* when the last buffer left the sink, running time */
206   GstClockTime last_left;
207
208   /* running averages go here these are done on running time */
209   GstClockTime avg_pt;
210   GstClockTime avg_duration;
211   gdouble avg_rate;
212
213   /* these are done on system time. avg_jitter and avg_render are
214    * compared to eachother to see if the rendering time takes a
215    * huge amount of the processing, If so we are flooded with
216    * buffers. */
217   GstClockTime last_left_systime;
218   GstClockTime avg_jitter;
219   GstClockTime start, stop;
220   GstClockTime avg_render;
221
222   /* number of rendered and dropped frames */
223   guint64 rendered;
224   guint64 dropped;
225
226   /* latency stuff */
227   GstClockTime latency;
228
229   /* if we already commited the state */
230   gboolean commited;
231
232   /* when we received EOS */
233   gboolean received_eos;
234
235   /* when we are prerolled and able to report latency */
236   gboolean have_latency;
237
238   /* the last buffer we prerolled or rendered. Useful for making snapshots */
239   gboolean enable_last_buffer;
240   GstBuffer *last_buffer;
241
242   /* caps for pull based scheduling */
243   GstCaps *pull_caps;
244
245   /* blocksize for pulling */
246   guint blocksize;
247
248   gboolean discont;
249
250   /* seqnum of the stream */
251   guint32 seqnum;
252
253   gboolean call_preroll;
254   gboolean step_unlock;
255
256   /* we have a pending and a current step operation */
257   GstStepInfo current_step;
258   GstStepInfo pending_step;
259 };
260
261 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
262
263 /* generic running average, this has a neutral window size */
264 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
265
266 /* the windows for these running averages are experimentally obtained.
267  * possitive values get averaged more while negative values use a small
268  * window so we can react faster to badness. */
269 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
270 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
271
272 /* BaseSink properties */
273
274 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
275 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
276
277 #define DEFAULT_PREROLL_QUEUE_LEN   0
278 #define DEFAULT_SYNC                TRUE
279 #define DEFAULT_MAX_LATENESS        -1
280 #define DEFAULT_QOS                 FALSE
281 #define DEFAULT_ASYNC               TRUE
282 #define DEFAULT_TS_OFFSET           0
283 #define DEFAULT_BLOCKSIZE           4096
284 #define DEFAULT_RENDER_DELAY        0
285 #define DEFAULT_ENABLE_LAST_BUFFER  TRUE
286
287 enum
288 {
289   PROP_0,
290   PROP_PREROLL_QUEUE_LEN,
291   PROP_SYNC,
292   PROP_MAX_LATENESS,
293   PROP_QOS,
294   PROP_ASYNC,
295   PROP_TS_OFFSET,
296   PROP_ENABLE_LAST_BUFFER,
297   PROP_LAST_BUFFER,
298   PROP_BLOCKSIZE,
299   PROP_RENDER_DELAY,
300   PROP_LAST
301 };
302
303 static GstElementClass *parent_class = NULL;
304
305 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
306 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
307 static void gst_base_sink_finalize (GObject * object);
308
309 GType
310 gst_base_sink_get_type (void)
311 {
312   static volatile gsize base_sink_type = 0;
313
314   if (g_once_init_enter (&base_sink_type)) {
315     GType _type;
316     static const GTypeInfo base_sink_info = {
317       sizeof (GstBaseSinkClass),
318       NULL,
319       NULL,
320       (GClassInitFunc) gst_base_sink_class_init,
321       NULL,
322       NULL,
323       sizeof (GstBaseSink),
324       0,
325       (GInstanceInitFunc) gst_base_sink_init,
326     };
327
328     _type = g_type_register_static (GST_TYPE_ELEMENT,
329         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
330     g_once_init_leave (&base_sink_type, _type);
331   }
332   return base_sink_type;
333 }
334
335 static void gst_base_sink_set_property (GObject * object, guint prop_id,
336     const GValue * value, GParamSpec * pspec);
337 static void gst_base_sink_get_property (GObject * object, guint prop_id,
338     GValue * value, GParamSpec * pspec);
339
340 static gboolean gst_base_sink_send_event (GstElement * element,
341     GstEvent * event);
342 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
343
344 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
345 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
346 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
347     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
348 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
349     GstClockTime * start, GstClockTime * end);
350 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
351     GstPad * pad, gboolean flushing);
352 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
353     gboolean active);
354 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
355     GstSegment * segment);
356 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
357     GstEvent * event, GstSegment * segment);
358
359 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
360     GstStateChange transition);
361
362 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
363 static GstFlowReturn gst_base_sink_chain_list (GstPad * pad,
364     GstBufferList * list);
365
366 static void gst_base_sink_loop (GstPad * pad);
367 static gboolean gst_base_sink_pad_activate (GstPad * pad);
368 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
369 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
370 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
371
372 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
373 static GstCaps *gst_base_sink_pad_getcaps (GstPad * pad);
374 static gboolean gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps);
375 static void gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps);
376 static GstFlowReturn gst_base_sink_pad_buffer_alloc (GstPad * pad,
377     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
378
379
380 /* check if an object was too late */
381 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
382     GstMiniObject * obj, GstClockTime start, GstClockTime stop,
383     GstClockReturn status, GstClockTimeDiff jitter);
384 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
385     gboolean is_list, GstMiniObject * obj);
386
387 static void
388 gst_base_sink_class_init (GstBaseSinkClass * klass)
389 {
390   GObjectClass *gobject_class;
391   GstElementClass *gstelement_class;
392
393   gobject_class = G_OBJECT_CLASS (klass);
394   gstelement_class = GST_ELEMENT_CLASS (klass);
395
396   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
397       "basesink element");
398
399   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
400
401   parent_class = g_type_class_peek_parent (klass);
402
403   gobject_class->finalize = gst_base_sink_finalize;
404   gobject_class->set_property = gst_base_sink_set_property;
405   gobject_class->get_property = gst_base_sink_get_property;
406
407   /* FIXME, this next value should be configured using an event from the
408    * upstream element, ie, the BUFFER_SIZE event. */
409   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
410       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
411           "Number of buffers to queue during preroll", 0, G_MAXUINT,
412           DEFAULT_PREROLL_QUEUE_LEN,
413           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
414
415   g_object_class_install_property (gobject_class, PROP_SYNC,
416       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
417           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
418
419   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
420       g_param_spec_int64 ("max-lateness", "Max Lateness",
421           "Maximum number of nanoseconds that a buffer can be late before it "
422           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
423           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
424
425   g_object_class_install_property (gobject_class, PROP_QOS,
426       g_param_spec_boolean ("qos", "Qos",
427           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
428           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
429   /**
430    * GstBaseSink:async
431    *
432    * If set to #TRUE, the basesink will perform asynchronous state changes.
433    * When set to #FALSE, the sink will not signal the parent when it prerolls.
434    * Use this option when dealing with sparse streams or when synchronisation is
435    * not required.
436    *
437    * Since: 0.10.15
438    */
439   g_object_class_install_property (gobject_class, PROP_ASYNC,
440       g_param_spec_boolean ("async", "Async",
441           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
442           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
443   /**
444    * GstBaseSink:ts-offset
445    *
446    * Controls the final synchronisation, a negative value will render the buffer
447    * earlier while a positive value delays playback. This property can be
448    * used to fix synchronisation in bad files.
449    *
450    * Since: 0.10.15
451    */
452   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
453       g_param_spec_int64 ("ts-offset", "TS Offset",
454           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
455           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
456
457   /**
458    * GstBaseSink:enable-last-buffer
459    *
460    * Enable the last-buffer property. If FALSE, basesink doesn't keep a
461    * reference to the last buffer arrived and the last-buffer property is always
462    * set to NULL. This can be useful if you need buffers to be released as soon
463    * as possible, eg. if you're using a buffer pool.
464    *
465    * Since: 0.10.30
466    */
467   g_object_class_install_property (gobject_class, PROP_ENABLE_LAST_BUFFER,
468       g_param_spec_boolean ("enable-last-buffer", "Enable Last Buffer",
469           "Enable the last-buffer property", DEFAULT_ENABLE_LAST_BUFFER,
470           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
471
472   /**
473    * GstBaseSink:last-buffer
474    *
475    * The last buffer that arrived in the sink and was used for preroll or for
476    * rendering. This property can be used to generate thumbnails. This property
477    * can be NULL when the sink has not yet received a bufer.
478    *
479    * Since: 0.10.15
480    */
481   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
482       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
483           "The last buffer received in the sink", GST_TYPE_BUFFER,
484           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
485   /**
486    * GstBaseSink:blocksize
487    *
488    * The amount of bytes to pull when operating in pull mode.
489    *
490    * Since: 0.10.22
491    */
492   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
493       g_param_spec_uint ("blocksize", "Block size",
494           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
495           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
496   /**
497    * GstBaseSink:render-delay
498    *
499    * The additional delay between synchronisation and actual rendering of the
500    * media. This property will add additional latency to the device in order to
501    * make other sinks compensate for the delay.
502    *
503    * Since: 0.10.22
504    */
505   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
506       g_param_spec_uint64 ("render-delay", "Render Delay",
507           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
508           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
509
510   gstelement_class->change_state =
511       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
512   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
513   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
514
515   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
516   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
517   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
518   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
519   klass->activate_pull =
520       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
521
522   /* Registering debug symbols for function pointers */
523   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_getcaps);
524   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_setcaps);
525   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_fixate);
526   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_buffer_alloc);
527   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate);
528   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_push);
529   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_pull);
530   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_event);
531   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain);
532   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain_list);
533 }
534
535 static GstCaps *
536 gst_base_sink_pad_getcaps (GstPad * pad)
537 {
538   GstBaseSinkClass *bclass;
539   GstBaseSink *bsink;
540   GstCaps *caps = NULL;
541
542   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
543   bclass = GST_BASE_SINK_GET_CLASS (bsink);
544
545   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
546     /* if we are operating in pull mode we only accept the negotiated caps */
547     GST_OBJECT_LOCK (pad);
548     if ((caps = GST_PAD_CAPS (pad)))
549       gst_caps_ref (caps);
550     GST_OBJECT_UNLOCK (pad);
551   }
552   if (caps == NULL) {
553     if (bclass->get_caps)
554       caps = bclass->get_caps (bsink);
555
556     if (caps == NULL) {
557       GstPadTemplate *pad_template;
558
559       pad_template =
560           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
561           "sink");
562       if (pad_template != NULL) {
563         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
564       }
565     }
566   }
567   gst_object_unref (bsink);
568
569   return caps;
570 }
571
572 static gboolean
573 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
574 {
575   GstBaseSinkClass *bclass;
576   GstBaseSink *bsink;
577   gboolean res = TRUE;
578
579   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
580   bclass = GST_BASE_SINK_GET_CLASS (bsink);
581
582   if (res && bclass->set_caps)
583     res = bclass->set_caps (bsink, caps);
584
585   gst_object_unref (bsink);
586
587   return res;
588 }
589
590 static void
591 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
592 {
593   GstBaseSinkClass *bclass;
594   GstBaseSink *bsink;
595
596   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
597   bclass = GST_BASE_SINK_GET_CLASS (bsink);
598
599   if (bclass->fixate)
600     bclass->fixate (bsink, caps);
601
602   gst_object_unref (bsink);
603 }
604
605 static GstFlowReturn
606 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
607     GstCaps * caps, GstBuffer ** buf)
608 {
609   GstBaseSinkClass *bclass;
610   GstBaseSink *bsink;
611   GstFlowReturn result = GST_FLOW_OK;
612
613   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
614   bclass = GST_BASE_SINK_GET_CLASS (bsink);
615
616   if (bclass->buffer_alloc)
617     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
618   else
619     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
620
621   gst_object_unref (bsink);
622
623   return result;
624 }
625
626 static void
627 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
628 {
629   GstPadTemplate *pad_template;
630   GstBaseSinkPrivate *priv;
631
632   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
633
634   pad_template =
635       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
636   g_return_if_fail (pad_template != NULL);
637
638   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
639
640   gst_pad_set_getcaps_function (basesink->sinkpad, gst_base_sink_pad_getcaps);
641   gst_pad_set_setcaps_function (basesink->sinkpad, gst_base_sink_pad_setcaps);
642   gst_pad_set_fixatecaps_function (basesink->sinkpad, gst_base_sink_pad_fixate);
643   gst_pad_set_bufferalloc_function (basesink->sinkpad,
644       gst_base_sink_pad_buffer_alloc);
645   gst_pad_set_activate_function (basesink->sinkpad, gst_base_sink_pad_activate);
646   gst_pad_set_activatepush_function (basesink->sinkpad,
647       gst_base_sink_pad_activate_push);
648   gst_pad_set_activatepull_function (basesink->sinkpad,
649       gst_base_sink_pad_activate_pull);
650   gst_pad_set_event_function (basesink->sinkpad, gst_base_sink_event);
651   gst_pad_set_chain_function (basesink->sinkpad, gst_base_sink_chain);
652   gst_pad_set_chain_list_function (basesink->sinkpad, gst_base_sink_chain_list);
653   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
654
655   basesink->pad_mode = GST_ACTIVATE_NONE;
656   basesink->preroll_queue = g_queue_new ();
657   basesink->abidata.ABI.clip_segment = gst_segment_new ();
658   priv->have_latency = FALSE;
659
660   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
661   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
662
663   basesink->sync = DEFAULT_SYNC;
664   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
665   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
666   priv->async_enabled = DEFAULT_ASYNC;
667   priv->ts_offset = DEFAULT_TS_OFFSET;
668   priv->render_delay = DEFAULT_RENDER_DELAY;
669   priv->blocksize = DEFAULT_BLOCKSIZE;
670   priv->enable_last_buffer = DEFAULT_ENABLE_LAST_BUFFER;
671
672   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
673 }
674
675 static void
676 gst_base_sink_finalize (GObject * object)
677 {
678   GstBaseSink *basesink;
679
680   basesink = GST_BASE_SINK (object);
681
682   g_queue_free (basesink->preroll_queue);
683   gst_segment_free (basesink->abidata.ABI.clip_segment);
684
685   G_OBJECT_CLASS (parent_class)->finalize (object);
686 }
687
688 /**
689  * gst_base_sink_set_sync:
690  * @sink: the sink
691  * @sync: the new sync value.
692  *
693  * Configures @sink to synchronize on the clock or not. When
694  * @sync is FALSE, incomming samples will be played as fast as
695  * possible. If @sync is TRUE, the timestamps of the incomming
696  * buffers will be used to schedule the exact render time of its
697  * contents.
698  *
699  * Since: 0.10.4
700  */
701 void
702 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
703 {
704   g_return_if_fail (GST_IS_BASE_SINK (sink));
705
706   GST_OBJECT_LOCK (sink);
707   sink->sync = sync;
708   GST_OBJECT_UNLOCK (sink);
709 }
710
711 /**
712  * gst_base_sink_get_sync:
713  * @sink: the sink
714  *
715  * Checks if @sink is currently configured to synchronize against the
716  * clock.
717  *
718  * Returns: TRUE if the sink is configured to synchronize against the clock.
719  *
720  * Since: 0.10.4
721  */
722 gboolean
723 gst_base_sink_get_sync (GstBaseSink * sink)
724 {
725   gboolean res;
726
727   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
728
729   GST_OBJECT_LOCK (sink);
730   res = sink->sync;
731   GST_OBJECT_UNLOCK (sink);
732
733   return res;
734 }
735
736 /**
737  * gst_base_sink_set_max_lateness:
738  * @sink: the sink
739  * @max_lateness: the new max lateness value.
740  *
741  * Sets the new max lateness value to @max_lateness. This value is
742  * used to decide if a buffer should be dropped or not based on the
743  * buffer timestamp and the current clock time. A value of -1 means
744  * an unlimited time.
745  *
746  * Since: 0.10.4
747  */
748 void
749 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
750 {
751   g_return_if_fail (GST_IS_BASE_SINK (sink));
752
753   GST_OBJECT_LOCK (sink);
754   sink->abidata.ABI.max_lateness = max_lateness;
755   GST_OBJECT_UNLOCK (sink);
756 }
757
758 /**
759  * gst_base_sink_get_max_lateness:
760  * @sink: the sink
761  *
762  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
763  * more details.
764  *
765  * Returns: The maximum time in nanoseconds that a buffer can be late
766  * before it is dropped and not rendered. A value of -1 means an
767  * unlimited time.
768  *
769  * Since: 0.10.4
770  */
771 gint64
772 gst_base_sink_get_max_lateness (GstBaseSink * sink)
773 {
774   gint64 res;
775
776   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
777
778   GST_OBJECT_LOCK (sink);
779   res = sink->abidata.ABI.max_lateness;
780   GST_OBJECT_UNLOCK (sink);
781
782   return res;
783 }
784
785 /**
786  * gst_base_sink_set_qos_enabled:
787  * @sink: the sink
788  * @enabled: the new qos value.
789  *
790  * Configures @sink to send Quality-of-Service events upstream.
791  *
792  * Since: 0.10.5
793  */
794 void
795 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
796 {
797   g_return_if_fail (GST_IS_BASE_SINK (sink));
798
799   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
800 }
801
802 /**
803  * gst_base_sink_is_qos_enabled:
804  * @sink: the sink
805  *
806  * Checks if @sink is currently configured to send Quality-of-Service events
807  * upstream.
808  *
809  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
810  *
811  * Since: 0.10.5
812  */
813 gboolean
814 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
815 {
816   gboolean res;
817
818   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
819
820   res = g_atomic_int_get (&sink->priv->qos_enabled);
821
822   return res;
823 }
824
825 /**
826  * gst_base_sink_set_async_enabled:
827  * @sink: the sink
828  * @enabled: the new async value.
829  *
830  * Configures @sink to perform all state changes asynchronusly. When async is
831  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
832  * preroll buffer. This feature is usefull if the sink does not synchronize
833  * against the clock or when it is dealing with sparse streams.
834  *
835  * Since: 0.10.15
836  */
837 void
838 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
839 {
840   g_return_if_fail (GST_IS_BASE_SINK (sink));
841
842   GST_PAD_PREROLL_LOCK (sink->sinkpad);
843   sink->priv->async_enabled = enabled;
844   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
845   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
846 }
847
848 /**
849  * gst_base_sink_is_async_enabled:
850  * @sink: the sink
851  *
852  * Checks if @sink is currently configured to perform asynchronous state
853  * changes to PAUSED.
854  *
855  * Returns: TRUE if the sink is configured to perform asynchronous state
856  * changes.
857  *
858  * Since: 0.10.15
859  */
860 gboolean
861 gst_base_sink_is_async_enabled (GstBaseSink * sink)
862 {
863   gboolean res;
864
865   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
866
867   GST_PAD_PREROLL_LOCK (sink->sinkpad);
868   res = sink->priv->async_enabled;
869   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
870
871   return res;
872 }
873
874 /**
875  * gst_base_sink_set_ts_offset:
876  * @sink: the sink
877  * @offset: the new offset
878  *
879  * Adjust the synchronisation of @sink with @offset. A negative value will
880  * render buffers earlier than their timestamp. A positive value will delay
881  * rendering. This function can be used to fix playback of badly timestamped
882  * buffers.
883  *
884  * Since: 0.10.15
885  */
886 void
887 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
888 {
889   g_return_if_fail (GST_IS_BASE_SINK (sink));
890
891   GST_OBJECT_LOCK (sink);
892   sink->priv->ts_offset = offset;
893   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
894   GST_OBJECT_UNLOCK (sink);
895 }
896
897 /**
898  * gst_base_sink_get_ts_offset:
899  * @sink: the sink
900  *
901  * Get the synchronisation offset of @sink.
902  *
903  * Returns: The synchronisation offset.
904  *
905  * Since: 0.10.15
906  */
907 GstClockTimeDiff
908 gst_base_sink_get_ts_offset (GstBaseSink * sink)
909 {
910   GstClockTimeDiff res;
911
912   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
913
914   GST_OBJECT_LOCK (sink);
915   res = sink->priv->ts_offset;
916   GST_OBJECT_UNLOCK (sink);
917
918   return res;
919 }
920
921 /**
922  * gst_base_sink_get_last_buffer:
923  * @sink: the sink
924  *
925  * Get the last buffer that arrived in the sink and was used for preroll or for
926  * rendering. This property can be used to generate thumbnails.
927  *
928  * The #GstCaps on the buffer can be used to determine the type of the buffer.
929  *
930  * Returns: a #GstBuffer. gst_buffer_unref() after usage. This function returns
931  * NULL when no buffer has arrived in the sink yet or when the sink is not in
932  * PAUSED or PLAYING.
933  *
934  * Since: 0.10.15
935  */
936 GstBuffer *
937 gst_base_sink_get_last_buffer (GstBaseSink * sink)
938 {
939   GstBuffer *res;
940
941   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
942
943   GST_OBJECT_LOCK (sink);
944   if ((res = sink->priv->last_buffer))
945     gst_buffer_ref (res);
946   GST_OBJECT_UNLOCK (sink);
947
948   return res;
949 }
950
951 /* with OBJECT_LOCK */
952 static void
953 gst_base_sink_set_last_buffer_unlocked (GstBaseSink * sink, GstBuffer * buffer)
954 {
955   GstBuffer *old;
956
957   old = sink->priv->last_buffer;
958   if (G_LIKELY (old != buffer)) {
959     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
960     if (G_LIKELY (buffer))
961       gst_buffer_ref (buffer);
962     sink->priv->last_buffer = buffer;
963   } else {
964     old = NULL;
965   }
966   /* avoid unreffing with the lock because cleanup code might want to take the
967    * lock too */
968   if (G_LIKELY (old)) {
969     GST_OBJECT_UNLOCK (sink);
970     gst_buffer_unref (old);
971     GST_OBJECT_LOCK (sink);
972   }
973 }
974
975 static void
976 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
977 {
978   GST_OBJECT_LOCK (sink);
979   if (sink->priv->enable_last_buffer == FALSE)
980     goto out;
981
982   gst_base_sink_set_last_buffer_unlocked (sink, buffer);
983
984 out:
985   GST_OBJECT_UNLOCK (sink);
986 }
987
988 /**
989  * gst_base_sink_get_latency:
990  * @sink: the sink
991  *
992  * Get the currently configured latency.
993  *
994  * Returns: The configured latency.
995  *
996  * Since: 0.10.12
997  */
998 GstClockTime
999 gst_base_sink_get_latency (GstBaseSink * sink)
1000 {
1001   GstClockTime res;
1002
1003   GST_OBJECT_LOCK (sink);
1004   res = sink->priv->latency;
1005   GST_OBJECT_UNLOCK (sink);
1006
1007   return res;
1008 }
1009
1010 /**
1011  * gst_base_sink_query_latency:
1012  * @sink: the sink
1013  * @live: if the sink is live
1014  * @upstream_live: if an upstream element is live
1015  * @min_latency: the min latency of the upstream elements
1016  * @max_latency: the max latency of the upstream elements
1017  *
1018  * Query the sink for the latency parameters. The latency will be queried from
1019  * the upstream elements. @live will be TRUE if @sink is configured to
1020  * synchronize against the clock. @upstream_live will be TRUE if an upstream
1021  * element is live.
1022  *
1023  * If both @live and @upstream_live are TRUE, the sink will want to compensate
1024  * for the latency introduced by the upstream elements by setting the
1025  * @min_latency to a strictly possitive value.
1026  *
1027  * This function is mostly used by subclasses.
1028  *
1029  * Returns: TRUE if the query succeeded.
1030  *
1031  * Since: 0.10.12
1032  */
1033 gboolean
1034 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
1035     gboolean * upstream_live, GstClockTime * min_latency,
1036     GstClockTime * max_latency)
1037 {
1038   gboolean l, us_live, res, have_latency;
1039   GstClockTime min, max, render_delay;
1040   GstQuery *query;
1041   GstClockTime us_min, us_max;
1042
1043   /* we are live when we sync to the clock */
1044   GST_OBJECT_LOCK (sink);
1045   l = sink->sync;
1046   have_latency = sink->priv->have_latency;
1047   render_delay = sink->priv->render_delay;
1048   GST_OBJECT_UNLOCK (sink);
1049
1050   /* assume no latency */
1051   min = 0;
1052   max = -1;
1053   us_live = FALSE;
1054
1055   if (have_latency) {
1056     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1057     /* we are ready for a latency query this is when we preroll or when we are
1058      * not async. */
1059     query = gst_query_new_latency ();
1060
1061     /* ask the peer for the latency */
1062     if ((res = gst_pad_peer_query (sink->sinkpad, query))) {
1063       /* get upstream min and max latency */
1064       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1065
1066       if (us_live) {
1067         /* upstream live, use its latency, subclasses should use these
1068          * values to create the complete latency. */
1069         min = us_min;
1070         max = us_max;
1071       }
1072       if (l) {
1073         /* we need to add the render delay if we are live */
1074         if (min != -1)
1075           min += render_delay;
1076         if (max != -1)
1077           max += render_delay;
1078       }
1079     }
1080     gst_query_unref (query);
1081   } else {
1082     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1083     res = FALSE;
1084   }
1085
1086   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1087   if (!res) {
1088     if (!l) {
1089       res = TRUE;
1090       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1091     } else {
1092       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1093     }
1094   }
1095
1096   if (res) {
1097     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1098         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1099         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1100
1101     if (live)
1102       *live = l;
1103     if (upstream_live)
1104       *upstream_live = us_live;
1105     if (min_latency)
1106       *min_latency = min;
1107     if (max_latency)
1108       *max_latency = max;
1109   }
1110   return res;
1111 }
1112
1113 /**
1114  * gst_base_sink_set_render_delay:
1115  * @sink: a #GstBaseSink
1116  * @delay: the new delay
1117  *
1118  * Set the render delay in @sink to @delay. The render delay is the time
1119  * between actual rendering of a buffer and its synchronisation time. Some
1120  * devices might delay media rendering which can be compensated for with this
1121  * function.
1122  *
1123  * After calling this function, this sink will report additional latency and
1124  * other sinks will adjust their latency to delay the rendering of their media.
1125  *
1126  * This function is usually called by subclasses.
1127  *
1128  * Since: 0.10.21
1129  */
1130 void
1131 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1132 {
1133   GstClockTime old_render_delay;
1134
1135   g_return_if_fail (GST_IS_BASE_SINK (sink));
1136
1137   GST_OBJECT_LOCK (sink);
1138   old_render_delay = sink->priv->render_delay;
1139   sink->priv->render_delay = delay;
1140   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1141       GST_TIME_ARGS (delay));
1142   GST_OBJECT_UNLOCK (sink);
1143
1144   if (delay != old_render_delay) {
1145     GST_DEBUG_OBJECT (sink, "posting latency changed");
1146     gst_element_post_message (GST_ELEMENT_CAST (sink),
1147         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1148   }
1149 }
1150
1151 /**
1152  * gst_base_sink_get_render_delay:
1153  * @sink: a #GstBaseSink
1154  *
1155  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1156  * information about the render delay.
1157  *
1158  * Returns: the render delay of @sink.
1159  *
1160  * Since: 0.10.21
1161  */
1162 GstClockTime
1163 gst_base_sink_get_render_delay (GstBaseSink * sink)
1164 {
1165   GstClockTimeDiff res;
1166
1167   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1168
1169   GST_OBJECT_LOCK (sink);
1170   res = sink->priv->render_delay;
1171   GST_OBJECT_UNLOCK (sink);
1172
1173   return res;
1174 }
1175
1176 /**
1177  * gst_base_sink_set_blocksize:
1178  * @sink: a #GstBaseSink
1179  * @blocksize: the blocksize in bytes
1180  *
1181  * Set the number of bytes that the sink will pull when it is operating in pull
1182  * mode.
1183  *
1184  * Since: 0.10.22
1185  */
1186 void
1187 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1188 {
1189   g_return_if_fail (GST_IS_BASE_SINK (sink));
1190
1191   GST_OBJECT_LOCK (sink);
1192   sink->priv->blocksize = blocksize;
1193   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1194   GST_OBJECT_UNLOCK (sink);
1195 }
1196
1197 /**
1198  * gst_base_sink_get_blocksize:
1199  * @sink: a #GstBaseSink
1200  *
1201  * Get the number of bytes that the sink will pull when it is operating in pull
1202  * mode.
1203  *
1204  * Returns: the number of bytes @sink will pull in pull mode.
1205  *
1206  * Since: 0.10.22
1207  */
1208 guint
1209 gst_base_sink_get_blocksize (GstBaseSink * sink)
1210 {
1211   guint res;
1212
1213   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1214
1215   GST_OBJECT_LOCK (sink);
1216   res = sink->priv->blocksize;
1217   GST_OBJECT_UNLOCK (sink);
1218
1219   return res;
1220 }
1221
1222 static void
1223 gst_base_sink_set_property (GObject * object, guint prop_id,
1224     const GValue * value, GParamSpec * pspec)
1225 {
1226   GstBaseSink *sink = GST_BASE_SINK (object);
1227
1228   switch (prop_id) {
1229     case PROP_PREROLL_QUEUE_LEN:
1230       /* preroll lock necessary to serialize with finish_preroll */
1231       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1232       sink->preroll_queue_max_len = g_value_get_uint (value);
1233       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1234       break;
1235     case PROP_SYNC:
1236       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1237       break;
1238     case PROP_MAX_LATENESS:
1239       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1240       break;
1241     case PROP_QOS:
1242       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1243       break;
1244     case PROP_ASYNC:
1245       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1246       break;
1247     case PROP_TS_OFFSET:
1248       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1249       break;
1250     case PROP_BLOCKSIZE:
1251       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1252       break;
1253     case PROP_RENDER_DELAY:
1254       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1255       break;
1256     case PROP_ENABLE_LAST_BUFFER:
1257     {
1258       gboolean enable;
1259       enable = g_value_get_boolean (value);
1260
1261       GST_OBJECT_LOCK (sink);
1262       if (enable != sink->priv->enable_last_buffer) {
1263         sink->priv->enable_last_buffer = enable;
1264         if (!enable)
1265           gst_base_sink_set_last_buffer_unlocked (sink, NULL);
1266       }
1267       GST_OBJECT_UNLOCK (sink);
1268       break;
1269     }
1270     default:
1271       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1272       break;
1273   }
1274 }
1275
1276 static void
1277 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1278     GParamSpec * pspec)
1279 {
1280   GstBaseSink *sink = GST_BASE_SINK (object);
1281
1282   switch (prop_id) {
1283     case PROP_PREROLL_QUEUE_LEN:
1284       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1285       g_value_set_uint (value, sink->preroll_queue_max_len);
1286       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1287       break;
1288     case PROP_SYNC:
1289       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1290       break;
1291     case PROP_MAX_LATENESS:
1292       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1293       break;
1294     case PROP_QOS:
1295       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1296       break;
1297     case PROP_ASYNC:
1298       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1299       break;
1300     case PROP_TS_OFFSET:
1301       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1302       break;
1303     case PROP_LAST_BUFFER:
1304       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1305       break;
1306     case PROP_ENABLE_LAST_BUFFER:
1307       g_value_set_boolean (value, sink->priv->enable_last_buffer);
1308       break;
1309     case PROP_BLOCKSIZE:
1310       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1311       break;
1312     case PROP_RENDER_DELAY:
1313       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1314       break;
1315     default:
1316       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1317       break;
1318   }
1319 }
1320
1321
1322 static GstCaps *
1323 gst_base_sink_get_caps (GstBaseSink * sink)
1324 {
1325   return NULL;
1326 }
1327
1328 static gboolean
1329 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1330 {
1331   return TRUE;
1332 }
1333
1334 static GstFlowReturn
1335 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1336     GstCaps * caps, GstBuffer ** buf)
1337 {
1338   *buf = NULL;
1339   return GST_FLOW_OK;
1340 }
1341
1342 /* with PREROLL_LOCK, STREAM_LOCK */
1343 static void
1344 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1345 {
1346   GstMiniObject *obj;
1347
1348   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1349   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1350     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1351     gst_mini_object_unref (obj);
1352   }
1353   /* we can't have EOS anymore now */
1354   basesink->eos = FALSE;
1355   basesink->priv->received_eos = FALSE;
1356   basesink->have_preroll = FALSE;
1357   basesink->priv->step_unlock = FALSE;
1358   basesink->eos_queued = FALSE;
1359   basesink->preroll_queued = 0;
1360   basesink->buffers_queued = 0;
1361   basesink->events_queued = 0;
1362   /* can't report latency anymore until we preroll again */
1363   if (basesink->priv->async_enabled) {
1364     GST_OBJECT_LOCK (basesink);
1365     basesink->priv->have_latency = FALSE;
1366     GST_OBJECT_UNLOCK (basesink);
1367   }
1368   /* and signal any waiters now */
1369   GST_PAD_PREROLL_SIGNAL (pad);
1370 }
1371
1372 /* with STREAM_LOCK, configures given segment with the event information. */
1373 static void
1374 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1375     GstEvent * event, GstSegment * segment)
1376 {
1377   gboolean update;
1378   gdouble rate, arate;
1379   GstFormat format;
1380   gint64 start;
1381   gint64 stop;
1382   gint64 time;
1383
1384   /* the newsegment event is needed to bring the buffer timestamps to the
1385    * stream time and to drop samples outside of the playback segment. */
1386   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1387       &start, &stop, &time);
1388
1389   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1390    * We protect with the OBJECT_LOCK so that we can use the values to
1391    * safely answer a POSITION query. */
1392   GST_OBJECT_LOCK (basesink);
1393   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1394       stop, time);
1395
1396   if (format == GST_FORMAT_TIME) {
1397     GST_DEBUG_OBJECT (basesink,
1398         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1399         "format GST_FORMAT_TIME, "
1400         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1401         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1402         update, rate, arate, GST_TIME_ARGS (segment->start),
1403         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1404         GST_TIME_ARGS (segment->accum));
1405   } else {
1406     GST_DEBUG_OBJECT (basesink,
1407         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1408         "format %d, "
1409         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1410         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1411         segment->format, segment->start, segment->stop, segment->time,
1412         segment->accum);
1413   }
1414   GST_OBJECT_UNLOCK (basesink);
1415 }
1416
1417 /* with PREROLL_LOCK, STREAM_LOCK */
1418 static gboolean
1419 gst_base_sink_commit_state (GstBaseSink * basesink)
1420 {
1421   /* commit state and proceed to next pending state */
1422   GstState current, next, pending, post_pending;
1423   gboolean post_paused = FALSE;
1424   gboolean post_async_done = FALSE;
1425   gboolean post_playing = FALSE;
1426
1427   /* we are certainly not playing async anymore now */
1428   basesink->playing_async = FALSE;
1429
1430   GST_OBJECT_LOCK (basesink);
1431   current = GST_STATE (basesink);
1432   next = GST_STATE_NEXT (basesink);
1433   pending = GST_STATE_PENDING (basesink);
1434   post_pending = pending;
1435
1436   switch (pending) {
1437     case GST_STATE_PLAYING:
1438     {
1439       GstBaseSinkClass *bclass;
1440       GstStateChangeReturn ret;
1441
1442       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1443
1444       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1445
1446       basesink->need_preroll = FALSE;
1447       post_async_done = TRUE;
1448       basesink->priv->commited = TRUE;
1449       post_playing = TRUE;
1450       /* post PAUSED too when we were READY */
1451       if (current == GST_STATE_READY) {
1452         post_paused = TRUE;
1453       }
1454
1455       /* make sure we notify the subclass of async playing */
1456       if (bclass->async_play) {
1457         GST_WARNING_OBJECT (basesink, "deprecated async_play");
1458         ret = bclass->async_play (basesink);
1459         if (ret == GST_STATE_CHANGE_FAILURE)
1460           goto async_failed;
1461       }
1462       break;
1463     }
1464     case GST_STATE_PAUSED:
1465       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1466       post_paused = TRUE;
1467       post_async_done = TRUE;
1468       basesink->priv->commited = TRUE;
1469       post_pending = GST_STATE_VOID_PENDING;
1470       break;
1471     case GST_STATE_READY:
1472     case GST_STATE_NULL:
1473       goto stopping;
1474     case GST_STATE_VOID_PENDING:
1475       goto nothing_pending;
1476     default:
1477       break;
1478   }
1479
1480   /* we can report latency queries now */
1481   basesink->priv->have_latency = TRUE;
1482
1483   GST_STATE (basesink) = pending;
1484   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1485   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1486   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1487   GST_OBJECT_UNLOCK (basesink);
1488
1489   if (post_paused) {
1490     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1491     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1492         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1493             current, next, post_pending));
1494   }
1495   if (post_async_done) {
1496     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1497     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1498         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1499   }
1500   if (post_playing) {
1501     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1502     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1503         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1504             next, pending, GST_STATE_VOID_PENDING));
1505   }
1506
1507   GST_STATE_BROADCAST (basesink);
1508
1509   return TRUE;
1510
1511 nothing_pending:
1512   {
1513     /* Depending on the state, set our vars. We get in this situation when the
1514      * state change function got a change to update the state vars before the
1515      * streaming thread did. This is fine but we need to make sure that we
1516      * update the need_preroll var since it was TRUE when we got here and might
1517      * become FALSE if we got to PLAYING. */
1518     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1519         gst_element_state_get_name (current));
1520     switch (current) {
1521       case GST_STATE_PLAYING:
1522         basesink->need_preroll = FALSE;
1523         break;
1524       case GST_STATE_PAUSED:
1525         basesink->need_preroll = TRUE;
1526         break;
1527       default:
1528         basesink->need_preroll = FALSE;
1529         basesink->flushing = TRUE;
1530         break;
1531     }
1532     /* we can report latency queries now */
1533     basesink->priv->have_latency = TRUE;
1534     GST_OBJECT_UNLOCK (basesink);
1535     return TRUE;
1536   }
1537 stopping:
1538   {
1539     /* app is going to READY */
1540     GST_DEBUG_OBJECT (basesink, "stopping");
1541     basesink->need_preroll = FALSE;
1542     basesink->flushing = TRUE;
1543     GST_OBJECT_UNLOCK (basesink);
1544     return FALSE;
1545   }
1546 async_failed:
1547   {
1548     GST_DEBUG_OBJECT (basesink, "async commit failed");
1549     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1550     GST_OBJECT_UNLOCK (basesink);
1551     return FALSE;
1552   }
1553 }
1554
1555 static void
1556 start_stepping (GstBaseSink * sink, GstSegment * segment,
1557     GstStepInfo * pending, GstStepInfo * current)
1558 {
1559   gint64 end;
1560   GstMessage *message;
1561
1562   GST_DEBUG_OBJECT (sink, "update pending step");
1563
1564   GST_OBJECT_LOCK (sink);
1565   memcpy (current, pending, sizeof (GstStepInfo));
1566   pending->valid = FALSE;
1567   GST_OBJECT_UNLOCK (sink);
1568
1569   /* post message first */
1570   message =
1571       gst_message_new_step_start (GST_OBJECT (sink), TRUE, current->format,
1572       current->amount, current->rate, current->flush, current->intermediate);
1573   gst_message_set_seqnum (message, current->seqnum);
1574   gst_element_post_message (GST_ELEMENT (sink), message);
1575
1576   /* get the running time of where we paused and remember it */
1577   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1578   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1579
1580   /* set the new rate for the remainder of the segment */
1581   current->start_rate = segment->rate;
1582   segment->rate *= current->rate;
1583   segment->abs_rate = ABS (segment->rate);
1584
1585   /* save values */
1586   if (segment->rate > 0.0)
1587     current->start_stop = segment->stop;
1588   else
1589     current->start_start = segment->start;
1590
1591   if (current->format == GST_FORMAT_TIME) {
1592     end = current->start + current->amount;
1593     if (!current->flush) {
1594       /* update the segment clipping regions for non-flushing seeks */
1595       if (segment->rate > 0.0) {
1596         segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1597         segment->last_stop = segment->stop;
1598       } else {
1599         gint64 position;
1600
1601         position = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1602         segment->time = position;
1603         segment->start = position;
1604         segment->last_stop = position;
1605       }
1606     }
1607   }
1608
1609   GST_DEBUG_OBJECT (sink,
1610       "segment now rate %lf, applied rate %lf, "
1611       "format GST_FORMAT_TIME, "
1612       "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1613       ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1614       segment->rate, segment->applied_rate, GST_TIME_ARGS (segment->start),
1615       GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1616       GST_TIME_ARGS (segment->accum));
1617
1618   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1619       GST_TIME_ARGS (current->start));
1620
1621   if (current->amount == -1) {
1622     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1623     current->valid = FALSE;
1624   } else {
1625     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1626         "rate: %f", current->amount, gst_format_get_name (current->format),
1627         current->rate);
1628   }
1629 }
1630
1631 static void
1632 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1633     GstStepInfo * current, gint64 rstart, gint64 rstop, gboolean eos)
1634 {
1635   gint64 stop, position;
1636   GstMessage *message;
1637
1638   GST_DEBUG_OBJECT (sink, "step complete");
1639
1640   if (segment->rate > 0.0)
1641     stop = rstart;
1642   else
1643     stop = rstop;
1644
1645   GST_DEBUG_OBJECT (sink,
1646       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1647
1648   if (stop == -1)
1649     current->duration = current->position;
1650   else
1651     current->duration = stop - current->start;
1652
1653   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1654       GST_TIME_ARGS (current->duration));
1655
1656   position = current->start + current->duration;
1657
1658   /* now move the segment to the new running time */
1659   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1660
1661   if (current->flush) {
1662     /* and remove the accumulated time we flushed, start time did not change */
1663     segment->accum = current->start;
1664   } else {
1665     /* start time is now the stepped position */
1666     gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1667   }
1668
1669   /* restore the previous rate */
1670   segment->rate = current->start_rate;
1671   segment->abs_rate = ABS (segment->rate);
1672
1673   if (segment->rate > 0.0)
1674     segment->stop = current->start_stop;
1675   else
1676     segment->start = current->start_start;
1677
1678   /* the clip segment is used for position report in paused... */
1679   memcpy (sink->abidata.ABI.clip_segment, segment, sizeof (GstSegment));
1680
1681   /* post the step done when we know the stepped duration in TIME */
1682   message =
1683       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1684       current->amount, current->rate, current->flush, current->intermediate,
1685       current->duration, eos);
1686   gst_message_set_seqnum (message, current->seqnum);
1687   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1688
1689   if (!current->intermediate)
1690     sink->need_preroll = current->need_preroll;
1691
1692   /* and the current step info finished and becomes invalid */
1693   current->valid = FALSE;
1694 }
1695
1696 static gboolean
1697 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1698     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1699     gint64 * rstop)
1700 {
1701   gboolean step_end = FALSE;
1702
1703   /* see if we need to skip this buffer because of stepping */
1704   switch (current->format) {
1705     case GST_FORMAT_TIME:
1706     {
1707       guint64 end;
1708       gint64 first, last;
1709
1710       if (segment->rate > 0.0) {
1711         if (segment->stop == *cstop)
1712           *rstop = *rstart + current->amount;
1713
1714         first = *rstart;
1715         last = *rstop;
1716       } else {
1717         if (segment->start == *cstart)
1718           *rstart = *rstop + current->amount;
1719
1720         first = *rstop;
1721         last = *rstart;
1722       }
1723
1724       end = current->start + current->amount;
1725       current->position = first - current->start;
1726
1727       if (G_UNLIKELY (segment->abs_rate != 1.0))
1728         current->position /= segment->abs_rate;
1729
1730       GST_DEBUG_OBJECT (sink,
1731           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1732           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1733       GST_DEBUG_OBJECT (sink,
1734           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1735           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1736           GST_TIME_ARGS (last - current->start),
1737           GST_TIME_ARGS (current->amount));
1738
1739       if ((current->flush && current->position >= current->amount)
1740           || last >= end) {
1741         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1742         step_end = TRUE;
1743         if (segment->rate > 0.0) {
1744           *rstart = end;
1745           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1746         } else {
1747           *rstop = end;
1748           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1749         }
1750       }
1751       GST_DEBUG_OBJECT (sink,
1752           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1753           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1754       GST_DEBUG_OBJECT (sink,
1755           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1756           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1757       break;
1758     }
1759     case GST_FORMAT_BUFFERS:
1760       GST_DEBUG_OBJECT (sink,
1761           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1762           current->position, current->amount);
1763
1764       if (current->position < current->amount) {
1765         current->position++;
1766       } else {
1767         step_end = TRUE;
1768       }
1769       break;
1770     case GST_FORMAT_DEFAULT:
1771     default:
1772       GST_DEBUG_OBJECT (sink,
1773           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1774           current->position, current->amount);
1775       break;
1776   }
1777   return step_end;
1778 }
1779
1780 /* with STREAM_LOCK, PREROLL_LOCK
1781  *
1782  * Returns TRUE if the object needs synchronisation and takes therefore
1783  * part in prerolling.
1784  *
1785  * rsstart/rsstop contain the start/stop in stream time.
1786  * rrstart/rrstop contain the start/stop in running time.
1787  */
1788 static gboolean
1789 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1790     GstClockTime * rsstart, GstClockTime * rsstop,
1791     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1792     gboolean * stepped, GstSegment * segment, GstStepInfo * step,
1793     gboolean * step_end)
1794 {
1795   GstBaseSinkClass *bclass;
1796   GstBuffer *buffer;
1797   GstClockTime start, stop;     /* raw start/stop timestamps */
1798   gint64 cstart, cstop;         /* clipped raw timestamps */
1799   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1800   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1801   GstFormat format;
1802   GstBaseSinkPrivate *priv;
1803   gboolean eos;
1804
1805   priv = basesink->priv;
1806
1807   /* start with nothing */
1808   start = stop = GST_CLOCK_TIME_NONE;
1809
1810   if (G_UNLIKELY (GST_IS_EVENT (obj))) {
1811     GstEvent *event = GST_EVENT_CAST (obj);
1812
1813     switch (GST_EVENT_TYPE (event)) {
1814         /* EOS event needs syncing */
1815       case GST_EVENT_EOS:
1816       {
1817         if (basesink->segment.rate >= 0.0) {
1818           sstart = sstop = priv->current_sstop;
1819           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1820             /* we have not seen a buffer yet, use the segment values */
1821             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1822                 basesink->segment.format, basesink->segment.stop);
1823           }
1824         } else {
1825           sstart = sstop = priv->current_sstart;
1826           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1827             /* we have not seen a buffer yet, use the segment values */
1828             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1829                 basesink->segment.format, basesink->segment.start);
1830           }
1831         }
1832
1833         rstart = rstop = priv->eos_rtime;
1834         *do_sync = rstart != -1;
1835         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1836             GST_TIME_ARGS (rstart));
1837         /* if we are stepping, we end now */
1838         *step_end = step->valid;
1839         eos = TRUE;
1840         goto eos_done;
1841       }
1842       default:
1843         /* other events do not need syncing */
1844         /* FIXME, maybe NEWSEGMENT might need synchronisation
1845          * since the POSITION query depends on accumulated times and
1846          * we cannot accumulate the current segment before the previous
1847          * one completed.
1848          */
1849         return FALSE;
1850     }
1851   }
1852
1853   eos = FALSE;
1854
1855   /* else do buffer sync code */
1856   buffer = GST_BUFFER_CAST (obj);
1857
1858   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1859
1860   /* just get the times to see if we need syncing, if the start returns -1 we
1861    * don't sync. */
1862   if (bclass->get_times)
1863     bclass->get_times (basesink, buffer, &start, &stop);
1864
1865   if (!GST_CLOCK_TIME_IS_VALID (start)) {
1866     /* we don't need to sync but we still want to get the timestamps for
1867      * tracking the position */
1868     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1869     *do_sync = FALSE;
1870   } else {
1871     *do_sync = TRUE;
1872   }
1873
1874   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1875       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1876       GST_TIME_ARGS (stop), *do_sync);
1877
1878   /* collect segment and format for code clarity */
1879   format = segment->format;
1880
1881   /* no timestamp clipping if we did not get a TIME segment format */
1882   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
1883     cstart = start;
1884     cstop = stop;
1885     /* do running and stream time in TIME format */
1886     format = GST_FORMAT_TIME;
1887     GST_LOG_OBJECT (basesink, "not time format, don't clip");
1888     goto do_times;
1889   }
1890
1891   /* clip, only when we know about time */
1892   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
1893               (gint64) start, (gint64) stop, &cstart, &cstop))) {
1894     if (step->valid) {
1895       GST_DEBUG_OBJECT (basesink, "step out of segment");
1896       /* when we are stepping, pretend we're at the end of the segment */
1897       if (segment->rate > 0.0) {
1898         cstart = segment->stop;
1899         cstop = segment->stop;
1900       } else {
1901         cstart = segment->start;
1902         cstop = segment->start;
1903       }
1904       goto do_times;
1905     }
1906     goto out_of_segment;
1907   }
1908
1909   if (G_UNLIKELY (start != cstart || stop != cstop)) {
1910     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
1911         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
1912         GST_TIME_ARGS (cstop));
1913   }
1914
1915   /* set last stop position */
1916   if (G_LIKELY (cstop != GST_CLOCK_TIME_NONE))
1917     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
1918   else
1919     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
1920
1921 do_times:
1922   rstart = gst_segment_to_running_time (segment, format, cstart);
1923   rstop = gst_segment_to_running_time (segment, format, cstop);
1924
1925   if (G_UNLIKELY (step->valid)) {
1926     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
1927                 &rstart, &rstop))) {
1928       /* step is still busy, we discard data when we are flushing */
1929       *stepped = step->flush;
1930       GST_DEBUG_OBJECT (basesink, "stepping busy");
1931     }
1932   }
1933   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
1934    * upstream is behaving very badly */
1935   sstart = gst_segment_to_stream_time (segment, format, cstart);
1936   sstop = gst_segment_to_stream_time (segment, format, cstop);
1937
1938 eos_done:
1939   /* eos_done label only called when doing EOS, we also stop stepping then */
1940   if (*step_end && step->flush) {
1941     GST_DEBUG_OBJECT (basesink, "flushing step ended");
1942     stop_stepping (basesink, segment, step, rstart, rstop, eos);
1943     *step_end = FALSE;
1944   }
1945
1946   /* save times */
1947   *rsstart = sstart;
1948   *rsstop = sstop;
1949   *rrstart = rstart;
1950   *rrstop = rstop;
1951
1952   /* buffers and EOS always need syncing and preroll */
1953   return TRUE;
1954
1955   /* special cases */
1956 out_of_segment:
1957   {
1958     /* we usually clip in the chain function already but stepping could cause
1959      * the segment to be updated later. we return FALSE so that we don't try
1960      * to sync on it. */
1961     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
1962     return FALSE;
1963   }
1964 }
1965
1966 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
1967  * adjust a timestamp with the latency and timestamp offset */
1968 static GstClockTime
1969 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
1970 {
1971   GstClockTimeDiff ts_offset;
1972
1973   /* don't do anything funny with invalid timestamps */
1974   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1975     return time;
1976
1977   time += basesink->priv->latency;
1978
1979   /* apply offset, be carefull for underflows */
1980   ts_offset = basesink->priv->ts_offset;
1981   if (ts_offset < 0) {
1982     ts_offset = -ts_offset;
1983     if (ts_offset < time)
1984       time -= ts_offset;
1985     else
1986       time = 0;
1987   } else
1988     time += ts_offset;
1989
1990   return time;
1991 }
1992
1993 /**
1994  * gst_base_sink_wait_clock:
1995  * @sink: the sink
1996  * @time: the running_time to be reached
1997  * @jitter: the jitter to be filled with time diff (can be NULL)
1998  *
1999  * This function will block until @time is reached. It is usually called by
2000  * subclasses that use their own internal synchronisation.
2001  *
2002  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
2003  * returned. Likewise, if synchronisation is disabled in the element or there
2004  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
2005  *
2006  * This function should only be called with the PREROLL_LOCK held, like when
2007  * receiving an EOS event in the #GstBaseSinkClass.event() vmethod or when
2008  * receiving a buffer in
2009  * the #GstBaseSinkClass.render() vmethod.
2010  *
2011  * The @time argument should be the running_time of when this method should
2012  * return and is not adjusted with any latency or offset configured in the
2013  * sink.
2014  *
2015  * Since 0.10.20
2016  *
2017  * Returns: #GstClockReturn
2018  */
2019 GstClockReturn
2020 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
2021     GstClockTimeDiff * jitter)
2022 {
2023   GstClockID id;
2024   GstClockReturn ret;
2025   GstClock *clock;
2026   GstClockTime base_time;
2027
2028   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2029     goto invalid_time;
2030
2031   GST_OBJECT_LOCK (sink);
2032   if (G_UNLIKELY (!sink->sync))
2033     goto no_sync;
2034
2035   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
2036     goto no_clock;
2037
2038   base_time = GST_ELEMENT_CAST (sink)->base_time;
2039   GST_LOG_OBJECT (sink,
2040       "time %" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT,
2041       GST_TIME_ARGS (time), GST_TIME_ARGS (base_time));
2042
2043   /* add base_time to running_time to get the time against the clock */
2044   time += base_time;
2045
2046   id = gst_clock_new_single_shot_id (clock, time);
2047   GST_OBJECT_UNLOCK (sink);
2048
2049   /* A blocking wait is performed on the clock. We save the ClockID
2050    * so we can unlock the entry at any time. While we are blocking, we
2051    * release the PREROLL_LOCK so that other threads can interrupt the
2052    * entry. */
2053   sink->clock_id = id;
2054   /* release the preroll lock while waiting */
2055   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
2056
2057   ret = gst_clock_id_wait (id, jitter);
2058
2059   GST_PAD_PREROLL_LOCK (sink->sinkpad);
2060   gst_clock_id_unref (id);
2061   sink->clock_id = NULL;
2062
2063   return ret;
2064
2065   /* no syncing needed */
2066 invalid_time:
2067   {
2068     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
2069     return GST_CLOCK_BADTIME;
2070   }
2071 no_sync:
2072   {
2073     GST_DEBUG_OBJECT (sink, "sync disabled");
2074     GST_OBJECT_UNLOCK (sink);
2075     return GST_CLOCK_BADTIME;
2076   }
2077 no_clock:
2078   {
2079     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
2080     GST_OBJECT_UNLOCK (sink);
2081     return GST_CLOCK_BADTIME;
2082   }
2083 }
2084
2085 /**
2086  * gst_base_sink_wait_preroll:
2087  * @sink: the sink
2088  *
2089  * If the #GstBaseSinkClass.render() method performs its own synchronisation
2090  * against the clock it must unblock when going from PLAYING to the PAUSED state
2091  * and call this method before continuing to render the remaining data.
2092  *
2093  * This function will block until a state change to PLAYING happens (in which
2094  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
2095  * to a state change to READY or a FLUSH event (in which case this function
2096  * returns #GST_FLOW_WRONG_STATE).
2097  *
2098  * This function should only be called with the PREROLL_LOCK held, like in the
2099  * render function.
2100  *
2101  * Since: 0.10.11
2102  *
2103  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2104  * continue. Any other return value should be returned from the render vmethod.
2105  */
2106 GstFlowReturn
2107 gst_base_sink_wait_preroll (GstBaseSink * sink)
2108 {
2109   sink->have_preroll = TRUE;
2110   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
2111   /* block until the state changes, or we get a flush, or something */
2112   GST_PAD_PREROLL_WAIT (sink->sinkpad);
2113   sink->have_preroll = FALSE;
2114   if (G_UNLIKELY (sink->flushing))
2115     goto stopping;
2116   if (G_UNLIKELY (sink->priv->step_unlock))
2117     goto step_unlocked;
2118   GST_DEBUG_OBJECT (sink, "continue after preroll");
2119
2120   return GST_FLOW_OK;
2121
2122   /* ERRORS */
2123 stopping:
2124   {
2125     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
2126     return GST_FLOW_WRONG_STATE;
2127   }
2128 step_unlocked:
2129   {
2130     sink->priv->step_unlock = FALSE;
2131     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
2132     return GST_FLOW_STEP;
2133   }
2134 }
2135
2136 /**
2137  * gst_base_sink_do_preroll:
2138  * @sink: the sink
2139  * @obj: the object that caused the preroll
2140  *
2141  * If the @sink spawns its own thread for pulling buffers from upstream it
2142  * should call this method after it has pulled a buffer. If the element needed
2143  * to preroll, this function will perform the preroll and will then block
2144  * until the element state is changed.
2145  *
2146  * This function should be called with the PREROLL_LOCK held.
2147  *
2148  * Since 0.10.22
2149  *
2150  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2151  * continue. Any other return value should be returned from the render vmethod.
2152  */
2153 GstFlowReturn
2154 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2155 {
2156   GstFlowReturn ret;
2157
2158   while (G_UNLIKELY (sink->need_preroll)) {
2159     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2160
2161     ret = gst_base_sink_preroll_object (sink, FALSE, obj);
2162     if (ret != GST_FLOW_OK)
2163       goto preroll_failed;
2164
2165     /* need to recheck here because the commit state could have
2166      * made us not need the preroll anymore */
2167     if (G_LIKELY (sink->need_preroll)) {
2168       /* block until the state changes, or we get a flush, or something */
2169       ret = gst_base_sink_wait_preroll (sink);
2170       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2171         goto preroll_failed;
2172     }
2173   }
2174   return GST_FLOW_OK;
2175
2176   /* ERRORS */
2177 preroll_failed:
2178   {
2179     GST_DEBUG_OBJECT (sink, "preroll failed %d", ret);
2180     return ret;
2181   }
2182 }
2183
2184 /**
2185  * gst_base_sink_wait_eos:
2186  * @sink: the sink
2187  * @time: the running_time to be reached
2188  * @jitter: the jitter to be filled with time diff (can be NULL)
2189  *
2190  * This function will block until @time is reached. It is usually called by
2191  * subclasses that use their own internal synchronisation but want to let the
2192  * EOS be handled by the base class.
2193  *
2194  * This function should only be called with the PREROLL_LOCK held, like when
2195  * receiving an EOS event in the ::event vmethod.
2196  *
2197  * The @time argument should be the running_time of when the EOS should happen
2198  * and will be adjusted with any latency and offset configured in the sink.
2199  *
2200  * Since 0.10.15
2201  *
2202  * Returns: #GstFlowReturn
2203  */
2204 GstFlowReturn
2205 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2206     GstClockTimeDiff * jitter)
2207 {
2208   GstClockReturn status;
2209   GstFlowReturn ret;
2210
2211   do {
2212     GstClockTime stime;
2213
2214     GST_DEBUG_OBJECT (sink, "checking preroll");
2215
2216     /* first wait for the playing state before we can continue */
2217     if (G_UNLIKELY (sink->need_preroll)) {
2218       ret = gst_base_sink_wait_preroll (sink);
2219       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2220         goto flushing;
2221     }
2222
2223     /* preroll done, we can sync since we are in PLAYING now. */
2224     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2225         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2226
2227     /* compensate for latency and ts_offset. We don't adjust for render delay
2228      * because we don't interact with the device on EOS normally. */
2229     stime = gst_base_sink_adjust_time (sink, time);
2230
2231     /* wait for the clock, this can be interrupted because we got shut down or
2232      * we PAUSED. */
2233     status = gst_base_sink_wait_clock (sink, stime, jitter);
2234
2235     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2236
2237     /* invalid time, no clock or sync disabled, just continue then */
2238     if (status == GST_CLOCK_BADTIME)
2239       break;
2240
2241     /* waiting could have been interrupted and we can be flushing now */
2242     if (G_UNLIKELY (sink->flushing))
2243       goto flushing;
2244
2245     /* retry if we got unscheduled, which means we did not reach the timeout
2246      * yet. if some other error occures, we continue. */
2247   } while (status == GST_CLOCK_UNSCHEDULED);
2248
2249   GST_DEBUG_OBJECT (sink, "end of stream");
2250
2251   return GST_FLOW_OK;
2252
2253   /* ERRORS */
2254 flushing:
2255   {
2256     GST_DEBUG_OBJECT (sink, "we are flushing");
2257     return GST_FLOW_WRONG_STATE;
2258   }
2259 }
2260
2261 /* with STREAM_LOCK, PREROLL_LOCK
2262  *
2263  * Make sure we are in PLAYING and synchronize an object to the clock.
2264  *
2265  * If we need preroll, we are not in PLAYING. We try to commit the state
2266  * if needed and then block if we still are not PLAYING.
2267  *
2268  * We start waiting on the clock in PLAYING. If we got interrupted, we
2269  * immediatly try to re-preroll.
2270  *
2271  * Some objects do not need synchronisation (most events) and so this function
2272  * immediatly returns GST_FLOW_OK.
2273  *
2274  * for objects that arrive later than max-lateness to be synchronized to the
2275  * clock have the @late boolean set to TRUE.
2276  *
2277  * This function keeps a running average of the jitter (the diff between the
2278  * clock time and the requested sync time). The jitter is negative for
2279  * objects that arrive in time and positive for late buffers.
2280  *
2281  * does not take ownership of obj.
2282  */
2283 static GstFlowReturn
2284 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2285     GstMiniObject * obj, gboolean * late, gboolean * step_end)
2286 {
2287   GstClockTimeDiff jitter = 0;
2288   gboolean syncable;
2289   GstClockReturn status = GST_CLOCK_OK;
2290   GstClockTime rstart, rstop, sstart, sstop, stime;
2291   gboolean do_sync;
2292   GstBaseSinkPrivate *priv;
2293   GstFlowReturn ret;
2294   GstStepInfo *current, *pending;
2295   gboolean stepped;
2296
2297   priv = basesink->priv;
2298
2299 do_step:
2300   sstart = sstop = rstart = rstop = GST_CLOCK_TIME_NONE;
2301   do_sync = TRUE;
2302   stepped = FALSE;
2303
2304   priv->current_rstart = GST_CLOCK_TIME_NONE;
2305
2306   /* get stepping info */
2307   current = &priv->current_step;
2308   pending = &priv->pending_step;
2309
2310   /* get timing information for this object against the render segment */
2311   syncable = gst_base_sink_get_sync_times (basesink, obj,
2312       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2313       current, step_end);
2314
2315   if (G_UNLIKELY (stepped))
2316     goto step_skipped;
2317
2318   /* a syncable object needs to participate in preroll and
2319    * clocking. All buffers and EOS are syncable. */
2320   if (G_UNLIKELY (!syncable))
2321     goto not_syncable;
2322
2323   /* store timing info for current object */
2324   priv->current_rstart = rstart;
2325   priv->current_rstop = (GST_CLOCK_TIME_IS_VALID (rstop) ? rstop : rstart);
2326
2327   /* save sync time for eos when the previous object needed sync */
2328   priv->eos_rtime = (do_sync ? priv->current_rstop : GST_CLOCK_TIME_NONE);
2329
2330 again:
2331   /* first do preroll, this makes sure we commit our state
2332    * to PAUSED and can continue to PLAYING. We cannot perform
2333    * any clock sync in PAUSED because there is no clock. */
2334   ret = gst_base_sink_do_preroll (basesink, obj);
2335   if (G_UNLIKELY (ret != GST_FLOW_OK))
2336     goto preroll_failed;
2337
2338   /* update the segment with a pending step if the current one is invalid and we
2339    * have a new pending one. We only accept new step updates after a preroll */
2340   if (G_UNLIKELY (pending->valid && !current->valid)) {
2341     start_stepping (basesink, &basesink->segment, pending, current);
2342     goto do_step;
2343   }
2344
2345   /* After rendering we store the position of the last buffer so that we can use
2346    * it to report the position. We need to take the lock here. */
2347   GST_OBJECT_LOCK (basesink);
2348   priv->current_sstart = sstart;
2349   priv->current_sstop = (GST_CLOCK_TIME_IS_VALID (sstop) ? sstop : sstart);
2350   GST_OBJECT_UNLOCK (basesink);
2351
2352   if (!do_sync)
2353     goto done;
2354
2355   /* adjust for latency */
2356   stime = gst_base_sink_adjust_time (basesink, rstart);
2357
2358   /* adjust for render-delay, avoid underflows */
2359   if (GST_CLOCK_TIME_IS_VALID (stime)) {
2360     if (stime > priv->render_delay)
2361       stime -= priv->render_delay;
2362     else
2363       stime = 0;
2364   }
2365
2366   /* preroll done, we can sync since we are in PLAYING now. */
2367   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2368       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2369       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2370
2371   /* This function will return immediatly if start == -1, no clock
2372    * or sync is disabled with GST_CLOCK_BADTIME. */
2373   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2374
2375   GST_DEBUG_OBJECT (basesink, "clock returned %d, jitter %" GST_TIME_FORMAT,
2376       status, GST_TIME_ARGS (jitter));
2377
2378   /* invalid time, no clock or sync disabled, just render */
2379   if (status == GST_CLOCK_BADTIME)
2380     goto done;
2381
2382   /* waiting could have been interrupted and we can be flushing now */
2383   if (G_UNLIKELY (basesink->flushing))
2384     goto flushing;
2385
2386   /* check for unlocked by a state change, we are not flushing so
2387    * we can try to preroll on the current buffer. */
2388   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2389     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2390     priv->call_preroll = TRUE;
2391     goto again;
2392   }
2393
2394   /* successful syncing done, record observation */
2395   priv->current_jitter = jitter;
2396
2397   /* check if the object should be dropped */
2398   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2399       status, jitter);
2400
2401 done:
2402   return GST_FLOW_OK;
2403
2404   /* ERRORS */
2405 step_skipped:
2406   {
2407     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2408     *late = TRUE;
2409     return GST_FLOW_OK;
2410   }
2411 not_syncable:
2412   {
2413     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2414     return GST_FLOW_OK;
2415   }
2416 flushing:
2417   {
2418     GST_DEBUG_OBJECT (basesink, "we are flushing");
2419     return GST_FLOW_WRONG_STATE;
2420   }
2421 preroll_failed:
2422   {
2423     GST_DEBUG_OBJECT (basesink, "preroll failed");
2424     *step_end = FALSE;
2425     return ret;
2426   }
2427 }
2428
2429 static gboolean
2430 gst_base_sink_send_qos (GstBaseSink * basesink,
2431     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2432 {
2433   GstEvent *event;
2434   gboolean res;
2435
2436   /* generate Quality-of-Service event */
2437   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2438       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2439       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (time));
2440
2441   event = gst_event_new_qos (proportion, diff, time);
2442
2443   /* send upstream */
2444   res = gst_pad_push_event (basesink->sinkpad, event);
2445
2446   return res;
2447 }
2448
2449 static void
2450 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2451 {
2452   GstBaseSinkPrivate *priv;
2453   GstClockTime start, stop;
2454   GstClockTimeDiff jitter;
2455   GstClockTime pt, entered, left;
2456   GstClockTime duration;
2457   gdouble rate;
2458
2459   priv = sink->priv;
2460
2461   start = priv->current_rstart;
2462
2463   if (priv->current_step.valid)
2464     return;
2465
2466   /* if Quality-of-Service disabled, do nothing */
2467   if (!g_atomic_int_get (&priv->qos_enabled) ||
2468       !GST_CLOCK_TIME_IS_VALID (start))
2469     return;
2470
2471   stop = priv->current_rstop;
2472   jitter = priv->current_jitter;
2473
2474   if (jitter < 0) {
2475     /* this is the time the buffer entered the sink */
2476     if (start < -jitter)
2477       entered = 0;
2478     else
2479       entered = start + jitter;
2480     left = start;
2481   } else {
2482     /* this is the time the buffer entered the sink */
2483     entered = start + jitter;
2484     /* this is the time the buffer left the sink */
2485     left = start + jitter;
2486   }
2487
2488   /* calculate duration of the buffer */
2489   if (GST_CLOCK_TIME_IS_VALID (stop))
2490     duration = stop - start;
2491   else
2492     duration = GST_CLOCK_TIME_NONE;
2493
2494   /* if we have the time when the last buffer left us, calculate
2495    * processing time */
2496   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2497     if (entered > priv->last_left) {
2498       pt = entered - priv->last_left;
2499     } else {
2500       pt = 0;
2501     }
2502   } else {
2503     pt = priv->avg_pt;
2504   }
2505
2506   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2507       ", entered %" GST_TIME_FORMAT ", left %" GST_TIME_FORMAT ", pt: %"
2508       GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT ",jitter %"
2509       G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (entered),
2510       GST_TIME_ARGS (left), GST_TIME_ARGS (pt), GST_TIME_ARGS (duration),
2511       jitter);
2512
2513   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2514       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2515       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2516       priv->avg_rate);
2517
2518   /* collect running averages. for first observations, we copy the
2519    * values */
2520   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_duration))
2521     priv->avg_duration = duration;
2522   else
2523     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2524
2525   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_pt))
2526     priv->avg_pt = pt;
2527   else
2528     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2529
2530   if (priv->avg_duration != 0)
2531     rate =
2532         gst_guint64_to_gdouble (priv->avg_pt) /
2533         gst_guint64_to_gdouble (priv->avg_duration);
2534   else
2535     rate = 0.0;
2536
2537   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2538     if (dropped || priv->avg_rate < 0.0) {
2539       priv->avg_rate = rate;
2540     } else {
2541       if (rate > 1.0)
2542         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2543       else
2544         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2545     }
2546   }
2547
2548   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2549       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2550       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2551       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2552
2553
2554   if (priv->avg_rate >= 0.0) {
2555     /* if we have a valid rate, start sending QoS messages */
2556     if (priv->current_jitter < 0) {
2557       /* make sure we never go below 0 when adding the jitter to the
2558        * timestamp. */
2559       if (priv->current_rstart < -priv->current_jitter)
2560         priv->current_jitter = -priv->current_rstart;
2561     }
2562     gst_base_sink_send_qos (sink, priv->avg_rate, priv->current_rstart,
2563         priv->current_jitter);
2564   }
2565
2566   /* record when this buffer will leave us */
2567   priv->last_left = left;
2568 }
2569
2570 /* reset all qos measuring */
2571 static void
2572 gst_base_sink_reset_qos (GstBaseSink * sink)
2573 {
2574   GstBaseSinkPrivate *priv;
2575
2576   priv = sink->priv;
2577
2578   priv->last_in_time = GST_CLOCK_TIME_NONE;
2579   priv->last_left = GST_CLOCK_TIME_NONE;
2580   priv->avg_duration = GST_CLOCK_TIME_NONE;
2581   priv->avg_pt = GST_CLOCK_TIME_NONE;
2582   priv->avg_rate = -1.0;
2583   priv->avg_render = GST_CLOCK_TIME_NONE;
2584   priv->rendered = 0;
2585   priv->dropped = 0;
2586
2587 }
2588
2589 /* Checks if the object was scheduled too late.
2590  *
2591  * start/stop contain the raw timestamp start and stop values
2592  * of the object.
2593  *
2594  * status and jitter contain the return values from the clock wait.
2595  *
2596  * returns TRUE if the buffer was too late.
2597  */
2598 static gboolean
2599 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2600     GstClockTime start, GstClockTime stop,
2601     GstClockReturn status, GstClockTimeDiff jitter)
2602 {
2603   gboolean late;
2604   gint64 max_lateness;
2605   GstBaseSinkPrivate *priv;
2606
2607   priv = basesink->priv;
2608
2609   late = FALSE;
2610
2611   /* only for objects that were too late */
2612   if (G_LIKELY (status != GST_CLOCK_EARLY))
2613     goto in_time;
2614
2615   max_lateness = basesink->abidata.ABI.max_lateness;
2616
2617   /* check if frame dropping is enabled */
2618   if (max_lateness == -1)
2619     goto no_drop;
2620
2621   /* only check for buffers */
2622   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2623     goto not_buffer;
2624
2625   /* can't do check if we don't have a timestamp */
2626   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (start)))
2627     goto no_timestamp;
2628
2629   /* we can add a valid stop time */
2630   if (GST_CLOCK_TIME_IS_VALID (stop))
2631     max_lateness += stop;
2632   else
2633     max_lateness += start;
2634
2635   /* if the jitter bigger than duration and lateness we are too late */
2636   if ((late = start + jitter > max_lateness)) {
2637     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2638         "buffer is too late %" GST_TIME_FORMAT
2639         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (start + jitter),
2640         GST_TIME_ARGS (max_lateness));
2641     /* !!emergency!!, if we did not receive anything valid for more than a
2642      * second, render it anyway so the user sees something */
2643     if (GST_CLOCK_TIME_IS_VALID (priv->last_in_time) &&
2644         start - priv->last_in_time > GST_SECOND) {
2645       late = FALSE;
2646       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2647           (_("A lot of buffers are being dropped.")),
2648           ("There may be a timestamping problem, or this computer is too slow."));
2649       GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2650           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2651           GST_TIME_ARGS (priv->last_in_time));
2652     }
2653   }
2654
2655 done:
2656   if (!late || !GST_CLOCK_TIME_IS_VALID (priv->last_in_time)) {
2657     priv->last_in_time = start;
2658   }
2659   return late;
2660
2661   /* all is fine */
2662 in_time:
2663   {
2664     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2665     goto done;
2666   }
2667 no_drop:
2668   {
2669     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2670     goto done;
2671   }
2672 not_buffer:
2673   {
2674     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2675     return FALSE;
2676   }
2677 no_timestamp:
2678   {
2679     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2680     return FALSE;
2681   }
2682 }
2683
2684 /* called before and after calling the render vmethod. It keeps track of how
2685  * much time was spent in the render method and is used to check if we are
2686  * flooded */
2687 static void
2688 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2689 {
2690   GstBaseSinkPrivate *priv;
2691
2692   priv = basesink->priv;
2693
2694   if (start) {
2695     priv->start = gst_util_get_timestamp ();
2696   } else {
2697     GstClockTime elapsed;
2698
2699     priv->stop = gst_util_get_timestamp ();
2700
2701     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2702
2703     if (!GST_CLOCK_TIME_IS_VALID (priv->avg_render))
2704       priv->avg_render = elapsed;
2705     else
2706       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2707
2708     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2709         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2710   }
2711 }
2712
2713 /* with STREAM_LOCK, PREROLL_LOCK,
2714  *
2715  * Synchronize the object on the clock and then render it.
2716  *
2717  * takes ownership of obj.
2718  */
2719 static GstFlowReturn
2720 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2721     gboolean is_list, gpointer obj)
2722 {
2723   GstFlowReturn ret;
2724   GstBaseSinkClass *bclass;
2725   gboolean late, step_end;
2726   gpointer sync_obj;
2727   GstBaseSinkPrivate *priv;
2728
2729   priv = basesink->priv;
2730
2731   if (is_list) {
2732     /*
2733      * If buffer list, use the first group buffer within the list
2734      * for syncing
2735      */
2736     sync_obj = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
2737     g_assert (NULL != sync_obj);
2738   } else {
2739     sync_obj = obj;
2740   }
2741
2742 again:
2743   late = FALSE;
2744   step_end = FALSE;
2745
2746   /* synchronize this object, non syncable objects return OK
2747    * immediatly. */
2748   ret = gst_base_sink_do_sync (basesink, pad, sync_obj, &late, &step_end);
2749   if (G_UNLIKELY (ret != GST_FLOW_OK))
2750     goto sync_failed;
2751
2752   /* and now render, event or buffer/buffer list. */
2753   if (G_LIKELY (is_list || GST_IS_BUFFER (obj))) {
2754     /* drop late buffers unconditionally, let's hope it's unlikely */
2755     if (G_UNLIKELY (late))
2756       goto dropped;
2757
2758     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2759
2760     if (G_LIKELY ((is_list && bclass->render_list) ||
2761             (!is_list && bclass->render))) {
2762       gint do_qos;
2763
2764       /* read once, to get same value before and after */
2765       do_qos = g_atomic_int_get (&priv->qos_enabled);
2766
2767       GST_DEBUG_OBJECT (basesink, "rendering object %p", obj);
2768
2769       /* record rendering time for QoS and stats */
2770       if (do_qos)
2771         gst_base_sink_do_render_stats (basesink, TRUE);
2772
2773       if (!is_list) {
2774         GstBuffer *buf;
2775
2776         /* For buffer lists do not set last buffer. Creating buffer
2777          * with meaningful data can be done only with memcpy which will
2778          * significantly affect performance */
2779         buf = GST_BUFFER_CAST (obj);
2780         gst_base_sink_set_last_buffer (basesink, buf);
2781
2782         ret = bclass->render (basesink, buf);
2783       } else {
2784         GstBufferList *buflist;
2785
2786         buflist = GST_BUFFER_LIST_CAST (obj);
2787
2788         ret = bclass->render_list (basesink, buflist);
2789       }
2790
2791       if (do_qos)
2792         gst_base_sink_do_render_stats (basesink, FALSE);
2793
2794       if (ret == GST_FLOW_STEP)
2795         goto again;
2796
2797       if (G_UNLIKELY (basesink->flushing))
2798         goto flushing;
2799
2800       priv->rendered++;
2801     }
2802   } else if (G_LIKELY (GST_IS_EVENT (obj))) {
2803     GstEvent *event = GST_EVENT_CAST (obj);
2804     gboolean event_res = TRUE;
2805     GstEventType type;
2806
2807     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2808
2809     type = GST_EVENT_TYPE (event);
2810
2811     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
2812         gst_event_type_get_name (type));
2813
2814     if (bclass->event)
2815       event_res = bclass->event (basesink, event);
2816
2817     /* when we get here we could be flushing again when the event handler calls
2818      * _wait_eos(). We have to ignore this object in that case. */
2819     if (G_UNLIKELY (basesink->flushing))
2820       goto flushing;
2821
2822     if (G_LIKELY (event_res)) {
2823       guint32 seqnum;
2824
2825       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
2826       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
2827
2828       switch (type) {
2829         case GST_EVENT_EOS:
2830         {
2831           GstMessage *message;
2832
2833           /* the EOS event is completely handled so we mark
2834            * ourselves as being in the EOS state. eos is also
2835            * protected by the object lock so we can read it when
2836            * answering the POSITION query. */
2837           GST_OBJECT_LOCK (basesink);
2838           basesink->eos = TRUE;
2839           GST_OBJECT_UNLOCK (basesink);
2840
2841           /* ok, now we can post the message */
2842           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
2843
2844           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
2845           gst_message_set_seqnum (message, seqnum);
2846           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
2847           break;
2848         }
2849         case GST_EVENT_NEWSEGMENT:
2850           /* configure the segment */
2851           gst_base_sink_configure_segment (basesink, pad, event,
2852               &basesink->segment);
2853           break;
2854         case GST_EVENT_SINK_MESSAGE:{
2855           GstMessage *msg = NULL;
2856
2857           gst_event_parse_sink_message (event, &msg);
2858
2859           if (msg)
2860             gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
2861         }
2862         default:
2863           break;
2864       }
2865     }
2866   } else {
2867     g_return_val_if_reached (GST_FLOW_ERROR);
2868   }
2869
2870 done:
2871   if (step_end) {
2872     /* the step ended, check if we need to activate a new step */
2873     GST_DEBUG_OBJECT (basesink, "step ended");
2874     stop_stepping (basesink, &basesink->segment, &priv->current_step,
2875         priv->current_rstart, priv->current_rstop, basesink->eos);
2876     goto again;
2877   }
2878
2879   gst_base_sink_perform_qos (basesink, late);
2880
2881   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
2882   gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
2883   return ret;
2884
2885   /* ERRORS */
2886 sync_failed:
2887   {
2888     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
2889     goto done;
2890   }
2891 dropped:
2892   {
2893     priv->dropped++;
2894     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
2895
2896     if (g_atomic_int_get (&priv->qos_enabled)) {
2897       GstMessage *qos_msg;
2898       GstClockTime timestamp, duration;
2899
2900       timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (sync_obj));
2901       duration = GST_BUFFER_DURATION (GST_BUFFER_CAST (sync_obj));
2902
2903       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2904           "qos: dropped buffer rt %" GST_TIME_FORMAT ", st %" GST_TIME_FORMAT
2905           ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT,
2906           GST_TIME_ARGS (priv->current_rstart),
2907           GST_TIME_ARGS (priv->current_sstart), GST_TIME_ARGS (timestamp),
2908           GST_TIME_ARGS (duration));
2909       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2910           "qos: rendered %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
2911           priv->rendered, priv->dropped);
2912
2913       qos_msg =
2914           gst_message_new_qos (GST_OBJECT_CAST (basesink), basesink->sync,
2915           priv->current_rstart, priv->current_sstart, timestamp, duration);
2916       gst_message_set_qos_values (qos_msg, priv->current_jitter, priv->avg_rate,
2917           1000000);
2918       gst_message_set_qos_stats (qos_msg, GST_FORMAT_BUFFERS, priv->rendered,
2919           priv->dropped);
2920       gst_element_post_message (GST_ELEMENT_CAST (basesink), qos_msg);
2921     }
2922     goto done;
2923   }
2924 flushing:
2925   {
2926     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
2927     gst_mini_object_unref (obj);
2928     return GST_FLOW_WRONG_STATE;
2929   }
2930 }
2931
2932 /* with STREAM_LOCK, PREROLL_LOCK
2933  *
2934  * Perform preroll on the given object. For buffers this means
2935  * calling the preroll subclass method.
2936  * If that succeeds, the state will be commited.
2937  *
2938  * function does not take ownership of obj.
2939  */
2940 static GstFlowReturn
2941 gst_base_sink_preroll_object (GstBaseSink * basesink, gboolean is_list,
2942     GstMiniObject * obj)
2943 {
2944   GstFlowReturn ret;
2945
2946   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
2947
2948   /* if it's a buffer, we need to call the preroll method */
2949   if (G_LIKELY (is_list || GST_IS_BUFFER (obj)) && basesink->priv->call_preroll) {
2950     GstBaseSinkClass *bclass;
2951     GstBuffer *buf;
2952     GstClockTime timestamp;
2953
2954     if (is_list) {
2955       buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
2956       g_assert (NULL != buf);
2957     } else {
2958       buf = GST_BUFFER_CAST (obj);
2959     }
2960
2961     timestamp = GST_BUFFER_TIMESTAMP (buf);
2962
2963     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
2964         GST_TIME_ARGS (timestamp));
2965
2966     /*
2967      * For buffer lists do not set last buffer. Creating buffer
2968      * with meaningful data can be done only with memcpy which will
2969      * significantly affect performance
2970      */
2971     if (!is_list) {
2972       gst_base_sink_set_last_buffer (basesink, buf);
2973     }
2974
2975     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2976     if (bclass->preroll)
2977       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
2978         goto preroll_failed;
2979
2980     basesink->priv->call_preroll = FALSE;
2981   }
2982
2983   /* commit state */
2984   if (G_LIKELY (basesink->playing_async)) {
2985     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
2986       goto stopping;
2987   }
2988
2989   return GST_FLOW_OK;
2990
2991   /* ERRORS */
2992 preroll_failed:
2993   {
2994     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
2995     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
2996     return ret;
2997   }
2998 stopping:
2999   {
3000     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
3001     return GST_FLOW_WRONG_STATE;
3002   }
3003 }
3004
3005 /* with STREAM_LOCK, PREROLL_LOCK
3006  *
3007  * Queue an object for rendering.
3008  * The first prerollable object queued will complete the preroll. If the
3009  * preroll queue if filled, we render all the objects in the queue.
3010  *
3011  * This function takes ownership of the object.
3012  */
3013 static GstFlowReturn
3014 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
3015     gboolean is_list, gpointer obj, gboolean prerollable)
3016 {
3017   GstFlowReturn ret = GST_FLOW_OK;
3018   gint length;
3019   GQueue *q;
3020
3021   if (G_UNLIKELY (basesink->need_preroll)) {
3022     if (G_LIKELY (prerollable))
3023       basesink->preroll_queued++;
3024
3025     length = basesink->preroll_queued;
3026
3027     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
3028
3029     /* first prerollable item needs to finish the preroll */
3030     if (length == 1) {
3031       ret = gst_base_sink_preroll_object (basesink, is_list, obj);
3032       if (G_UNLIKELY (ret != GST_FLOW_OK))
3033         goto preroll_failed;
3034     }
3035     /* need to recheck if we need preroll, commmit state during preroll
3036      * could have made us not need more preroll. */
3037     if (G_UNLIKELY (basesink->need_preroll)) {
3038       /* see if we can render now, if we can't add the object to the preroll
3039        * queue. */
3040       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
3041         goto more_preroll;
3042     }
3043   }
3044   /* we can start rendering (or blocking) the queued object
3045    * if any. */
3046   q = basesink->preroll_queue;
3047   while (G_UNLIKELY (!g_queue_is_empty (q))) {
3048     GstMiniObject *o;
3049
3050     o = g_queue_pop_head (q);
3051     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
3052
3053     /* do something with the return value */
3054     ret = gst_base_sink_render_object (basesink, pad, FALSE, o);
3055     if (ret != GST_FLOW_OK)
3056       goto dequeue_failed;
3057   }
3058
3059   /* now render the object */
3060   ret = gst_base_sink_render_object (basesink, pad, is_list, obj);
3061   basesink->preroll_queued = 0;
3062
3063   return ret;
3064
3065   /* special cases */
3066 preroll_failed:
3067   {
3068     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
3069         gst_flow_get_name (ret));
3070     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3071     return ret;
3072   }
3073 more_preroll:
3074   {
3075     /* add object to the queue and return */
3076     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
3077         length, basesink->preroll_queue_max_len);
3078     g_queue_push_tail (basesink->preroll_queue, obj);
3079     return GST_FLOW_OK;
3080   }
3081 dequeue_failed:
3082   {
3083     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
3084         gst_flow_get_name (ret));
3085     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3086     return ret;
3087   }
3088 }
3089
3090 /* with STREAM_LOCK
3091  *
3092  * This function grabs the PREROLL_LOCK and adds the object to
3093  * the queue.
3094  *
3095  * This function takes ownership of obj.
3096  */
3097 static GstFlowReturn
3098 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
3099     GstMiniObject * obj, gboolean prerollable)
3100 {
3101   GstFlowReturn ret;
3102
3103   GST_PAD_PREROLL_LOCK (pad);
3104   if (G_UNLIKELY (basesink->flushing))
3105     goto flushing;
3106
3107   if (G_UNLIKELY (basesink->priv->received_eos))
3108     goto was_eos;
3109
3110   ret =
3111       gst_base_sink_queue_object_unlocked (basesink, pad, FALSE, obj,
3112       prerollable);
3113   GST_PAD_PREROLL_UNLOCK (pad);
3114
3115   return ret;
3116
3117   /* ERRORS */
3118 flushing:
3119   {
3120     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3121     GST_PAD_PREROLL_UNLOCK (pad);
3122     gst_mini_object_unref (obj);
3123     return GST_FLOW_WRONG_STATE;
3124   }
3125 was_eos:
3126   {
3127     GST_DEBUG_OBJECT (basesink,
3128         "we are EOS, dropping object, return UNEXPECTED");
3129     GST_PAD_PREROLL_UNLOCK (pad);
3130     gst_mini_object_unref (obj);
3131     return GST_FLOW_UNEXPECTED;
3132   }
3133 }
3134
3135 static void
3136 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
3137 {
3138   /* make sure we are not blocked on the clock also clear any pending
3139    * eos state. */
3140   gst_base_sink_set_flushing (basesink, pad, TRUE);
3141
3142   /* we grab the stream lock but that is not needed since setting the
3143    * sink to flushing would make sure no state commit is being done
3144    * anymore */
3145   GST_PAD_STREAM_LOCK (pad);
3146   gst_base_sink_reset_qos (basesink);
3147   if (basesink->priv->async_enabled) {
3148     /* and we need to commit our state again on the next
3149      * prerolled buffer */
3150     basesink->playing_async = TRUE;
3151     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
3152   } else {
3153     basesink->priv->have_latency = TRUE;
3154     basesink->need_preroll = FALSE;
3155   }
3156   gst_base_sink_set_last_buffer (basesink, NULL);
3157   GST_PAD_STREAM_UNLOCK (pad);
3158 }
3159
3160 static void
3161 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
3162 {
3163   /* unset flushing so we can accept new data, this also flushes out any EOS
3164    * event. */
3165   gst_base_sink_set_flushing (basesink, pad, FALSE);
3166
3167   /* for position reporting */
3168   GST_OBJECT_LOCK (basesink);
3169   basesink->priv->current_sstart = GST_CLOCK_TIME_NONE;
3170   basesink->priv->current_sstop = GST_CLOCK_TIME_NONE;
3171   basesink->priv->eos_rtime = GST_CLOCK_TIME_NONE;
3172   basesink->priv->call_preroll = TRUE;
3173   basesink->priv->current_step.valid = FALSE;
3174   basesink->priv->pending_step.valid = FALSE;
3175   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
3176     /* we need new segment info after the flush. */
3177     basesink->have_newsegment = FALSE;
3178     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3179     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
3180   }
3181   GST_OBJECT_UNLOCK (basesink);
3182 }
3183
3184 static gboolean
3185 gst_base_sink_event (GstPad * pad, GstEvent * event)
3186 {
3187   GstBaseSink *basesink;
3188   gboolean result = TRUE;
3189   GstBaseSinkClass *bclass;
3190
3191   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3192
3193   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3194
3195   GST_DEBUG_OBJECT (basesink, "event %p (%s)", event,
3196       GST_EVENT_TYPE_NAME (event));
3197
3198   switch (GST_EVENT_TYPE (event)) {
3199     case GST_EVENT_EOS:
3200     {
3201       GstFlowReturn ret;
3202
3203       GST_PAD_PREROLL_LOCK (pad);
3204       if (G_UNLIKELY (basesink->flushing))
3205         goto flushing;
3206
3207       if (G_UNLIKELY (basesink->priv->received_eos)) {
3208         /* we can't accept anything when we are EOS */
3209         result = FALSE;
3210         gst_event_unref (event);
3211       } else {
3212         /* we set the received EOS flag here so that we can use it when testing if
3213          * we are prerolled and to refuse more buffers. */
3214         basesink->priv->received_eos = TRUE;
3215
3216         /* EOS is a prerollable object, we call the unlocked version because it
3217          * does not check the received_eos flag. */
3218         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
3219             FALSE, GST_MINI_OBJECT_CAST (event), TRUE);
3220         if (G_UNLIKELY (ret != GST_FLOW_OK))
3221           result = FALSE;
3222       }
3223       GST_PAD_PREROLL_UNLOCK (pad);
3224       break;
3225     }
3226     case GST_EVENT_NEWSEGMENT:
3227     {
3228       GstFlowReturn ret;
3229       gboolean update;
3230
3231       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
3232
3233       GST_PAD_PREROLL_LOCK (pad);
3234       if (G_UNLIKELY (basesink->flushing))
3235         goto flushing;
3236
3237       gst_event_parse_new_segment_full (event, &update, NULL, NULL, NULL, NULL,
3238           NULL, NULL);
3239
3240       if (G_UNLIKELY (basesink->priv->received_eos && !update)) {
3241         /* we can't accept anything when we are EOS */
3242         result = FALSE;
3243         gst_event_unref (event);
3244       } else {
3245         /* the new segment is a non prerollable item and does not block anything,
3246          * we need to configure the current clipping segment and insert the event
3247          * in the queue to serialize it with the buffers for rendering. */
3248         gst_base_sink_configure_segment (basesink, pad, event,
3249             basesink->abidata.ABI.clip_segment);
3250
3251         ret =
3252             gst_base_sink_queue_object_unlocked (basesink, pad,
3253             FALSE, GST_MINI_OBJECT_CAST (event), FALSE);
3254         if (G_UNLIKELY (ret != GST_FLOW_OK))
3255           result = FALSE;
3256         else {
3257           GST_OBJECT_LOCK (basesink);
3258           basesink->have_newsegment = TRUE;
3259           GST_OBJECT_UNLOCK (basesink);
3260         }
3261       }
3262       GST_PAD_PREROLL_UNLOCK (pad);
3263       break;
3264     }
3265     case GST_EVENT_FLUSH_START:
3266       if (bclass->event)
3267         bclass->event (basesink, event);
3268
3269       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3270
3271       gst_base_sink_flush_start (basesink, pad);
3272
3273       gst_event_unref (event);
3274       break;
3275     case GST_EVENT_FLUSH_STOP:
3276       if (bclass->event)
3277         bclass->event (basesink, event);
3278
3279       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3280
3281       gst_base_sink_flush_stop (basesink, pad);
3282
3283       gst_event_unref (event);
3284       break;
3285     default:
3286       /* other events are sent to queue or subclass depending on if they
3287        * are serialized. */
3288       if (GST_EVENT_IS_SERIALIZED (event)) {
3289         gst_base_sink_queue_object (basesink, pad,
3290             GST_MINI_OBJECT_CAST (event), FALSE);
3291       } else {
3292         if (bclass->event)
3293           bclass->event (basesink, event);
3294         gst_event_unref (event);
3295       }
3296       break;
3297   }
3298 done:
3299   gst_object_unref (basesink);
3300
3301   return result;
3302
3303   /* ERRORS */
3304 flushing:
3305   {
3306     GST_DEBUG_OBJECT (basesink, "we are flushing");
3307     GST_PAD_PREROLL_UNLOCK (pad);
3308     result = FALSE;
3309     gst_event_unref (event);
3310     goto done;
3311   }
3312 }
3313
3314 /* default implementation to calculate the start and end
3315  * timestamps on a buffer, subclasses can override
3316  */
3317 static void
3318 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3319     GstClockTime * start, GstClockTime * end)
3320 {
3321   GstClockTime timestamp, duration;
3322
3323   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3324   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3325
3326     /* get duration to calculate end time */
3327     duration = GST_BUFFER_DURATION (buffer);
3328     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3329       *end = timestamp + duration;
3330     }
3331     *start = timestamp;
3332   }
3333 }
3334
3335 /* must be called with PREROLL_LOCK */
3336 static gboolean
3337 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3338 {
3339   gboolean is_prerolled, res;
3340
3341   /* we have 2 cases where the PREROLL_LOCK is released:
3342    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3343    *  2) we are syncing on the clock
3344    */
3345   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3346   res = !is_prerolled;
3347
3348   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3349       basesink->have_preroll, basesink->priv->received_eos, res);
3350
3351   return res;
3352 }
3353
3354 /* with STREAM_LOCK, PREROLL_LOCK
3355  *
3356  * Takes a buffer and compare the timestamps with the last segment.
3357  * If the buffer falls outside of the segment boundaries, drop it.
3358  * Else queue the buffer for preroll and rendering.
3359  *
3360  * This function takes ownership of the buffer.
3361  */
3362 static GstFlowReturn
3363 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3364     gboolean is_list, gpointer obj)
3365 {
3366   GstBaseSinkClass *bclass;
3367   GstFlowReturn result;
3368   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3369   GstSegment *clip_segment;
3370   GstBuffer *time_buf;
3371
3372   if (G_UNLIKELY (basesink->flushing))
3373     goto flushing;
3374
3375   if (G_UNLIKELY (basesink->priv->received_eos))
3376     goto was_eos;
3377
3378   if (is_list) {
3379     time_buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3380     g_assert (NULL != time_buf);
3381   } else {
3382     time_buf = GST_BUFFER_CAST (obj);
3383   }
3384
3385   /* for code clarity */
3386   clip_segment = basesink->abidata.ABI.clip_segment;
3387
3388   if (G_UNLIKELY (!basesink->have_newsegment)) {
3389     gboolean sync;
3390
3391     sync = gst_base_sink_get_sync (basesink);
3392     if (sync) {
3393       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3394           (_("Internal data flow problem.")),
3395           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3396     }
3397
3398     /* this means this sink will assume timestamps start from 0 */
3399     GST_OBJECT_LOCK (basesink);
3400     clip_segment->start = 0;
3401     clip_segment->stop = -1;
3402     basesink->segment.start = 0;
3403     basesink->segment.stop = -1;
3404     basesink->have_newsegment = TRUE;
3405     GST_OBJECT_UNLOCK (basesink);
3406   }
3407
3408   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3409
3410   /* check if the buffer needs to be dropped, we first ask the subclass for the
3411    * start and end */
3412   if (bclass->get_times)
3413     bclass->get_times (basesink, time_buf, &start, &end);
3414
3415   if (!GST_CLOCK_TIME_IS_VALID (start)) {
3416     /* if the subclass does not want sync, we use our own values so that we at
3417      * least clip the buffer to the segment */
3418     gst_base_sink_get_times (basesink, time_buf, &start, &end);
3419   }
3420
3421   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3422       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3423
3424   /* a dropped buffer does not participate in anything */
3425   if (GST_CLOCK_TIME_IS_VALID (start) &&
3426       (clip_segment->format == GST_FORMAT_TIME)) {
3427     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3428                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3429       goto out_of_segment;
3430   }
3431
3432   /* now we can process the buffer in the queue, this function takes ownership
3433    * of the buffer */
3434   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3435       is_list, obj, TRUE);
3436   return result;
3437
3438   /* ERRORS */
3439 flushing:
3440   {
3441     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3442     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3443     return GST_FLOW_WRONG_STATE;
3444   }
3445 was_eos:
3446   {
3447     GST_DEBUG_OBJECT (basesink,
3448         "we are EOS, dropping object, return UNEXPECTED");
3449     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3450     return GST_FLOW_UNEXPECTED;
3451   }
3452 out_of_segment:
3453   {
3454     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3455     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3456     return GST_FLOW_OK;
3457   }
3458 }
3459
3460 /* with STREAM_LOCK
3461  */
3462 static GstFlowReturn
3463 gst_base_sink_chain_main (GstBaseSink * basesink, GstPad * pad,
3464     gboolean is_list, gpointer obj)
3465 {
3466   GstFlowReturn result;
3467
3468   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3469     goto wrong_mode;
3470
3471   GST_PAD_PREROLL_LOCK (pad);
3472   result = gst_base_sink_chain_unlocked (basesink, pad, is_list, obj);
3473   GST_PAD_PREROLL_UNLOCK (pad);
3474
3475 done:
3476   return result;
3477
3478   /* ERRORS */
3479 wrong_mode:
3480   {
3481     GST_OBJECT_LOCK (pad);
3482     GST_WARNING_OBJECT (basesink,
3483         "Push on pad %s:%s, but it was not activated in push mode",
3484         GST_DEBUG_PAD_NAME (pad));
3485     GST_OBJECT_UNLOCK (pad);
3486     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3487     /* we don't post an error message this will signal to the peer
3488      * pushing that EOS is reached. */
3489     result = GST_FLOW_UNEXPECTED;
3490     goto done;
3491   }
3492 }
3493
3494 static GstFlowReturn
3495 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3496 {
3497   GstBaseSink *basesink;
3498
3499   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3500
3501   return gst_base_sink_chain_main (basesink, pad, FALSE, buf);
3502 }
3503
3504 static GstFlowReturn
3505 gst_base_sink_chain_list (GstPad * pad, GstBufferList * list)
3506 {
3507   GstBaseSink *basesink;
3508   GstBaseSinkClass *bclass;
3509   GstFlowReturn result;
3510
3511   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3512   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3513
3514   if (G_LIKELY (bclass->render_list)) {
3515     result = gst_base_sink_chain_main (basesink, pad, TRUE, list);
3516   } else {
3517     GstBufferListIterator *it;
3518     GstBuffer *group;
3519
3520     GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
3521
3522     it = gst_buffer_list_iterate (list);
3523
3524     if (gst_buffer_list_iterator_next_group (it)) {
3525       do {
3526         group = gst_buffer_list_iterator_merge_group (it);
3527         if (group == NULL) {
3528           group = gst_buffer_new ();
3529           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3530         } else {
3531           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining group");
3532         }
3533         result = gst_base_sink_chain_main (basesink, pad, FALSE, group);
3534       } while (result == GST_FLOW_OK
3535           && gst_buffer_list_iterator_next_group (it));
3536     } else {
3537       GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3538       result =
3539           gst_base_sink_chain_main (basesink, pad, FALSE, gst_buffer_new ());
3540     }
3541     gst_buffer_list_iterator_free (it);
3542     gst_buffer_list_unref (list);
3543   }
3544   return result;
3545 }
3546
3547
3548 static gboolean
3549 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3550 {
3551   gboolean res = TRUE;
3552
3553   /* update our offset if the start/stop position was updated */
3554   if (segment->format == GST_FORMAT_BYTES) {
3555     segment->time = segment->start;
3556   } else if (segment->start == 0) {
3557     /* seek to start, we can implement a default for this. */
3558     segment->time = 0;
3559   } else {
3560     res = FALSE;
3561     GST_INFO_OBJECT (sink, "Can't do a default seek");
3562   }
3563
3564   return res;
3565 }
3566
3567 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3568
3569 static gboolean
3570 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3571     GstEvent * event, GstSegment * segment)
3572 {
3573   /* By default, we try one of 2 things:
3574    *   - For absolute seek positions, convert the requested position to our
3575    *     configured processing format and place it in the output segment \
3576    *   - For relative seek positions, convert our current (input) values to the
3577    *     seek format, adjust by the relative seek offset and then convert back to
3578    *     the processing format
3579    */
3580   GstSeekType cur_type, stop_type;
3581   gint64 cur, stop;
3582   GstSeekFlags flags;
3583   GstFormat seek_format, dest_format;
3584   gdouble rate;
3585   gboolean update;
3586   gboolean res = TRUE;
3587
3588   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3589       &cur_type, &cur, &stop_type, &stop);
3590   dest_format = segment->format;
3591
3592   if (seek_format == dest_format) {
3593     gst_segment_set_seek (segment, rate, seek_format, flags,
3594         cur_type, cur, stop_type, stop, &update);
3595     return TRUE;
3596   }
3597
3598   if (cur_type != GST_SEEK_TYPE_NONE) {
3599     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3600     res =
3601         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3602         &cur);
3603     cur_type = GST_SEEK_TYPE_SET;
3604   }
3605
3606   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3607     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3608     res =
3609         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3610         &stop);
3611     stop_type = GST_SEEK_TYPE_SET;
3612   }
3613
3614   /* And finally, configure our output segment in the desired format */
3615   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3616       stop_type, stop, &update);
3617
3618   if (!res)
3619     goto no_format;
3620
3621   return res;
3622
3623 no_format:
3624   {
3625     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3626     return FALSE;
3627   }
3628 }
3629
3630 /* perform a seek, only executed in pull mode */
3631 static gboolean
3632 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3633 {
3634   gboolean flush;
3635   gdouble rate;
3636   GstFormat seek_format, dest_format;
3637   GstSeekFlags flags;
3638   GstSeekType cur_type, stop_type;
3639   gboolean seekseg_configured = FALSE;
3640   gint64 cur, stop;
3641   gboolean update, res = TRUE;
3642   GstSegment seeksegment;
3643
3644   dest_format = sink->segment.format;
3645
3646   if (event) {
3647     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3648     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3649         &cur_type, &cur, &stop_type, &stop);
3650
3651     flush = flags & GST_SEEK_FLAG_FLUSH;
3652   } else {
3653     GST_DEBUG_OBJECT (sink, "performing seek without event");
3654     flush = FALSE;
3655   }
3656
3657   if (flush) {
3658     GST_DEBUG_OBJECT (sink, "flushing upstream");
3659     gst_pad_push_event (pad, gst_event_new_flush_start ());
3660     gst_base_sink_flush_start (sink, pad);
3661   } else {
3662     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3663   }
3664
3665   GST_PAD_STREAM_LOCK (pad);
3666
3667   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3668    * copy the current segment info into the temp segment that we can actually
3669    * attempt the seek with. We only update the real segment if the seek suceeds. */
3670   if (!seekseg_configured) {
3671     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3672
3673     /* now configure the final seek segment */
3674     if (event) {
3675       if (sink->segment.format != seek_format) {
3676         /* OK, here's where we give the subclass a chance to convert the relative
3677          * seek into an absolute one in the processing format. We set up any
3678          * absolute seek above, before taking the stream lock. */
3679         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3680                 &seeksegment)) {
3681           GST_DEBUG_OBJECT (sink,
3682               "Preparing the seek failed after flushing. " "Aborting seek");
3683           res = FALSE;
3684         }
3685       } else {
3686         /* The seek format matches our processing format, no need to ask the
3687          * the subclass to configure the segment. */
3688         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3689             cur_type, cur, stop_type, stop, &update);
3690       }
3691     }
3692     /* Else, no seek event passed, so we're just (re)starting the
3693        current segment. */
3694   }
3695
3696   if (res) {
3697     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3698         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3699         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3700
3701     /* do the seek, segment.last_stop contains the new position. */
3702     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3703   }
3704
3705
3706   if (flush) {
3707     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3708     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3709     gst_base_sink_flush_stop (sink, pad);
3710   } else if (res && sink->abidata.ABI.running) {
3711     /* we are running the current segment and doing a non-flushing seek,
3712      * close the segment first based on the last_stop. */
3713     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3714         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3715   }
3716
3717   /* The subclass must have converted the segment to the processing format
3718    * by now */
3719   if (res && seeksegment.format != dest_format) {
3720     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3721         "in the correct format. Aborting seek.");
3722     res = FALSE;
3723   }
3724
3725   /* if successfull seek, we update our real segment and push
3726    * out the new segment. */
3727   if (res) {
3728     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3729
3730     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3731       gst_element_post_message (GST_ELEMENT (sink),
3732           gst_message_new_segment_start (GST_OBJECT (sink),
3733               sink->segment.format, sink->segment.last_stop));
3734     }
3735   }
3736
3737   sink->priv->discont = TRUE;
3738   sink->abidata.ABI.running = TRUE;
3739
3740   GST_PAD_STREAM_UNLOCK (pad);
3741
3742   return res;
3743 }
3744
3745 static void
3746 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
3747     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
3748     gboolean flush, gboolean intermediate)
3749 {
3750   GST_OBJECT_LOCK (sink);
3751   pending->seqnum = seqnum;
3752   pending->format = format;
3753   pending->amount = amount;
3754   pending->position = 0;
3755   pending->rate = rate;
3756   pending->flush = flush;
3757   pending->intermediate = intermediate;
3758   pending->valid = TRUE;
3759   /* flush invalidates the current stepping segment */
3760   if (flush)
3761     current->valid = FALSE;
3762   GST_OBJECT_UNLOCK (sink);
3763 }
3764
3765 static gboolean
3766 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3767 {
3768   GstBaseSinkPrivate *priv;
3769   GstBaseSinkClass *bclass;
3770   gboolean flush, intermediate;
3771   gdouble rate;
3772   GstFormat format;
3773   guint64 amount;
3774   guint seqnum;
3775   GstStepInfo *pending, *current;
3776   GstMessage *message;
3777
3778   bclass = GST_BASE_SINK_GET_CLASS (sink);
3779   priv = sink->priv;
3780
3781   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
3782
3783   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
3784   seqnum = gst_event_get_seqnum (event);
3785
3786   pending = &priv->pending_step;
3787   current = &priv->current_step;
3788
3789   /* post message first */
3790   message = gst_message_new_step_start (GST_OBJECT (sink), FALSE, format,
3791       amount, rate, flush, intermediate);
3792   gst_message_set_seqnum (message, seqnum);
3793   gst_element_post_message (GST_ELEMENT (sink), message);
3794
3795   if (flush) {
3796     /* we need to call ::unlock before locking PREROLL_LOCK
3797      * since we lock it before going into ::render */
3798     if (bclass->unlock)
3799       bclass->unlock (sink);
3800
3801     GST_PAD_PREROLL_LOCK (sink->sinkpad);
3802     /* now that we have the PREROLL lock, clear our unlock request */
3803     if (bclass->unlock_stop)
3804       bclass->unlock_stop (sink);
3805
3806     /* update the stepinfo and make it valid */
3807     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
3808         intermediate);
3809
3810     if (sink->priv->async_enabled) {
3811       /* and we need to commit our state again on the next
3812        * prerolled buffer */
3813       sink->playing_async = TRUE;
3814       priv->pending_step.need_preroll = TRUE;
3815       sink->need_preroll = FALSE;
3816       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
3817     } else {
3818       sink->priv->have_latency = TRUE;
3819       sink->need_preroll = FALSE;
3820     }
3821     priv->current_sstart = GST_CLOCK_TIME_NONE;
3822     priv->current_sstop = GST_CLOCK_TIME_NONE;
3823     priv->eos_rtime = GST_CLOCK_TIME_NONE;
3824     priv->call_preroll = TRUE;
3825     gst_base_sink_set_last_buffer (sink, NULL);
3826     gst_base_sink_reset_qos (sink);
3827
3828     if (sink->clock_id) {
3829       gst_clock_id_unschedule (sink->clock_id);
3830     }
3831
3832     if (sink->have_preroll) {
3833       GST_DEBUG_OBJECT (sink, "signal waiter");
3834       priv->step_unlock = TRUE;
3835       GST_PAD_PREROLL_SIGNAL (sink->sinkpad);
3836     }
3837     GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
3838   } else {
3839     /* update the stepinfo and make it valid */
3840     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
3841         intermediate);
3842   }
3843
3844   return TRUE;
3845 }
3846
3847 /* with STREAM_LOCK
3848  */
3849 static void
3850 gst_base_sink_loop (GstPad * pad)
3851 {
3852   GstBaseSink *basesink;
3853   GstBuffer *buf = NULL;
3854   GstFlowReturn result;
3855   guint blocksize;
3856   guint64 offset;
3857
3858   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3859
3860   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
3861
3862   if ((blocksize = basesink->priv->blocksize) == 0)
3863     blocksize = -1;
3864
3865   offset = basesink->segment.last_stop;
3866
3867   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
3868       offset, blocksize);
3869
3870   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
3871   if (G_UNLIKELY (result != GST_FLOW_OK))
3872     goto paused;
3873
3874   if (G_UNLIKELY (buf == NULL))
3875     goto no_buffer;
3876
3877   offset += GST_BUFFER_SIZE (buf);
3878
3879   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
3880
3881   GST_PAD_PREROLL_LOCK (pad);
3882   result = gst_base_sink_chain_unlocked (basesink, pad, FALSE, buf);
3883   GST_PAD_PREROLL_UNLOCK (pad);
3884   if (G_UNLIKELY (result != GST_FLOW_OK))
3885     goto paused;
3886
3887   return;
3888
3889   /* ERRORS */
3890 paused:
3891   {
3892     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
3893         gst_flow_get_name (result));
3894     gst_pad_pause_task (pad);
3895     /* fatal errors and NOT_LINKED cause EOS */
3896     if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) {
3897       if (result == GST_FLOW_UNEXPECTED) {
3898         /* perform EOS logic */
3899         if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3900           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3901               gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
3902                   basesink->segment.format, basesink->segment.last_stop));
3903         } else {
3904           gst_base_sink_event (pad, gst_event_new_eos ());
3905         }
3906       } else {
3907         /* for fatal errors we post an error message, post the error
3908          * first so the app knows about the error first. */
3909         GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3910             (_("Internal data stream error.")),
3911             ("stream stopped, reason %s", gst_flow_get_name (result)));
3912         gst_base_sink_event (pad, gst_event_new_eos ());
3913       }
3914     }
3915     return;
3916   }
3917 no_buffer:
3918   {
3919     GST_LOG_OBJECT (basesink, "no buffer, pausing");
3920     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3921         (_("Internal data flow error.")), ("element returned NULL buffer"));
3922     result = GST_FLOW_ERROR;
3923     goto paused;
3924   }
3925 }
3926
3927 static gboolean
3928 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
3929     gboolean flushing)
3930 {
3931   GstBaseSinkClass *bclass;
3932
3933   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3934
3935   if (flushing) {
3936     /* unlock any subclasses, we need to do this before grabbing the
3937      * PREROLL_LOCK since we hold this lock before going into ::render. */
3938     if (bclass->unlock)
3939       bclass->unlock (basesink);
3940   }
3941
3942   GST_PAD_PREROLL_LOCK (pad);
3943   basesink->flushing = flushing;
3944   if (flushing) {
3945     /* step 1, now that we have the PREROLL lock, clear our unlock request */
3946     if (bclass->unlock_stop)
3947       bclass->unlock_stop (basesink);
3948
3949     /* set need_preroll before we unblock the clock. If the clock is unblocked
3950      * before timing out, we can reuse the buffer for preroll. */
3951     basesink->need_preroll = TRUE;
3952
3953     /* step 2, unblock clock sync (if any) or any other blocking thing */
3954     if (basesink->clock_id) {
3955       gst_clock_id_unschedule (basesink->clock_id);
3956     }
3957
3958     /* flush out the data thread if it's locked in finish_preroll, this will
3959      * also flush out the EOS state */
3960     GST_DEBUG_OBJECT (basesink,
3961         "flushing out data thread, need preroll to TRUE");
3962     gst_base_sink_preroll_queue_flush (basesink, pad);
3963   }
3964   GST_PAD_PREROLL_UNLOCK (pad);
3965
3966   return TRUE;
3967 }
3968
3969 static gboolean
3970 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
3971 {
3972   gboolean result;
3973
3974   if (active) {
3975     /* start task */
3976     result = gst_pad_start_task (basesink->sinkpad,
3977         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
3978   } else {
3979     /* step 2, make sure streaming finishes */
3980     result = gst_pad_stop_task (basesink->sinkpad);
3981   }
3982
3983   return result;
3984 }
3985
3986 static gboolean
3987 gst_base_sink_pad_activate (GstPad * pad)
3988 {
3989   gboolean result = FALSE;
3990   GstBaseSink *basesink;
3991
3992   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3993
3994   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
3995
3996   gst_base_sink_set_flushing (basesink, pad, FALSE);
3997
3998   /* we need to have the pull mode enabled */
3999   if (!basesink->can_activate_pull) {
4000     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
4001     goto fallback;
4002   }
4003
4004   /* check if downstreams supports pull mode at all */
4005   if (!gst_pad_check_pull_range (pad)) {
4006     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
4007     goto fallback;
4008   }
4009
4010   /* set the pad mode before starting the task so that it's in the
4011    * correct state for the new thread. also the sink set_caps and get_caps
4012    * function checks this */
4013   basesink->pad_mode = GST_ACTIVATE_PULL;
4014
4015   /* we first try to negotiate a format so that when we try to activate
4016    * downstream, it knows about our format */
4017   if (!gst_base_sink_negotiate_pull (basesink)) {
4018     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
4019     goto fallback;
4020   }
4021
4022   /* ok activate now */
4023   if (!gst_pad_activate_pull (pad, TRUE)) {
4024     /* clear any pending caps */
4025     GST_OBJECT_LOCK (basesink);
4026     gst_caps_replace (&basesink->priv->pull_caps, NULL);
4027     GST_OBJECT_UNLOCK (basesink);
4028     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
4029     goto fallback;
4030   }
4031
4032   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
4033   result = TRUE;
4034   goto done;
4035
4036   /* push mode fallback */
4037 fallback:
4038   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
4039   if ((result = gst_pad_activate_push (pad, TRUE))) {
4040     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
4041   }
4042
4043 done:
4044   if (!result) {
4045     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
4046     gst_base_sink_set_flushing (basesink, pad, TRUE);
4047   }
4048
4049   gst_object_unref (basesink);
4050
4051   return result;
4052 }
4053
4054 static gboolean
4055 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
4056 {
4057   gboolean result;
4058   GstBaseSink *basesink;
4059
4060   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4061
4062   if (active) {
4063     if (!basesink->can_activate_push) {
4064       result = FALSE;
4065       basesink->pad_mode = GST_ACTIVATE_NONE;
4066     } else {
4067       result = TRUE;
4068       basesink->pad_mode = GST_ACTIVATE_PUSH;
4069     }
4070   } else {
4071     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
4072       g_warning ("Internal GStreamer activation error!!!");
4073       result = FALSE;
4074     } else {
4075       gst_base_sink_set_flushing (basesink, pad, TRUE);
4076       result = TRUE;
4077       basesink->pad_mode = GST_ACTIVATE_NONE;
4078     }
4079   }
4080
4081   gst_object_unref (basesink);
4082
4083   return result;
4084 }
4085
4086 static gboolean
4087 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
4088 {
4089   GstCaps *caps;
4090   gboolean result;
4091
4092   result = FALSE;
4093
4094   /* this returns the intersection between our caps and the peer caps. If there
4095    * is no peer, it returns NULL and we can't operate in pull mode so we can
4096    * fail the negotiation. */
4097   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
4098   if (caps == NULL || gst_caps_is_empty (caps))
4099     goto no_caps_possible;
4100
4101   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
4102
4103   caps = gst_caps_make_writable (caps);
4104   /* get the first (prefered) format */
4105   gst_caps_truncate (caps);
4106   /* try to fixate */
4107   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
4108
4109   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
4110
4111   if (gst_caps_is_any (caps)) {
4112     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
4113         "allowing pull()");
4114     /* neither side has template caps in this case, so they are prepared for
4115        pull() without setcaps() */
4116     result = TRUE;
4117   } else if (gst_caps_is_fixed (caps)) {
4118     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
4119       goto could_not_set_caps;
4120
4121     GST_OBJECT_LOCK (basesink);
4122     gst_caps_replace (&basesink->priv->pull_caps, caps);
4123     GST_OBJECT_UNLOCK (basesink);
4124
4125     result = TRUE;
4126   }
4127
4128   gst_caps_unref (caps);
4129
4130   return result;
4131
4132 no_caps_possible:
4133   {
4134     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
4135     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
4136     if (caps)
4137       gst_caps_unref (caps);
4138     return FALSE;
4139   }
4140 could_not_set_caps:
4141   {
4142     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
4143     gst_caps_unref (caps);
4144     return FALSE;
4145   }
4146 }
4147
4148 /* this won't get called until we implement an activate function */
4149 static gboolean
4150 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
4151 {
4152   gboolean result = FALSE;
4153   GstBaseSink *basesink;
4154   GstBaseSinkClass *bclass;
4155
4156   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4157   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4158
4159   if (active) {
4160     GstFormat format;
4161     gint64 duration;
4162
4163     /* we mark we have a newsegment here because pull based
4164      * mode works just fine without having a newsegment before the
4165      * first buffer */
4166     format = GST_FORMAT_BYTES;
4167
4168     gst_segment_init (&basesink->segment, format);
4169     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
4170     GST_OBJECT_LOCK (basesink);
4171     basesink->have_newsegment = TRUE;
4172     GST_OBJECT_UNLOCK (basesink);
4173
4174     /* get the peer duration in bytes */
4175     result = gst_pad_query_peer_duration (pad, &format, &duration);
4176     if (result) {
4177       GST_DEBUG_OBJECT (basesink,
4178           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
4179       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
4180           duration);
4181       gst_segment_set_duration (&basesink->segment, format, duration);
4182     } else {
4183       GST_DEBUG_OBJECT (basesink, "unknown duration");
4184     }
4185
4186     if (bclass->activate_pull)
4187       result = bclass->activate_pull (basesink, TRUE);
4188     else
4189       result = FALSE;
4190
4191     if (!result)
4192       goto activate_failed;
4193
4194   } else {
4195     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
4196       g_warning ("Internal GStreamer activation error!!!");
4197       result = FALSE;
4198     } else {
4199       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
4200       if (bclass->activate_pull)
4201         result &= bclass->activate_pull (basesink, FALSE);
4202       basesink->pad_mode = GST_ACTIVATE_NONE;
4203       /* clear any pending caps */
4204       GST_OBJECT_LOCK (basesink);
4205       gst_caps_replace (&basesink->priv->pull_caps, NULL);
4206       GST_OBJECT_UNLOCK (basesink);
4207     }
4208   }
4209   gst_object_unref (basesink);
4210
4211   return result;
4212
4213   /* ERRORS */
4214 activate_failed:
4215   {
4216     /* reset, as starting the thread failed */
4217     basesink->pad_mode = GST_ACTIVATE_NONE;
4218
4219     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
4220     return FALSE;
4221   }
4222 }
4223
4224 /* send an event to our sinkpad peer. */
4225 static gboolean
4226 gst_base_sink_send_event (GstElement * element, GstEvent * event)
4227 {
4228   GstPad *pad;
4229   GstBaseSink *basesink = GST_BASE_SINK (element);
4230   gboolean forward, result = TRUE;
4231   GstActivateMode mode;
4232
4233   GST_OBJECT_LOCK (element);
4234   /* get the pad and the scheduling mode */
4235   pad = gst_object_ref (basesink->sinkpad);
4236   mode = basesink->pad_mode;
4237   GST_OBJECT_UNLOCK (element);
4238
4239   /* only push UPSTREAM events upstream */
4240   forward = GST_EVENT_IS_UPSTREAM (event);
4241
4242   switch (GST_EVENT_TYPE (event)) {
4243     case GST_EVENT_LATENCY:
4244     {
4245       GstClockTime latency;
4246
4247       gst_event_parse_latency (event, &latency);
4248
4249       /* store the latency. We use this to adjust the running_time before syncing
4250        * it to the clock. */
4251       GST_OBJECT_LOCK (element);
4252       basesink->priv->latency = latency;
4253       if (!basesink->priv->have_latency)
4254         forward = FALSE;
4255       GST_OBJECT_UNLOCK (element);
4256       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
4257           GST_TIME_ARGS (latency));
4258
4259       /* We forward this event so that all elements know about the global pipeline
4260        * latency. This is interesting for an element when it wants to figure out
4261        * when a particular piece of data will be rendered. */
4262       break;
4263     }
4264     case GST_EVENT_SEEK:
4265       /* in pull mode we will execute the seek */
4266       if (mode == GST_ACTIVATE_PULL)
4267         result = gst_base_sink_perform_seek (basesink, pad, event);
4268       break;
4269     case GST_EVENT_STEP:
4270       result = gst_base_sink_perform_step (basesink, pad, event);
4271       forward = FALSE;
4272       break;
4273     default:
4274       break;
4275   }
4276
4277   if (forward) {
4278     result = gst_pad_push_event (pad, event);
4279   } else {
4280     /* not forwarded, unref the event */
4281     gst_event_unref (event);
4282   }
4283
4284   gst_object_unref (pad);
4285   return result;
4286 }
4287
4288 /* get the end position of the last seen object, this is used
4289  * for EOS and for making sure that we don't report a position we
4290  * have not reached yet. With LOCK. */
4291 static gboolean
4292 gst_base_sink_get_position_last (GstBaseSink * basesink, GstFormat format,
4293     gint64 * cur, gboolean * upstream)
4294 {
4295   GstFormat oformat;
4296   GstSegment *segment;
4297   gboolean ret = TRUE;
4298
4299   segment = &basesink->segment;
4300   oformat = segment->format;
4301
4302   if (oformat == GST_FORMAT_TIME) {
4303     /* return last observed stream time, we keep the stream time around in the
4304      * time format. */
4305     *cur = basesink->priv->current_sstop;
4306   } else {
4307     /* convert last stop to stream time */
4308     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4309   }
4310
4311   if (*cur != -1 && oformat != format) {
4312     GST_OBJECT_UNLOCK (basesink);
4313     /* convert to the target format if we need to, release lock first */
4314     ret =
4315         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
4316     if (!ret) {
4317       *cur = -1;
4318       *upstream = TRUE;
4319     }
4320     GST_OBJECT_LOCK (basesink);
4321   }
4322
4323   GST_DEBUG_OBJECT (basesink, "POSITION: %" GST_TIME_FORMAT,
4324       GST_TIME_ARGS (*cur));
4325
4326   return ret;
4327 }
4328
4329 /* get the position when we are PAUSED, this is the stream time of the buffer
4330  * that prerolled. If no buffer is prerolled (we are still flushing), this
4331  * value will be -1. With LOCK. */
4332 static gboolean
4333 gst_base_sink_get_position_paused (GstBaseSink * basesink, GstFormat format,
4334     gint64 * cur, gboolean * upstream)
4335 {
4336   gboolean res;
4337   gint64 time;
4338   GstSegment *segment;
4339   GstFormat oformat;
4340
4341   /* we don't use the clip segment in pull mode, when seeking we update the
4342    * main segment directly with the new segment values without it having to be
4343    * activated by the rendering after preroll */
4344   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4345     segment = basesink->abidata.ABI.clip_segment;
4346   else
4347     segment = &basesink->segment;
4348   oformat = segment->format;
4349
4350   if (oformat == GST_FORMAT_TIME) {
4351     *cur = basesink->priv->current_sstart;
4352     if (segment->rate < 0.0 &&
4353         GST_CLOCK_TIME_IS_VALID (basesink->priv->current_sstop)) {
4354       /* for reverse playback we prefer the stream time stop position if we have
4355        * one */
4356       *cur = basesink->priv->current_sstop;
4357     }
4358   } else {
4359     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4360   }
4361
4362   time = segment->time;
4363
4364   if (*cur != -1) {
4365     *cur = MAX (*cur, time);
4366     GST_DEBUG_OBJECT (basesink, "POSITION as max: %" GST_TIME_FORMAT
4367         ", time %" GST_TIME_FORMAT, GST_TIME_ARGS (*cur), GST_TIME_ARGS (time));
4368   } else {
4369     /* we have no buffer, use the segment times. */
4370     if (segment->rate >= 0.0) {
4371       /* forward, next position is always the time of the segment */
4372       *cur = time;
4373       GST_DEBUG_OBJECT (basesink, "POSITION as time: %" GST_TIME_FORMAT,
4374           GST_TIME_ARGS (*cur));
4375     } else {
4376       /* reverse, next expected timestamp is segment->stop. We use the function
4377        * to get things right for negative applied_rates. */
4378       *cur = gst_segment_to_stream_time (segment, oformat, segment->stop);
4379       GST_DEBUG_OBJECT (basesink, "reverse POSITION: %" GST_TIME_FORMAT,
4380           GST_TIME_ARGS (*cur));
4381     }
4382   }
4383
4384   res = (*cur != -1);
4385   if (res && oformat != format) {
4386     GST_OBJECT_UNLOCK (basesink);
4387     res =
4388         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
4389     if (!res) {
4390       *cur = -1;
4391       *upstream = TRUE;
4392     }
4393     GST_OBJECT_LOCK (basesink);
4394   }
4395
4396   return res;
4397 }
4398
4399 static gboolean
4400 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4401     gint64 * cur, gboolean * upstream)
4402 {
4403   GstClock *clock;
4404   gboolean res = FALSE;
4405   GstFormat oformat, tformat;
4406   GstClockTime now, latency;
4407   GstClockTimeDiff base;
4408   gint64 time, accum, duration;
4409   gdouble rate;
4410   gint64 last;
4411
4412   GST_OBJECT_LOCK (basesink);
4413   /* our intermediate time format */
4414   tformat = GST_FORMAT_TIME;
4415   /* get the format in the segment */
4416   oformat = basesink->segment.format;
4417
4418   /* can only give answer based on the clock if not EOS */
4419   if (G_UNLIKELY (basesink->eos))
4420     goto in_eos;
4421
4422   /* we can only get the segment when we are not NULL or READY */
4423   if (!basesink->have_newsegment)
4424     goto wrong_state;
4425
4426   /* when not in PLAYING or when we're busy with a state change, we
4427    * cannot read from the clock so we report time based on the
4428    * last seen timestamp. */
4429   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4430       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING)
4431     goto in_pause;
4432
4433   /* we need to sync on the clock. */
4434   if (basesink->sync == FALSE)
4435     goto no_sync;
4436
4437   /* and we need a clock */
4438   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4439     goto no_sync;
4440
4441   /* collect all data we need holding the lock */
4442   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time))
4443     time = basesink->segment.time;
4444   else
4445     time = 0;
4446
4447   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.stop))
4448     duration = basesink->segment.stop - basesink->segment.start;
4449   else
4450     duration = 0;
4451
4452   base = GST_ELEMENT_CAST (basesink)->base_time;
4453   accum = basesink->segment.accum;
4454   rate = basesink->segment.rate * basesink->segment.applied_rate;
4455   latency = basesink->priv->latency;
4456
4457   gst_object_ref (clock);
4458
4459   /* this function might release the LOCK */
4460   gst_base_sink_get_position_last (basesink, format, &last, upstream);
4461
4462   /* need to release the object lock before we can get the time,
4463    * a clock might take the LOCK of the provider, which could be
4464    * a basesink subclass. */
4465   GST_OBJECT_UNLOCK (basesink);
4466
4467   now = gst_clock_get_time (clock);
4468
4469   if (oformat != tformat) {
4470     /* convert accum, time and duration to time */
4471     if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4472             &accum))
4473       goto convert_failed;
4474     if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration, &tformat,
4475             &duration))
4476       goto convert_failed;
4477     if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4478             &time))
4479       goto convert_failed;
4480   }
4481
4482   /* subtract base time and accumulated time from the clock time.
4483    * Make sure we don't go negative. This is the current time in
4484    * the segment which we need to scale with the combined
4485    * rate and applied rate. */
4486   base += accum;
4487   base += latency;
4488   if (GST_CLOCK_DIFF (base, now) < 0)
4489     base = now;
4490
4491   /* for negative rates we need to count back from the segment
4492    * duration. */
4493   if (rate < 0.0)
4494     time += duration;
4495
4496   *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4497
4498   /* never report more than last seen position */
4499   if (last != -1)
4500     *cur = MIN (last, *cur);
4501
4502   gst_object_unref (clock);
4503
4504   GST_DEBUG_OBJECT (basesink,
4505       "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4506       GST_TIME_FORMAT " + time %" GST_TIME_FORMAT,
4507       GST_TIME_ARGS (now), GST_TIME_ARGS (base),
4508       GST_TIME_ARGS (accum), GST_TIME_ARGS (time));
4509
4510   if (oformat != format) {
4511     /* convert time to final format */
4512     if (!gst_pad_query_convert (basesink->sinkpad, tformat, *cur, &format, cur))
4513       goto convert_failed;
4514   }
4515
4516   res = TRUE;
4517
4518 done:
4519   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4520       res, GST_TIME_ARGS (*cur));
4521   return res;
4522
4523   /* special cases */
4524 in_eos:
4525   {
4526     GST_DEBUG_OBJECT (basesink, "position in EOS");
4527     res = gst_base_sink_get_position_last (basesink, format, cur, upstream);
4528     GST_OBJECT_UNLOCK (basesink);
4529     goto done;
4530   }
4531 in_pause:
4532   {
4533     GST_DEBUG_OBJECT (basesink, "position in PAUSED");
4534     res = gst_base_sink_get_position_paused (basesink, format, cur, upstream);
4535     GST_OBJECT_UNLOCK (basesink);
4536     goto done;
4537   }
4538 wrong_state:
4539   {
4540     /* in NULL or READY we always return FALSE and -1 */
4541     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4542     res = FALSE;
4543     *cur = -1;
4544     GST_OBJECT_UNLOCK (basesink);
4545     goto done;
4546   }
4547 no_sync:
4548   {
4549     /* report last seen timestamp if any, else ask upstream to answer */
4550     if ((*cur = basesink->priv->current_sstart) != -1)
4551       res = TRUE;
4552     else
4553       *upstream = TRUE;
4554
4555     GST_DEBUG_OBJECT (basesink, "no sync, res %d, POSITION %" GST_TIME_FORMAT,
4556         res, GST_TIME_ARGS (*cur));
4557     GST_OBJECT_UNLOCK (basesink);
4558     return res;
4559   }
4560 convert_failed:
4561   {
4562     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4563     *upstream = TRUE;
4564     return FALSE;
4565   }
4566 }
4567
4568 static gboolean
4569 gst_base_sink_get_duration (GstBaseSink * basesink, GstFormat format,
4570     gint64 * dur, gboolean * upstream)
4571 {
4572   gboolean res = FALSE;
4573
4574   if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4575     GstFormat uformat = GST_FORMAT_BYTES;
4576     gint64 uduration;
4577
4578     /* get the duration in bytes, in pull mode that's all we are sure to
4579      * know. We have to explicitly get this value from upstream instead of
4580      * using our cached value because it might change. Duration caching
4581      * should be done at a higher level. */
4582     res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat, &uduration);
4583     if (res) {
4584       gst_segment_set_duration (&basesink->segment, uformat, uduration);
4585       if (format != uformat) {
4586         /* convert to the requested format */
4587         res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4588             &format, dur);
4589       } else {
4590         *dur = uduration;
4591       }
4592     }
4593     *upstream = FALSE;
4594   } else {
4595     *upstream = TRUE;
4596   }
4597
4598   return res;
4599 }
4600
4601 static gboolean
4602 gst_base_sink_query (GstElement * element, GstQuery * query)
4603 {
4604   gboolean res = FALSE;
4605
4606   GstBaseSink *basesink = GST_BASE_SINK (element);
4607
4608   switch (GST_QUERY_TYPE (query)) {
4609     case GST_QUERY_POSITION:
4610     {
4611       gint64 cur = 0;
4612       GstFormat format;
4613       gboolean upstream = FALSE;
4614
4615       gst_query_parse_position (query, &format, NULL);
4616
4617       GST_DEBUG_OBJECT (basesink, "position query in format %s",
4618           gst_format_get_name (format));
4619
4620       /* first try to get the position based on the clock */
4621       if ((res =
4622               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4623         gst_query_set_position (query, format, cur);
4624       } else if (upstream) {
4625         /* fallback to peer query */
4626         res = gst_pad_peer_query (basesink->sinkpad, query);
4627       }
4628       if (!res) {
4629         /* we can handle a few things if upstream failed */
4630         if (format == GST_FORMAT_PERCENT) {
4631           gint64 dur = 0;
4632           GstFormat uformat = GST_FORMAT_TIME;
4633
4634           res = gst_base_sink_get_position (basesink, GST_FORMAT_TIME, &cur,
4635               &upstream);
4636           if (!res && upstream) {
4637             res = gst_pad_query_peer_position (basesink->sinkpad, &uformat,
4638                 &cur);
4639           }
4640           if (res) {
4641             res = gst_base_sink_get_duration (basesink, GST_FORMAT_TIME, &dur,
4642                 &upstream);
4643             if (!res && upstream) {
4644               res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4645                   &dur);
4646             }
4647           }
4648           if (res) {
4649             gint64 pos;
4650
4651             pos = gst_util_uint64_scale (100 * GST_FORMAT_PERCENT_SCALE, cur,
4652                 dur);
4653             gst_query_set_position (query, GST_FORMAT_PERCENT, pos);
4654           }
4655         }
4656       }
4657       break;
4658     }
4659     case GST_QUERY_DURATION:
4660     {
4661       gint64 dur = 0;
4662       GstFormat format;
4663       gboolean upstream = FALSE;
4664
4665       gst_query_parse_duration (query, &format, NULL);
4666
4667       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4668           gst_format_get_name (format));
4669
4670       if ((res =
4671               gst_base_sink_get_duration (basesink, format, &dur, &upstream))) {
4672         gst_query_set_duration (query, format, dur);
4673       } else if (upstream) {
4674         /* fallback to peer query */
4675         res = gst_pad_peer_query (basesink->sinkpad, query);
4676       }
4677       if (!res) {
4678         /* we can handle a few things if upstream failed */
4679         if (format == GST_FORMAT_PERCENT) {
4680           gst_query_set_duration (query, GST_FORMAT_PERCENT,
4681               GST_FORMAT_PERCENT_MAX);
4682           res = TRUE;
4683         }
4684       }
4685       break;
4686     }
4687     case GST_QUERY_LATENCY:
4688     {
4689       gboolean live, us_live;
4690       GstClockTime min, max;
4691
4692       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4693                   &max))) {
4694         gst_query_set_latency (query, live, min, max);
4695       }
4696       break;
4697     }
4698     case GST_QUERY_JITTER:
4699       break;
4700     case GST_QUERY_RATE:
4701       /* gst_query_set_rate (query, basesink->segment_rate); */
4702       res = TRUE;
4703       break;
4704     case GST_QUERY_SEGMENT:
4705     {
4706       /* FIXME, bring start/stop to stream time */
4707       gst_query_set_segment (query, basesink->segment.rate,
4708           GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4709       res = TRUE;
4710       break;
4711     }
4712     case GST_QUERY_SEEKING:
4713     case GST_QUERY_CONVERT:
4714     case GST_QUERY_FORMATS:
4715     default:
4716       res = gst_pad_peer_query (basesink->sinkpad, query);
4717       break;
4718   }
4719   GST_DEBUG_OBJECT (basesink, "query %s returns %d",
4720       GST_QUERY_TYPE_NAME (query), res);
4721   return res;
4722 }
4723
4724 static GstStateChangeReturn
4725 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4726 {
4727   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4728   GstBaseSink *basesink = GST_BASE_SINK (element);
4729   GstBaseSinkClass *bclass;
4730   GstBaseSinkPrivate *priv;
4731
4732   priv = basesink->priv;
4733
4734   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4735
4736   switch (transition) {
4737     case GST_STATE_CHANGE_NULL_TO_READY:
4738       if (bclass->start)
4739         if (!bclass->start (basesink))
4740           goto start_failed;
4741       break;
4742     case GST_STATE_CHANGE_READY_TO_PAUSED:
4743       /* need to complete preroll before this state change completes, there
4744        * is no data flow in READY so we can safely assume we need to preroll. */
4745       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4746       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4747       basesink->have_newsegment = FALSE;
4748       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4749       gst_segment_init (basesink->abidata.ABI.clip_segment,
4750           GST_FORMAT_UNDEFINED);
4751       basesink->offset = 0;
4752       basesink->have_preroll = FALSE;
4753       priv->step_unlock = FALSE;
4754       basesink->need_preroll = TRUE;
4755       basesink->playing_async = TRUE;
4756       priv->current_sstart = GST_CLOCK_TIME_NONE;
4757       priv->current_sstop = GST_CLOCK_TIME_NONE;
4758       priv->eos_rtime = GST_CLOCK_TIME_NONE;
4759       priv->latency = 0;
4760       basesink->eos = FALSE;
4761       priv->received_eos = FALSE;
4762       gst_base_sink_reset_qos (basesink);
4763       priv->commited = FALSE;
4764       priv->call_preroll = TRUE;
4765       priv->current_step.valid = FALSE;
4766       priv->pending_step.valid = FALSE;
4767       if (priv->async_enabled) {
4768         GST_DEBUG_OBJECT (basesink, "doing async state change");
4769         /* when async enabled, post async-start message and return ASYNC from
4770          * the state change function */
4771         ret = GST_STATE_CHANGE_ASYNC;
4772         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4773             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4774       } else {
4775         priv->have_latency = TRUE;
4776       }
4777       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4778       break;
4779     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4780       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4781       if (!gst_base_sink_needs_preroll (basesink)) {
4782         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4783         /* no preroll needed anymore now. */
4784         basesink->playing_async = FALSE;
4785         basesink->need_preroll = FALSE;
4786         if (basesink->eos) {
4787           GstMessage *message;
4788
4789           /* need to post EOS message here */
4790           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4791           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
4792           gst_message_set_seqnum (message, basesink->priv->seqnum);
4793           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
4794         } else {
4795           GST_DEBUG_OBJECT (basesink, "signal preroll");
4796           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
4797         }
4798       } else {
4799         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
4800         basesink->need_preroll = TRUE;
4801         basesink->playing_async = TRUE;
4802         priv->call_preroll = TRUE;
4803         priv->commited = FALSE;
4804         if (priv->async_enabled) {
4805           GST_DEBUG_OBJECT (basesink, "doing async state change");
4806           ret = GST_STATE_CHANGE_ASYNC;
4807           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4808               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4809         }
4810       }
4811       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4812       break;
4813     default:
4814       break;
4815   }
4816
4817   {
4818     GstStateChangeReturn bret;
4819
4820     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4821     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
4822       goto activate_failed;
4823   }
4824
4825   switch (transition) {
4826     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4827       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
4828       /* FIXME, make sure we cannot enter _render first */
4829
4830       /* we need to call ::unlock before locking PREROLL_LOCK
4831        * since we lock it before going into ::render */
4832       if (bclass->unlock)
4833         bclass->unlock (basesink);
4834
4835       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4836       GST_DEBUG_OBJECT (basesink, "got preroll lock");
4837       /* now that we have the PREROLL lock, clear our unlock request */
4838       if (bclass->unlock_stop)
4839         bclass->unlock_stop (basesink);
4840
4841       /* we need preroll again and we set the flag before unlocking the clockid
4842        * because if the clockid is unlocked before a current buffer expired, we
4843        * can use that buffer to preroll with */
4844       basesink->need_preroll = TRUE;
4845
4846       if (basesink->clock_id) {
4847         GST_DEBUG_OBJECT (basesink, "unschedule clock");
4848         gst_clock_id_unschedule (basesink->clock_id);
4849       }
4850
4851       /* if we don't have a preroll buffer we need to wait for a preroll and
4852        * return ASYNC. */
4853       if (!gst_base_sink_needs_preroll (basesink)) {
4854         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
4855         basesink->playing_async = FALSE;
4856       } else {
4857         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
4858           GST_DEBUG_OBJECT (basesink, "element is <= READY");
4859           ret = GST_STATE_CHANGE_SUCCESS;
4860         } else {
4861           GST_DEBUG_OBJECT (basesink,
4862               "PLAYING to PAUSED, we are not prerolled");
4863           basesink->playing_async = TRUE;
4864           priv->commited = FALSE;
4865           priv->call_preroll = TRUE;
4866           if (priv->async_enabled) {
4867             GST_DEBUG_OBJECT (basesink, "doing async state change");
4868             ret = GST_STATE_CHANGE_ASYNC;
4869             gst_element_post_message (GST_ELEMENT_CAST (basesink),
4870                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
4871                     FALSE));
4872           }
4873         }
4874       }
4875       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
4876           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
4877
4878       gst_base_sink_reset_qos (basesink);
4879       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4880       break;
4881     case GST_STATE_CHANGE_PAUSED_TO_READY:
4882       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4883       /* start by reseting our position state with the object lock so that the
4884        * position query gets the right idea. We do this before we post the
4885        * messages so that the message handlers pick this up. */
4886       GST_OBJECT_LOCK (basesink);
4887       basesink->have_newsegment = FALSE;
4888       priv->current_sstart = GST_CLOCK_TIME_NONE;
4889       priv->current_sstop = GST_CLOCK_TIME_NONE;
4890       priv->have_latency = FALSE;
4891       GST_OBJECT_UNLOCK (basesink);
4892
4893       gst_base_sink_set_last_buffer (basesink, NULL);
4894       priv->call_preroll = FALSE;
4895
4896       if (!priv->commited) {
4897         if (priv->async_enabled) {
4898           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
4899
4900           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4901               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
4902                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
4903
4904           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4905               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
4906         }
4907         priv->commited = TRUE;
4908       } else {
4909         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
4910       }
4911       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4912       break;
4913     case GST_STATE_CHANGE_READY_TO_NULL:
4914       if (bclass->stop) {
4915         if (!bclass->stop (basesink)) {
4916           GST_WARNING_OBJECT (basesink, "failed to stop");
4917         }
4918       }
4919       gst_base_sink_set_last_buffer (basesink, NULL);
4920       priv->call_preroll = FALSE;
4921       break;
4922     default:
4923       break;
4924   }
4925
4926   return ret;
4927
4928   /* ERRORS */
4929 start_failed:
4930   {
4931     GST_DEBUG_OBJECT (basesink, "failed to start");
4932     return GST_STATE_CHANGE_FAILURE;
4933   }
4934 activate_failed:
4935   {
4936     GST_DEBUG_OBJECT (basesink,
4937         "element failed to change states -- activation problem?");
4938     return GST_STATE_CHANGE_FAILURE;
4939   }
4940 }