basesink: move stuff around, more stepping
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSource
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * <programlisting>
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *   
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * </programlisting>
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSink::preroll vmethod with this preroll buffer and will then commit
59  * the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from ::get_times. If this function returns
63  * #GST_CLOCK_TIME_NONE for the start time, no synchronisation will be done.
64  * Synchronisation can be disabled entirely by setting the object "sync"
65  * property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSink::render will be called.
68  * Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the ::render method
71  * are supported as well. These classes typically receive a buffer in the render
72  * method and can then potentially block on the clock while rendering. A typical
73  * example is an audiosink. Since 0.10.11 these subclasses can use
74  * gst_base_sink_wait_preroll() to perform the blocking wait.
75  *
76  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
77  * for the clock to reach the time indicated by the stop time of the last
78  * ::get_times call before posting an EOS message. When the element receives
79  * EOS in PAUSED, preroll completes, the event is queued and an EOS message is
80  * posted when going to PLAYING.
81  *
82  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
83  * synchronisation and clipping of buffers. Buffers that fall completely outside
84  * of the current segment are dropped. Buffers that fall partially in the
85  * segment are rendered (and prerolled). Subclasses should do any subbuffer
86  * clipping themselves when needed.
87  *
88  * #GstBaseSink will by default report the current playback position in
89  * #GST_FORMAT_TIME based on the current clock time and segment information.
90  * If no clock has been set on the element, the query will be forwarded
91  * upstream.
92  *
93  * The ::set_caps function will be called when the subclass should configure
94  * itself to process a specific media type.
95  *
96  * The ::start and ::stop virtual methods will be called when resources should
97  * be allocated. Any ::preroll, ::render  and ::set_caps function will be
98  * called between the ::start and ::stop calls.
99  *
100  * The ::event virtual method will be called when an event is received by
101  * #GstBaseSink. Normally this method should only be overriden by very specific
102  * elements (such as file sinks) which need to handle the newsegment event
103  * specially.
104  *
105  * #GstBaseSink provides an overridable ::buffer_alloc function that can be
106  * used by sinks that want to do reverse negotiation or to provide
107  * custom buffers (hardware buffers for example) to upstream elements.
108  *
109  * The ::unlock method is called when the elements should unblock any blocking
110  * operations they perform in the ::render method. This is mostly useful when
111  * the ::render method performs a blocking write on a file descriptor, for
112  * example.
113  *
114  * The max-lateness property affects how the sink deals with buffers that
115  * arrive too late in the sink. A buffer arrives too late in the sink when
116  * the presentation time (as a combination of the last segment, buffer
117  * timestamp and element base_time) plus the duration is before the current
118  * time of the clock.
119  * If the frame is later than max-lateness, the sink will drop the buffer
120  * without calling the render method.
121  * This feature is disabled if sync is disabled, the ::get-times method does
122  * not return a valid start time or max-lateness is set to -1 (the default).
123  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
124  * max-lateness value.
125  *
126  * The qos property will enable the quality-of-service features of the basesink
127  * which gather statistics about the real-time performance of the clock
128  * synchronisation. For each buffer received in the sink, statistics are
129  * gathered and a QOS event is sent upstream with these numbers. This
130  * information can then be used by upstream elements to reduce their processing
131  * rate, for example.
132  *
133  * Since 0.10.15 the async property can be used to instruct the sink to never
134  * perform an ASYNC state change. This feature is mostly usable when dealing
135  * with non-synchronized streams or sparse streams.
136  *
137  * Last reviewed on 2007-08-29 (0.10.15)
138  */
139
140 #ifdef HAVE_CONFIG_H
141 #  include "config.h"
142 #endif
143
144 #include "gstbasesink.h"
145 #include <gst/gstmarshal.h>
146 #include <gst/gst_private.h>
147 #include <gst/gst-i18n-lib.h>
148
149 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
150 #define GST_CAT_DEFAULT gst_base_sink_debug
151
152 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
153    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
154
155 typedef struct
156 {
157   gboolean valid;               /* if this info is valid */
158   guint32 seqnum;               /* the seqnum of the STEP event */
159   GstFormat format;             /* the format of the amount */
160   guint64 amount;               /* the total amount of data to skip */
161   guint64 remaining;            /* the remaining amount of data to skip */
162   guint64 duration;             /* the duration in time of the skipped data */
163   guint64 start;                /* running_time of the start */
164   gdouble rate;                 /* rate of skipping */
165   gboolean intermediate;        /* if this is an intermediate step */
166   gboolean need_preroll;        /* if we need preroll after this step */
167 } GstStepInfo;
168
169 /* FIXME, some stuff in ABI.data and other in Private...
170  * Make up your mind please.
171  */
172 struct _GstBaseSinkPrivate
173 {
174   gint qos_enabled;             /* ATOMIC */
175   gboolean async_enabled;
176   GstClockTimeDiff ts_offset;
177   GstClockTime render_delay;
178
179   /* start, stop of current buffer, stream time, used to report position */
180   GstClockTime current_sstart;
181   GstClockTime current_sstop;
182
183   /* start, stop and jitter of current buffer, running time */
184   GstClockTime current_rstart;
185   GstClockTime current_rstop;
186   GstClockTimeDiff current_jitter;
187
188   /* EOS sync time in running time */
189   GstClockTime eos_rtime;
190
191   /* last buffer that arrived in time, running time */
192   GstClockTime last_in_time;
193   /* when the last buffer left the sink, running time */
194   GstClockTime last_left;
195
196   /* running averages go here these are done on running time */
197   GstClockTime avg_pt;
198   GstClockTime avg_duration;
199   gdouble avg_rate;
200
201   /* these are done on system time. avg_jitter and avg_render are
202    * compared to eachother to see if the rendering time takes a
203    * huge amount of the processing, If so we are flooded with
204    * buffers. */
205   GstClockTime last_left_systime;
206   GstClockTime avg_jitter;
207   GstClockTime start, stop;
208   GstClockTime avg_render;
209
210   /* number of rendered and dropped frames */
211   guint64 rendered;
212   guint64 dropped;
213
214   /* latency stuff */
215   GstClockTime latency;
216
217   /* if we already commited the state */
218   gboolean commited;
219
220   /* when we received EOS */
221   gboolean received_eos;
222
223   /* when we are prerolled and able to report latency */
224   gboolean have_latency;
225
226   /* the last buffer we prerolled or rendered. Useful for making snapshots */
227   GstBuffer *last_buffer;
228
229   /* caps for pull based scheduling */
230   GstCaps *pull_caps;
231
232   /* blocksize for pulling */
233   guint blocksize;
234
235   gboolean discont;
236
237   /* seqnum of the stream */
238   guint32 seqnum;
239
240   gboolean call_preroll;
241
242   /* we have a pending and a current step operation */
243   GstStepInfo current_step;
244   GstStepInfo pending_step;
245 };
246
247 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
248
249 /* generic running average, this has a neutral window size */
250 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
251
252 /* the windows for these running averages are experimentally obtained.
253  * possitive values get averaged more while negative values use a small
254  * window so we can react faster to badness. */
255 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
256 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
257
258 /* BaseSink properties */
259
260 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
261 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
262
263 #define DEFAULT_PREROLL_QUEUE_LEN       0
264 #define DEFAULT_SYNC                    TRUE
265 #define DEFAULT_MAX_LATENESS            -1
266 #define DEFAULT_QOS                     FALSE
267 #define DEFAULT_ASYNC                   TRUE
268 #define DEFAULT_TS_OFFSET               0
269 #define DEFAULT_BLOCKSIZE               4096
270 #define DEFAULT_RENDER_DELAY            0
271
272 enum
273 {
274   PROP_0,
275   PROP_PREROLL_QUEUE_LEN,
276   PROP_SYNC,
277   PROP_MAX_LATENESS,
278   PROP_QOS,
279   PROP_ASYNC,
280   PROP_TS_OFFSET,
281   PROP_LAST_BUFFER,
282   PROP_BLOCKSIZE,
283   PROP_RENDER_DELAY,
284   PROP_LAST
285 };
286
287 static GstElementClass *parent_class = NULL;
288
289 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
290 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
291 static void gst_base_sink_finalize (GObject * object);
292
293 GType
294 gst_base_sink_get_type (void)
295 {
296   static volatile gsize base_sink_type = 0;
297
298   if (g_once_init_enter (&base_sink_type)) {
299     GType _type;
300     static const GTypeInfo base_sink_info = {
301       sizeof (GstBaseSinkClass),
302       NULL,
303       NULL,
304       (GClassInitFunc) gst_base_sink_class_init,
305       NULL,
306       NULL,
307       sizeof (GstBaseSink),
308       0,
309       (GInstanceInitFunc) gst_base_sink_init,
310     };
311
312     _type = g_type_register_static (GST_TYPE_ELEMENT,
313         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
314     g_once_init_leave (&base_sink_type, _type);
315   }
316   return base_sink_type;
317 }
318
319 static void gst_base_sink_set_property (GObject * object, guint prop_id,
320     const GValue * value, GParamSpec * pspec);
321 static void gst_base_sink_get_property (GObject * object, guint prop_id,
322     GValue * value, GParamSpec * pspec);
323
324 static gboolean gst_base_sink_send_event (GstElement * element,
325     GstEvent * event);
326 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
327
328 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
329 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
330 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
331     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
332 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
333     GstClockTime * start, GstClockTime * end);
334 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
335     GstPad * pad, gboolean flushing);
336 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
337     gboolean active);
338 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
339     GstSegment * segment);
340 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
341     GstEvent * event, GstSegment * segment);
342
343 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
344     GstStateChange transition);
345
346 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
347 static void gst_base_sink_loop (GstPad * pad);
348 static gboolean gst_base_sink_pad_activate (GstPad * pad);
349 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
350 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
351 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
352 static gboolean gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query);
353
354 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
355
356 /* check if an object was too late */
357 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
358     GstMiniObject * obj, GstClockTime start, GstClockTime stop,
359     GstClockReturn status, GstClockTimeDiff jitter);
360 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
361     GstMiniObject * obj);
362
363 static void
364 gst_base_sink_class_init (GstBaseSinkClass * klass)
365 {
366   GObjectClass *gobject_class;
367   GstElementClass *gstelement_class;
368
369   gobject_class = G_OBJECT_CLASS (klass);
370   gstelement_class = GST_ELEMENT_CLASS (klass);
371
372   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
373       "basesink element");
374
375   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
376
377   parent_class = g_type_class_peek_parent (klass);
378
379   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_sink_finalize);
380   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_sink_set_property);
381   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_sink_get_property);
382
383   /* FIXME, this next value should be configured using an event from the
384    * upstream element, ie, the BUFFER_SIZE event. */
385   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
386       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
387           "Number of buffers to queue during preroll", 0, G_MAXUINT,
388           DEFAULT_PREROLL_QUEUE_LEN,
389           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
390
391   g_object_class_install_property (gobject_class, PROP_SYNC,
392       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
393           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
394
395   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
396       g_param_spec_int64 ("max-lateness", "Max Lateness",
397           "Maximum number of nanoseconds that a buffer can be late before it "
398           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
399           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
400
401   g_object_class_install_property (gobject_class, PROP_QOS,
402       g_param_spec_boolean ("qos", "Qos",
403           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
404           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
405   /**
406    * GstBaseSink:async
407    *
408    * If set to #TRUE, the basesink will perform asynchronous state changes.
409    * When set to #FALSE, the sink will not signal the parent when it prerolls.
410    * Use this option when dealing with sparse streams or when synchronisation is
411    * not required.
412    *
413    * Since: 0.10.15
414    */
415   g_object_class_install_property (gobject_class, PROP_ASYNC,
416       g_param_spec_boolean ("async", "Async",
417           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
418           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
419   /**
420    * GstBaseSink:ts-offset
421    *
422    * Controls the final synchronisation, a negative value will render the buffer
423    * earlier while a positive value delays playback. This property can be 
424    * used to fix synchronisation in bad files.
425    *
426    * Since: 0.10.15
427    */
428   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
429       g_param_spec_int64 ("ts-offset", "TS Offset",
430           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
431           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
432   /**
433    * GstBaseSink:last-buffer
434    *
435    * The last buffer that arrived in the sink and was used for preroll or for
436    * rendering. This property can be used to generate thumbnails. This property
437    * can be NULL when the sink has not yet received a bufer.
438    *
439    * Since: 0.10.15
440    */
441   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
442       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
443           "The last buffer received in the sink", GST_TYPE_BUFFER,
444           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
445   /**
446    * GstBaseSink:blocksize
447    *
448    * The amount of bytes to pull when operating in pull mode.
449    *
450    * Since: 0.10.22
451    */
452   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
453       g_param_spec_uint ("blocksize", "Block size",
454           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
455           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
456   /**
457    * GstBaseSink:render-delay
458    *
459    * The additional delay between synchronisation and actual rendering of the
460    * media. This property will add additional latency to the device in order to
461    * make other sinks compensate for the delay.
462    *
463    * Since: 0.10.22
464    */
465   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
466       g_param_spec_uint64 ("render-delay", "Render Delay",
467           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
468           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
469
470   gstelement_class->change_state =
471       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
472   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
473   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
474
475   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
476   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
477   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
478   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
479   klass->activate_pull =
480       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
481 }
482
483 static GstCaps *
484 gst_base_sink_pad_getcaps (GstPad * pad)
485 {
486   GstBaseSinkClass *bclass;
487   GstBaseSink *bsink;
488   GstCaps *caps = NULL;
489
490   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
491   bclass = GST_BASE_SINK_GET_CLASS (bsink);
492
493   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
494     /* if we are operating in pull mode we only accept the negotiated caps */
495     GST_OBJECT_LOCK (pad);
496     if ((caps = GST_PAD_CAPS (pad)))
497       gst_caps_ref (caps);
498     GST_OBJECT_UNLOCK (pad);
499   }
500   if (caps == NULL) {
501     if (bclass->get_caps)
502       caps = bclass->get_caps (bsink);
503
504     if (caps == NULL) {
505       GstPadTemplate *pad_template;
506
507       pad_template =
508           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
509           "sink");
510       if (pad_template != NULL) {
511         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
512       }
513     }
514   }
515   gst_object_unref (bsink);
516
517   return caps;
518 }
519
520 static gboolean
521 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
522 {
523   GstBaseSinkClass *bclass;
524   GstBaseSink *bsink;
525   gboolean res = TRUE;
526
527   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
528   bclass = GST_BASE_SINK_GET_CLASS (bsink);
529
530   if (res && bclass->set_caps)
531     res = bclass->set_caps (bsink, caps);
532
533   gst_object_unref (bsink);
534
535   return res;
536 }
537
538 static void
539 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
540 {
541   GstBaseSinkClass *bclass;
542   GstBaseSink *bsink;
543
544   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
545   bclass = GST_BASE_SINK_GET_CLASS (bsink);
546
547   if (bclass->fixate)
548     bclass->fixate (bsink, caps);
549
550   gst_object_unref (bsink);
551 }
552
553 static GstFlowReturn
554 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
555     GstCaps * caps, GstBuffer ** buf)
556 {
557   GstBaseSinkClass *bclass;
558   GstBaseSink *bsink;
559   GstFlowReturn result = GST_FLOW_OK;
560
561   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
562   bclass = GST_BASE_SINK_GET_CLASS (bsink);
563
564   if (bclass->buffer_alloc)
565     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
566   else
567     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
568
569   gst_object_unref (bsink);
570
571   return result;
572 }
573
574 static void
575 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
576 {
577   GstPadTemplate *pad_template;
578   GstBaseSinkPrivate *priv;
579
580   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
581
582   pad_template =
583       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
584   g_return_if_fail (pad_template != NULL);
585
586   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
587
588   gst_pad_set_getcaps_function (basesink->sinkpad,
589       GST_DEBUG_FUNCPTR (gst_base_sink_pad_getcaps));
590   gst_pad_set_setcaps_function (basesink->sinkpad,
591       GST_DEBUG_FUNCPTR (gst_base_sink_pad_setcaps));
592   gst_pad_set_fixatecaps_function (basesink->sinkpad,
593       GST_DEBUG_FUNCPTR (gst_base_sink_pad_fixate));
594   gst_pad_set_bufferalloc_function (basesink->sinkpad,
595       GST_DEBUG_FUNCPTR (gst_base_sink_pad_buffer_alloc));
596   gst_pad_set_activate_function (basesink->sinkpad,
597       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate));
598   gst_pad_set_activatepush_function (basesink->sinkpad,
599       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_push));
600   gst_pad_set_activatepull_function (basesink->sinkpad,
601       GST_DEBUG_FUNCPTR (gst_base_sink_pad_activate_pull));
602   gst_pad_set_event_function (basesink->sinkpad,
603       GST_DEBUG_FUNCPTR (gst_base_sink_event));
604   gst_pad_set_chain_function (basesink->sinkpad,
605       GST_DEBUG_FUNCPTR (gst_base_sink_chain));
606   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
607
608   basesink->pad_mode = GST_ACTIVATE_NONE;
609   basesink->preroll_queue = g_queue_new ();
610   basesink->abidata.ABI.clip_segment = gst_segment_new ();
611   priv->have_latency = FALSE;
612
613   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
614   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
615
616   basesink->sync = DEFAULT_SYNC;
617   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
618   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
619   priv->async_enabled = DEFAULT_ASYNC;
620   priv->ts_offset = DEFAULT_TS_OFFSET;
621   priv->render_delay = DEFAULT_RENDER_DELAY;
622   priv->blocksize = DEFAULT_BLOCKSIZE;
623
624   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
625 }
626
627 static void
628 gst_base_sink_finalize (GObject * object)
629 {
630   GstBaseSink *basesink;
631
632   basesink = GST_BASE_SINK (object);
633
634   g_queue_free (basesink->preroll_queue);
635   gst_segment_free (basesink->abidata.ABI.clip_segment);
636
637   G_OBJECT_CLASS (parent_class)->finalize (object);
638 }
639
640 /**
641  * gst_base_sink_set_sync:
642  * @sink: the sink
643  * @sync: the new sync value.
644  *
645  * Configures @sink to synchronize on the clock or not. When
646  * @sync is FALSE, incomming samples will be played as fast as
647  * possible. If @sync is TRUE, the timestamps of the incomming
648  * buffers will be used to schedule the exact render time of its
649  * contents.
650  *
651  * Since: 0.10.4
652  */
653 void
654 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
655 {
656   g_return_if_fail (GST_IS_BASE_SINK (sink));
657
658   GST_OBJECT_LOCK (sink);
659   sink->sync = sync;
660   GST_OBJECT_UNLOCK (sink);
661 }
662
663 /**
664  * gst_base_sink_get_sync:
665  * @sink: the sink
666  *
667  * Checks if @sink is currently configured to synchronize against the
668  * clock.
669  *
670  * Returns: TRUE if the sink is configured to synchronize against the clock.
671  *
672  * Since: 0.10.4
673  */
674 gboolean
675 gst_base_sink_get_sync (GstBaseSink * sink)
676 {
677   gboolean res;
678
679   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
680
681   GST_OBJECT_LOCK (sink);
682   res = sink->sync;
683   GST_OBJECT_UNLOCK (sink);
684
685   return res;
686 }
687
688 /**
689  * gst_base_sink_set_max_lateness:
690  * @sink: the sink
691  * @max_lateness: the new max lateness value.
692  *
693  * Sets the new max lateness value to @max_lateness. This value is
694  * used to decide if a buffer should be dropped or not based on the
695  * buffer timestamp and the current clock time. A value of -1 means
696  * an unlimited time.
697  *
698  * Since: 0.10.4
699  */
700 void
701 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
702 {
703   g_return_if_fail (GST_IS_BASE_SINK (sink));
704
705   GST_OBJECT_LOCK (sink);
706   sink->abidata.ABI.max_lateness = max_lateness;
707   GST_OBJECT_UNLOCK (sink);
708 }
709
710 /**
711  * gst_base_sink_get_max_lateness:
712  * @sink: the sink
713  *
714  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
715  * more details.
716  *
717  * Returns: The maximum time in nanoseconds that a buffer can be late
718  * before it is dropped and not rendered. A value of -1 means an
719  * unlimited time.
720  *
721  * Since: 0.10.4
722  */
723 gint64
724 gst_base_sink_get_max_lateness (GstBaseSink * sink)
725 {
726   gint64 res;
727
728   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
729
730   GST_OBJECT_LOCK (sink);
731   res = sink->abidata.ABI.max_lateness;
732   GST_OBJECT_UNLOCK (sink);
733
734   return res;
735 }
736
737 /**
738  * gst_base_sink_set_qos_enabled:
739  * @sink: the sink
740  * @enabled: the new qos value.
741  *
742  * Configures @sink to send Quality-of-Service events upstream.
743  *
744  * Since: 0.10.5
745  */
746 void
747 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
748 {
749   g_return_if_fail (GST_IS_BASE_SINK (sink));
750
751   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
752 }
753
754 /**
755  * gst_base_sink_is_qos_enabled:
756  * @sink: the sink
757  *
758  * Checks if @sink is currently configured to send Quality-of-Service events
759  * upstream.
760  *
761  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
762  *
763  * Since: 0.10.5
764  */
765 gboolean
766 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
767 {
768   gboolean res;
769
770   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
771
772   res = g_atomic_int_get (&sink->priv->qos_enabled);
773
774   return res;
775 }
776
777 /**
778  * gst_base_sink_set_async_enabled:
779  * @sink: the sink
780  * @enabled: the new async value.
781  *
782  * Configures @sink to perform all state changes asynchronusly. When async is
783  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
784  * preroll buffer. This feature is usefull if the sink does not synchronize
785  * against the clock or when it is dealing with sparse streams.
786  *
787  * Since: 0.10.15
788  */
789 void
790 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
791 {
792   g_return_if_fail (GST_IS_BASE_SINK (sink));
793
794   GST_PAD_PREROLL_LOCK (sink->sinkpad);
795   sink->priv->async_enabled = enabled;
796   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
797   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
798 }
799
800 /**
801  * gst_base_sink_is_async_enabled:
802  * @sink: the sink
803  *
804  * Checks if @sink is currently configured to perform asynchronous state
805  * changes to PAUSED.
806  *
807  * Returns: TRUE if the sink is configured to perform asynchronous state
808  * changes.
809  *
810  * Since: 0.10.15
811  */
812 gboolean
813 gst_base_sink_is_async_enabled (GstBaseSink * sink)
814 {
815   gboolean res;
816
817   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
818
819   GST_PAD_PREROLL_LOCK (sink->sinkpad);
820   res = sink->priv->async_enabled;
821   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
822
823   return res;
824 }
825
826 /**
827  * gst_base_sink_set_ts_offset:
828  * @sink: the sink
829  * @offset: the new offset
830  *
831  * Adjust the synchronisation of @sink with @offset. A negative value will
832  * render buffers earlier than their timestamp. A positive value will delay
833  * rendering. This function can be used to fix playback of badly timestamped
834  * buffers.
835  *
836  * Since: 0.10.15
837  */
838 void
839 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
840 {
841   g_return_if_fail (GST_IS_BASE_SINK (sink));
842
843   GST_OBJECT_LOCK (sink);
844   sink->priv->ts_offset = offset;
845   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
846   GST_OBJECT_UNLOCK (sink);
847 }
848
849 /**
850  * gst_base_sink_get_ts_offset:
851  * @sink: the sink
852  *
853  * Get the synchronisation offset of @sink.
854  *
855  * Returns: The synchronisation offset.
856  *
857  * Since: 0.10.15
858  */
859 GstClockTimeDiff
860 gst_base_sink_get_ts_offset (GstBaseSink * sink)
861 {
862   GstClockTimeDiff res;
863
864   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
865
866   GST_OBJECT_LOCK (sink);
867   res = sink->priv->ts_offset;
868   GST_OBJECT_UNLOCK (sink);
869
870   return res;
871 }
872
873 /**
874  * gst_base_sink_get_last_buffer:
875  * @sink: the sink
876  *
877  * Get the last buffer that arrived in the sink and was used for preroll or for
878  * rendering. This property can be used to generate thumbnails.
879  *
880  * The #GstCaps on the buffer can be used to determine the type of the buffer.
881  * 
882  * Returns: a #GstBuffer. gst_buffer_unref() after usage. This function returns
883  * NULL when no buffer has arrived in the sink yet or when the sink is not in
884  * PAUSED or PLAYING.
885  *
886  * Since: 0.10.15
887  */
888 GstBuffer *
889 gst_base_sink_get_last_buffer (GstBaseSink * sink)
890 {
891   GstBuffer *res;
892
893   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
894
895   GST_OBJECT_LOCK (sink);
896   if ((res = sink->priv->last_buffer))
897     gst_buffer_ref (res);
898   GST_OBJECT_UNLOCK (sink);
899
900   return res;
901 }
902
903 static void
904 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
905 {
906   GstBuffer *old;
907
908   GST_OBJECT_LOCK (sink);
909   old = sink->priv->last_buffer;
910   if (G_LIKELY (old != buffer)) {
911     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
912     if (G_LIKELY (buffer))
913       gst_buffer_ref (buffer);
914     sink->priv->last_buffer = buffer;
915   } else {
916     old = NULL;
917   }
918   GST_OBJECT_UNLOCK (sink);
919
920   /* avoid unreffing with the lock because cleanup code might want to take the
921    * lock too */
922   if (G_LIKELY (old))
923     gst_buffer_unref (old);
924 }
925
926 /**
927  * gst_base_sink_get_latency:
928  * @sink: the sink
929  *
930  * Get the currently configured latency.
931  *
932  * Returns: The configured latency.
933  *
934  * Since: 0.10.12
935  */
936 GstClockTime
937 gst_base_sink_get_latency (GstBaseSink * sink)
938 {
939   GstClockTime res;
940
941   GST_OBJECT_LOCK (sink);
942   res = sink->priv->latency;
943   GST_OBJECT_UNLOCK (sink);
944
945   return res;
946 }
947
948 /**
949  * gst_base_sink_query_latency:
950  * @sink: the sink
951  * @live: if the sink is live
952  * @upstream_live: if an upstream element is live
953  * @min_latency: the min latency of the upstream elements
954  * @max_latency: the max latency of the upstream elements
955  *
956  * Query the sink for the latency parameters. The latency will be queried from
957  * the upstream elements. @live will be TRUE if @sink is configured to
958  * synchronize against the clock. @upstream_live will be TRUE if an upstream
959  * element is live. 
960  *
961  * If both @live and @upstream_live are TRUE, the sink will want to compensate
962  * for the latency introduced by the upstream elements by setting the
963  * @min_latency to a strictly possitive value.
964  *
965  * This function is mostly used by subclasses. 
966  *
967  * Returns: TRUE if the query succeeded.
968  *
969  * Since: 0.10.12
970  */
971 gboolean
972 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
973     gboolean * upstream_live, GstClockTime * min_latency,
974     GstClockTime * max_latency)
975 {
976   gboolean l, us_live, res, have_latency;
977   GstClockTime min, max, render_delay;
978   GstQuery *query;
979   GstClockTime us_min, us_max;
980
981   /* we are live when we sync to the clock */
982   GST_OBJECT_LOCK (sink);
983   l = sink->sync;
984   have_latency = sink->priv->have_latency;
985   render_delay = sink->priv->render_delay;
986   GST_OBJECT_UNLOCK (sink);
987
988   /* assume no latency */
989   min = 0;
990   max = -1;
991   us_live = FALSE;
992
993   if (have_latency) {
994     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
995     /* we are ready for a latency query this is when we preroll or when we are
996      * not async. */
997     query = gst_query_new_latency ();
998
999     /* ask the peer for the latency */
1000     if ((res = gst_base_sink_peer_query (sink, query))) {
1001       /* get upstream min and max latency */
1002       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1003
1004       if (us_live) {
1005         /* upstream live, use its latency, subclasses should use these
1006          * values to create the complete latency. */
1007         min = us_min;
1008         max = us_max;
1009       }
1010       if (l) {
1011         /* we need to add the render delay if we are live */
1012         if (min != -1)
1013           min += render_delay;
1014         if (max != -1)
1015           max += render_delay;
1016       }
1017     }
1018     gst_query_unref (query);
1019   } else {
1020     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1021     res = FALSE;
1022   }
1023
1024   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1025   if (!res) {
1026     if (!l) {
1027       res = TRUE;
1028       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1029     } else {
1030       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1031     }
1032   }
1033
1034   if (res) {
1035     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1036         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1037         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1038
1039     if (live)
1040       *live = l;
1041     if (upstream_live)
1042       *upstream_live = us_live;
1043     if (min_latency)
1044       *min_latency = min;
1045     if (max_latency)
1046       *max_latency = max;
1047   }
1048   return res;
1049 }
1050
1051 /**
1052  * gst_base_sink_set_render_delay:
1053  * @sink: a #GstBaseSink
1054  * @delay: the new delay
1055  *
1056  * Set the render delay in @sink to @delay. The render delay is the time 
1057  * between actual rendering of a buffer and its synchronisation time. Some
1058  * devices might delay media rendering which can be compensated for with this
1059  * function. 
1060  *
1061  * After calling this function, this sink will report additional latency and
1062  * other sinks will adjust their latency to delay the rendering of their media.
1063  *
1064  * This function is usually called by subclasses.
1065  *
1066  * Since: 0.10.21
1067  */
1068 void
1069 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1070 {
1071   GstClockTime old_render_delay;
1072
1073   g_return_if_fail (GST_IS_BASE_SINK (sink));
1074
1075   GST_OBJECT_LOCK (sink);
1076   old_render_delay = sink->priv->render_delay;
1077   sink->priv->render_delay = delay;
1078   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1079       GST_TIME_ARGS (delay));
1080   GST_OBJECT_UNLOCK (sink);
1081
1082   if (delay != old_render_delay) {
1083     GST_DEBUG_OBJECT (sink, "posting latency changed");
1084     gst_element_post_message (GST_ELEMENT_CAST (sink),
1085         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1086   }
1087 }
1088
1089 /**
1090  * gst_base_sink_get_render_delay:
1091  * @sink: a #GstBaseSink
1092  *
1093  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1094  * information about the render delay.
1095  *
1096  * Returns: the render delay of @sink.
1097  *
1098  * Since: 0.10.21
1099  */
1100 GstClockTime
1101 gst_base_sink_get_render_delay (GstBaseSink * sink)
1102 {
1103   GstClockTimeDiff res;
1104
1105   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1106
1107   GST_OBJECT_LOCK (sink);
1108   res = sink->priv->render_delay;
1109   GST_OBJECT_UNLOCK (sink);
1110
1111   return res;
1112 }
1113
1114 /**
1115  * gst_base_sink_set_blocksize:
1116  * @sink: a #GstBaseSink
1117  * @blocksize: the blocksize in bytes
1118  *
1119  * Set the number of bytes that the sink will pull when it is operating in pull
1120  * mode.
1121  *
1122  * Since: 0.10.22
1123  */
1124 void
1125 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1126 {
1127   g_return_if_fail (GST_IS_BASE_SINK (sink));
1128
1129   GST_OBJECT_LOCK (sink);
1130   sink->priv->blocksize = blocksize;
1131   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1132   GST_OBJECT_UNLOCK (sink);
1133 }
1134
1135 /**
1136  * gst_base_sink_get_blocksize:
1137  * @sink: a #GstBaseSink
1138  *
1139  * Get the number of bytes that the sink will pull when it is operating in pull
1140  * mode.
1141  *
1142  * Returns: the number of bytes @sink will pull in pull mode.
1143  *
1144  * Since: 0.10.22
1145  */
1146 guint
1147 gst_base_sink_get_blocksize (GstBaseSink * sink)
1148 {
1149   guint res;
1150
1151   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1152
1153   GST_OBJECT_LOCK (sink);
1154   res = sink->priv->blocksize;
1155   GST_OBJECT_UNLOCK (sink);
1156
1157   return res;
1158 }
1159
1160 static void
1161 gst_base_sink_set_property (GObject * object, guint prop_id,
1162     const GValue * value, GParamSpec * pspec)
1163 {
1164   GstBaseSink *sink = GST_BASE_SINK (object);
1165
1166   switch (prop_id) {
1167     case PROP_PREROLL_QUEUE_LEN:
1168       /* preroll lock necessary to serialize with finish_preroll */
1169       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1170       sink->preroll_queue_max_len = g_value_get_uint (value);
1171       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1172       break;
1173     case PROP_SYNC:
1174       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1175       break;
1176     case PROP_MAX_LATENESS:
1177       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1178       break;
1179     case PROP_QOS:
1180       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1181       break;
1182     case PROP_ASYNC:
1183       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1184       break;
1185     case PROP_TS_OFFSET:
1186       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1187       break;
1188     case PROP_BLOCKSIZE:
1189       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1190       break;
1191     case PROP_RENDER_DELAY:
1192       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1193       break;
1194     default:
1195       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1196       break;
1197   }
1198 }
1199
1200 static void
1201 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1202     GParamSpec * pspec)
1203 {
1204   GstBaseSink *sink = GST_BASE_SINK (object);
1205
1206   switch (prop_id) {
1207     case PROP_PREROLL_QUEUE_LEN:
1208       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1209       g_value_set_uint (value, sink->preroll_queue_max_len);
1210       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1211       break;
1212     case PROP_SYNC:
1213       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1214       break;
1215     case PROP_MAX_LATENESS:
1216       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1217       break;
1218     case PROP_QOS:
1219       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1220       break;
1221     case PROP_ASYNC:
1222       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1223       break;
1224     case PROP_TS_OFFSET:
1225       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1226       break;
1227     case PROP_LAST_BUFFER:
1228       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1229       break;
1230     case PROP_BLOCKSIZE:
1231       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1232       break;
1233     case PROP_RENDER_DELAY:
1234       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1235       break;
1236     default:
1237       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1238       break;
1239   }
1240 }
1241
1242
1243 static GstCaps *
1244 gst_base_sink_get_caps (GstBaseSink * sink)
1245 {
1246   return NULL;
1247 }
1248
1249 static gboolean
1250 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1251 {
1252   return TRUE;
1253 }
1254
1255 static GstFlowReturn
1256 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1257     GstCaps * caps, GstBuffer ** buf)
1258 {
1259   *buf = NULL;
1260   return GST_FLOW_OK;
1261 }
1262
1263 /* with PREROLL_LOCK, STREAM_LOCK */
1264 static void
1265 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1266 {
1267   GstMiniObject *obj;
1268
1269   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1270   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1271     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1272     gst_mini_object_unref (obj);
1273   }
1274   /* we can't have EOS anymore now */
1275   basesink->eos = FALSE;
1276   basesink->priv->received_eos = FALSE;
1277   basesink->have_preroll = FALSE;
1278   basesink->eos_queued = FALSE;
1279   basesink->preroll_queued = 0;
1280   basesink->buffers_queued = 0;
1281   basesink->events_queued = 0;
1282   /* can't report latency anymore until we preroll again */
1283   if (basesink->priv->async_enabled) {
1284     GST_OBJECT_LOCK (basesink);
1285     basesink->priv->have_latency = FALSE;
1286     GST_OBJECT_UNLOCK (basesink);
1287   }
1288   /* and signal any waiters now */
1289   GST_PAD_PREROLL_SIGNAL (pad);
1290 }
1291
1292 /* with STREAM_LOCK, configures given segment with the event information. */
1293 static void
1294 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1295     GstEvent * event, GstSegment * segment)
1296 {
1297   gboolean update;
1298   gdouble rate, arate;
1299   GstFormat format;
1300   gint64 start;
1301   gint64 stop;
1302   gint64 time;
1303
1304   /* the newsegment event is needed to bring the buffer timestamps to the
1305    * stream time and to drop samples outside of the playback segment. */
1306   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1307       &start, &stop, &time);
1308
1309   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1310    * We protect with the OBJECT_LOCK so that we can use the values to
1311    * safely answer a POSITION query. */
1312   GST_OBJECT_LOCK (basesink);
1313   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1314       stop, time);
1315
1316   if (format == GST_FORMAT_TIME) {
1317     GST_DEBUG_OBJECT (basesink,
1318         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1319         "format GST_FORMAT_TIME, "
1320         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1321         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1322         update, rate, arate, GST_TIME_ARGS (segment->start),
1323         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1324         GST_TIME_ARGS (segment->accum));
1325   } else {
1326     GST_DEBUG_OBJECT (basesink,
1327         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1328         "format %d, "
1329         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1330         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1331         segment->format, segment->start, segment->stop, segment->time,
1332         segment->accum);
1333   }
1334   GST_OBJECT_UNLOCK (basesink);
1335 }
1336
1337 /* with PREROLL_LOCK, STREAM_LOCK */
1338 static gboolean
1339 gst_base_sink_commit_state (GstBaseSink * basesink)
1340 {
1341   /* commit state and proceed to next pending state */
1342   GstState current, next, pending, post_pending;
1343   gboolean post_paused = FALSE;
1344   gboolean post_async_done = FALSE;
1345   gboolean post_playing = FALSE;
1346
1347   /* we are certainly not playing async anymore now */
1348   basesink->playing_async = FALSE;
1349
1350   GST_OBJECT_LOCK (basesink);
1351   current = GST_STATE (basesink);
1352   next = GST_STATE_NEXT (basesink);
1353   pending = GST_STATE_PENDING (basesink);
1354   post_pending = pending;
1355
1356   switch (pending) {
1357     case GST_STATE_PLAYING:
1358     {
1359       GstBaseSinkClass *bclass;
1360       GstStateChangeReturn ret;
1361
1362       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1363
1364       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1365
1366       basesink->need_preroll = FALSE;
1367       post_async_done = TRUE;
1368       basesink->priv->commited = TRUE;
1369       post_playing = TRUE;
1370       /* post PAUSED too when we were READY */
1371       if (current == GST_STATE_READY) {
1372         post_paused = TRUE;
1373       }
1374
1375       /* make sure we notify the subclass of async playing */
1376       if (bclass->async_play) {
1377         GST_WARNING_OBJECT (basesink, "deprecated async_play");
1378         ret = bclass->async_play (basesink);
1379         if (ret == GST_STATE_CHANGE_FAILURE)
1380           goto async_failed;
1381       }
1382       break;
1383     }
1384     case GST_STATE_PAUSED:
1385       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1386       post_paused = TRUE;
1387       post_async_done = TRUE;
1388       basesink->priv->commited = TRUE;
1389       post_pending = GST_STATE_VOID_PENDING;
1390       break;
1391     case GST_STATE_READY:
1392     case GST_STATE_NULL:
1393       goto stopping;
1394     case GST_STATE_VOID_PENDING:
1395       goto nothing_pending;
1396     default:
1397       break;
1398   }
1399
1400   /* we can report latency queries now */
1401   basesink->priv->have_latency = TRUE;
1402
1403   GST_STATE (basesink) = pending;
1404   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1405   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1406   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1407   GST_OBJECT_UNLOCK (basesink);
1408
1409   if (post_paused) {
1410     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1411     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1412         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1413             current, next, post_pending));
1414   }
1415   if (post_async_done) {
1416     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1417     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1418         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1419   }
1420   if (post_playing) {
1421     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1422     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1423         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1424             next, pending, GST_STATE_VOID_PENDING));
1425   }
1426
1427   GST_STATE_BROADCAST (basesink);
1428
1429   return TRUE;
1430
1431 nothing_pending:
1432   {
1433     /* Depending on the state, set our vars. We get in this situation when the
1434      * state change function got a change to update the state vars before the
1435      * streaming thread did. This is fine but we need to make sure that we
1436      * update the need_preroll var since it was TRUE when we got here and might
1437      * become FALSE if we got to PLAYING. */
1438     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1439         gst_element_state_get_name (current));
1440     switch (current) {
1441       case GST_STATE_PLAYING:
1442         basesink->need_preroll = FALSE;
1443         break;
1444       case GST_STATE_PAUSED:
1445         basesink->need_preroll = TRUE;
1446         break;
1447       default:
1448         basesink->need_preroll = FALSE;
1449         basesink->flushing = TRUE;
1450         break;
1451     }
1452     /* we can report latency queries now */
1453     basesink->priv->have_latency = TRUE;
1454     GST_OBJECT_UNLOCK (basesink);
1455     return TRUE;
1456   }
1457 stopping:
1458   {
1459     /* app is going to READY */
1460     GST_DEBUG_OBJECT (basesink, "stopping");
1461     basesink->need_preroll = FALSE;
1462     basesink->flushing = TRUE;
1463     GST_OBJECT_UNLOCK (basesink);
1464     return FALSE;
1465   }
1466 async_failed:
1467   {
1468     GST_DEBUG_OBJECT (basesink, "async commit failed");
1469     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1470     GST_OBJECT_UNLOCK (basesink);
1471     return FALSE;
1472   }
1473 }
1474
1475 static void
1476 start_stepping (GstBaseSink * sink, GstSegment * segment,
1477     GstStepInfo * pending, GstStepInfo * current)
1478 {
1479   gint64 start;
1480
1481   GST_DEBUG_OBJECT (sink, "update pending step");
1482   memcpy (current, pending, sizeof (GstStepInfo));
1483   pending->valid = FALSE;
1484
1485   /* get the running time of the current segment start and remember it */
1486   start =
1487       gst_segment_to_running_time (segment, segment->format, segment->start);
1488
1489   /* update the segment, accumulating what was consumed at the old rate, setting
1490    * the new rate. */
1491   gst_segment_set_newsegment_full (segment, TRUE, current->rate,
1492       segment->applied_rate, segment->format, segment->start, segment->stop,
1493       segment->time);
1494
1495   GST_DEBUG_OBJECT (sink, "step start %" GST_TIME_FORMAT,
1496       GST_TIME_ARGS (start));
1497   current->start = start;
1498 }
1499
1500 static void
1501 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1502     GstStepInfo * current, gint64 * cstart, gint64 * cstop)
1503 {
1504   GstMessage *message;
1505   gint64 stop;
1506
1507   GST_DEBUG_OBJECT (sink, "step complete");
1508
1509   /* get the end of the step segment */
1510   stop = gst_segment_to_running_time (segment, segment->format, *cstart);
1511
1512   GST_DEBUG_OBJECT (sink, "step stop %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1513   /* configure the duration of the elapsed segment */
1514   current->duration = stop - current->start;
1515
1516   /* update the segment, discarding what was consumed */
1517   segment->start = *cstart;
1518
1519   message =
1520       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1521       current->amount, current->rate, current->duration, current->intermediate);
1522   gst_message_set_seqnum (message, current->seqnum);
1523   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1524
1525   if (!current->intermediate)
1526     sink->need_preroll = current->need_preroll;
1527   current->valid = FALSE;
1528 }
1529
1530 static gboolean
1531 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1532     GstStepInfo * current)
1533 {
1534   GstBaseSinkPrivate *priv;
1535   gboolean step_end = FALSE;
1536
1537   priv = sink->priv;
1538
1539   /* see if we need to skip this buffer because of stepping */
1540   switch (current->format) {
1541     case GST_FORMAT_TIME:
1542       GST_DEBUG_OBJECT (sink,
1543           "got time step %" GST_TIME_FORMAT "/%" GST_TIME_FORMAT,
1544           GST_TIME_ARGS (current->remaining), GST_TIME_ARGS (current->amount));
1545       break;
1546     case GST_FORMAT_BUFFERS:
1547       GST_DEBUG_OBJECT (sink,
1548           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1549           current->remaining, current->amount);
1550
1551       if (current->remaining > 0) {
1552         current->remaining--;
1553       } else {
1554         step_end = TRUE;
1555       }
1556       break;
1557     case GST_FORMAT_DEFAULT:
1558     default:
1559       GST_DEBUG_OBJECT (sink,
1560           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1561           current->remaining, current->amount);
1562       break;
1563   }
1564   return step_end;
1565 }
1566
1567 /* with STREAM_LOCK, PREROLL_LOCK
1568  *
1569  * Returns TRUE if the object needs synchronisation and takes therefore
1570  * part in prerolling.
1571  *
1572  * rsstart/rsstop contain the start/stop in stream time.
1573  * rrstart/rrstop contain the start/stop in running time.
1574  */
1575 static gboolean
1576 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1577     GstClockTime * rsstart, GstClockTime * rsstop,
1578     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1579     gboolean * stepped, GstSegment * segment, GstStepInfo * step)
1580 {
1581   GstBaseSinkClass *bclass;
1582   GstBuffer *buffer;
1583   GstClockTime start, stop;     /* raw start/stop timestamps */
1584   gint64 cstart, cstop;         /* clipped raw timestamps */
1585   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1586   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1587   GstFormat format;
1588   GstBaseSinkPrivate *priv;
1589   gboolean step_end;
1590
1591   priv = basesink->priv;
1592
1593   /* start with nothing */
1594   start = stop = -1;
1595
1596   step_end = FALSE;
1597
1598   if (G_UNLIKELY (GST_IS_EVENT (obj))) {
1599     GstEvent *event = GST_EVENT_CAST (obj);
1600
1601     switch (GST_EVENT_TYPE (event)) {
1602         /* EOS event needs syncing */
1603       case GST_EVENT_EOS:
1604       {
1605         if (basesink->segment.rate >= 0.0) {
1606           sstart = sstop = priv->current_sstop;
1607           if (sstart == -1) {
1608             /* we have not seen a buffer yet, use the segment values */
1609             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1610                 basesink->segment.format, basesink->segment.stop);
1611           }
1612         } else {
1613           sstart = sstop = priv->current_sstart;
1614           if (sstart == -1) {
1615             /* we have not seen a buffer yet, use the segment values */
1616             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1617                 basesink->segment.format, basesink->segment.start);
1618           }
1619         }
1620
1621         rstart = rstop = priv->eos_rtime;
1622         *do_sync = rstart != -1;
1623         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1624             GST_TIME_ARGS (rstart));
1625         /* if we are stepping, we end now */
1626         step_end = step->valid;
1627         goto done;
1628       }
1629       default:
1630         /* other events do not need syncing */
1631         /* FIXME, maybe NEWSEGMENT might need synchronisation
1632          * since the POSITION query depends on accumulated times and
1633          * we cannot accumulate the current segment before the previous
1634          * one completed.
1635          */
1636         return FALSE;
1637     }
1638   }
1639
1640   /* else do buffer sync code */
1641   buffer = GST_BUFFER_CAST (obj);
1642
1643   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1644
1645   /* just get the times to see if we need syncing, if the start returns -1 we
1646    * don't sync. */
1647   if (bclass->get_times)
1648     bclass->get_times (basesink, buffer, &start, &stop);
1649
1650   if (start == -1) {
1651     /* we don't need to sync but we still want to get the timestamps for
1652      * tracking the position */
1653     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1654     *do_sync = FALSE;
1655   } else {
1656     *do_sync = TRUE;
1657   }
1658
1659   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1660       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1661       GST_TIME_ARGS (stop), *do_sync);
1662
1663   /* collect segment and format for code clarity */
1664   format = segment->format;
1665
1666   /* no timestamp clipping if we did not get a TIME segment format */
1667   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
1668     cstart = start;
1669     cstop = stop;
1670     /* do running and stream time in TIME format */
1671     format = GST_FORMAT_TIME;
1672     goto do_times;
1673   }
1674
1675   /* clip */
1676   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
1677               (gint64) start, (gint64) stop, &cstart, &cstop)))
1678     goto out_of_segment;
1679
1680   if (G_UNLIKELY (start != cstart || stop != cstop)) {
1681     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
1682         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
1683         GST_TIME_ARGS (cstop));
1684   }
1685
1686   /* set last stop position */
1687   if (G_LIKELY (cstop != GST_CLOCK_TIME_NONE))
1688     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
1689   else
1690     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
1691
1692 do_times:
1693   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
1694    * upstream is behaving very badly */
1695   sstart = gst_segment_to_stream_time (segment, format, cstart);
1696   sstop = gst_segment_to_stream_time (segment, format, cstop);
1697   rstart = gst_segment_to_running_time (segment, format, cstart);
1698   rstop = gst_segment_to_running_time (segment, format, cstop);
1699
1700   if (G_UNLIKELY (step->valid)) {
1701     if (!(step_end = handle_stepping (basesink, segment, step)))
1702       *stepped = TRUE;
1703   }
1704
1705 done:
1706   /* done label called when doing EOS */
1707   if (step_end)
1708     stop_stepping (basesink, segment, &priv->current_step, &cstart, &cstop);
1709
1710   /* save times */
1711   *rsstart = sstart;
1712   *rsstop = sstop;
1713   *rrstart = rstart;
1714   *rrstop = rstop;
1715
1716   /* buffers and EOS always need syncing and preroll */
1717   return TRUE;
1718
1719   /* special cases */
1720 out_of_segment:
1721   {
1722     /* should not happen since we clip them in the chain function already, 
1723      * we return FALSE so that we don't try to sync on it. */
1724     GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
1725         (NULL), ("unexpected buffer out of segment found."));
1726     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
1727     return FALSE;
1728   }
1729 }
1730
1731 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
1732  * adjust a timestamp with the latency and timestamp offset */
1733 static GstClockTime
1734 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
1735 {
1736   GstClockTimeDiff ts_offset;
1737
1738   /* don't do anything funny with invalid timestamps */
1739   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1740     return time;
1741
1742   time += basesink->priv->latency;
1743
1744   /* apply offset, be carefull for underflows */
1745   ts_offset = basesink->priv->ts_offset;
1746   if (ts_offset < 0) {
1747     ts_offset = -ts_offset;
1748     if (ts_offset < time)
1749       time -= ts_offset;
1750     else
1751       time = 0;
1752   } else
1753     time += ts_offset;
1754
1755   return time;
1756 }
1757
1758 /**
1759  * gst_base_sink_wait_clock:
1760  * @sink: the sink
1761  * @time: the running_time to be reached
1762  * @jitter: the jitter to be filled with time diff (can be NULL)
1763  *
1764  * This function will block until @time is reached. It is usually called by
1765  * subclasses that use their own internal synchronisation.
1766  *
1767  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
1768  * returned. Likewise, if synchronisation is disabled in the element or there
1769  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
1770  *
1771  * This function should only be called with the PREROLL_LOCK held, like when
1772  * receiving an EOS event in the ::event vmethod or when receiving a buffer in
1773  * the ::render vmethod.
1774  *
1775  * The @time argument should be the running_time of when this method should
1776  * return and is not adjusted with any latency or offset configured in the
1777  * sink.
1778  *
1779  * Since 0.10.20
1780  *
1781  * Returns: #GstClockReturn
1782  */
1783 GstClockReturn
1784 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
1785     GstClockTimeDiff * jitter)
1786 {
1787   GstClockID id;
1788   GstClockReturn ret;
1789   GstClock *clock;
1790
1791   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
1792     goto invalid_time;
1793
1794   GST_OBJECT_LOCK (sink);
1795   if (G_UNLIKELY (!sink->sync))
1796     goto no_sync;
1797
1798   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
1799     goto no_clock;
1800
1801   /* add base_time to running_time to get the time against the clock */
1802   time += GST_ELEMENT_CAST (sink)->base_time;
1803
1804   id = gst_clock_new_single_shot_id (clock, time);
1805   GST_OBJECT_UNLOCK (sink);
1806
1807   /* A blocking wait is performed on the clock. We save the ClockID
1808    * so we can unlock the entry at any time. While we are blocking, we 
1809    * release the PREROLL_LOCK so that other threads can interrupt the
1810    * entry. */
1811   sink->clock_id = id;
1812   /* release the preroll lock while waiting */
1813   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1814
1815   ret = gst_clock_id_wait (id, jitter);
1816
1817   GST_PAD_PREROLL_LOCK (sink->sinkpad);
1818   gst_clock_id_unref (id);
1819   sink->clock_id = NULL;
1820
1821   return ret;
1822
1823   /* no syncing needed */
1824 invalid_time:
1825   {
1826     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
1827     return GST_CLOCK_BADTIME;
1828   }
1829 no_sync:
1830   {
1831     GST_DEBUG_OBJECT (sink, "sync disabled");
1832     GST_OBJECT_UNLOCK (sink);
1833     return GST_CLOCK_BADTIME;
1834   }
1835 no_clock:
1836   {
1837     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
1838     GST_OBJECT_UNLOCK (sink);
1839     return GST_CLOCK_BADTIME;
1840   }
1841 }
1842
1843 /**
1844  * gst_base_sink_wait_preroll:
1845  * @sink: the sink
1846  *
1847  * If the #GstBaseSinkClass::render method performs its own synchronisation against
1848  * the clock it must unblock when going from PLAYING to the PAUSED state and call
1849  * this method before continuing to render the remaining data.
1850  *
1851  * This function will block until a state change to PLAYING happens (in which
1852  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
1853  * to a state change to READY or a FLUSH event (in which case this function
1854  * returns #GST_FLOW_WRONG_STATE).
1855  *
1856  * This function should only be called with the PREROLL_LOCK held, like in the
1857  * render function.
1858  *
1859  * Since: 0.10.11
1860  *
1861  * Returns: #GST_FLOW_OK if the preroll completed and processing can
1862  * continue. Any other return value should be returned from the render vmethod.
1863  */
1864 GstFlowReturn
1865 gst_base_sink_wait_preroll (GstBaseSink * sink)
1866 {
1867   sink->have_preroll = TRUE;
1868   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
1869   /* block until the state changes, or we get a flush, or something */
1870   GST_PAD_PREROLL_WAIT (sink->sinkpad);
1871   sink->have_preroll = FALSE;
1872   if (G_UNLIKELY (sink->flushing))
1873     goto stopping;
1874   GST_DEBUG_OBJECT (sink, "continue after preroll");
1875
1876   return GST_FLOW_OK;
1877
1878   /* ERRORS */
1879 stopping:
1880   {
1881     GST_DEBUG_OBJECT (sink, "preroll interrupted");
1882     return GST_FLOW_WRONG_STATE;
1883   }
1884 }
1885
1886 /**
1887  * gst_base_sink_do_preroll:
1888  * @sink: the sink
1889  * @obj: the object that caused the preroll
1890  *
1891  * If the @sink spawns its own thread for pulling buffers from upstream it
1892  * should call this method after it has pulled a buffer. If the element needed
1893  * to preroll, this function will perform the preroll and will then block
1894  * until the element state is changed.
1895  *
1896  * This function should be called with the PREROLL_LOCK held.
1897  *
1898  * Since 0.10.22
1899  *
1900  * Returns: #GST_FLOW_OK if the preroll completed and processing can
1901  * continue. Any other return value should be returned from the render vmethod.
1902  */
1903 GstFlowReturn
1904 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
1905 {
1906   GstFlowReturn ret;
1907
1908   while (G_UNLIKELY (sink->need_preroll)) {
1909     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
1910
1911     ret = gst_base_sink_preroll_object (sink, obj);
1912     if (ret != GST_FLOW_OK)
1913       goto preroll_failed;
1914
1915     /* need to recheck here because the commit state could have
1916      * made us not need the preroll anymore */
1917     if (G_LIKELY (sink->need_preroll)) {
1918       /* block until the state changes, or we get a flush, or something */
1919       ret = gst_base_sink_wait_preroll (sink);
1920       if (ret != GST_FLOW_OK)
1921         goto preroll_failed;
1922     }
1923   }
1924   return GST_FLOW_OK;
1925
1926   /* ERRORS */
1927 preroll_failed:
1928   {
1929     GST_DEBUG_OBJECT (sink, "preroll failed %d", ret);
1930     return ret;
1931   }
1932 }
1933
1934 /**
1935  * gst_base_sink_wait_eos:
1936  * @sink: the sink
1937  * @time: the running_time to be reached
1938  * @jitter: the jitter to be filled with time diff (can be NULL)
1939  *
1940  * This function will block until @time is reached. It is usually called by
1941  * subclasses that use their own internal synchronisation but want to let the
1942  * EOS be handled by the base class.
1943  *
1944  * This function should only be called with the PREROLL_LOCK held, like when
1945  * receiving an EOS event in the ::event vmethod.
1946  *
1947  * The @time argument should be the running_time of when the EOS should happen
1948  * and will be adjusted with any latency and offset configured in the sink.
1949  *
1950  * Since 0.10.15
1951  *
1952  * Returns: #GstFlowReturn
1953  */
1954 GstFlowReturn
1955 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
1956     GstClockTimeDiff * jitter)
1957 {
1958   GstClockReturn status;
1959   GstFlowReturn ret;
1960
1961   do {
1962     GstClockTime stime;
1963
1964     GST_DEBUG_OBJECT (sink, "checking preroll");
1965
1966     /* first wait for the playing state before we can continue */
1967     if (G_UNLIKELY (sink->need_preroll)) {
1968       ret = gst_base_sink_wait_preroll (sink);
1969       if (ret != GST_FLOW_OK)
1970         goto flushing;
1971     }
1972
1973     /* preroll done, we can sync since we are in PLAYING now. */
1974     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
1975         GST_TIME_FORMAT, GST_TIME_ARGS (time));
1976
1977     /* compensate for latency and ts_offset. We don't adjust for render delay
1978      * because we don't interact with the device on EOS normally. */
1979     stime = gst_base_sink_adjust_time (sink, time);
1980
1981     /* wait for the clock, this can be interrupted because we got shut down or 
1982      * we PAUSED. */
1983     status = gst_base_sink_wait_clock (sink, stime, jitter);
1984
1985     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
1986
1987     /* invalid time, no clock or sync disabled, just continue then */
1988     if (status == GST_CLOCK_BADTIME)
1989       break;
1990
1991     /* waiting could have been interrupted and we can be flushing now */
1992     if (G_UNLIKELY (sink->flushing))
1993       goto flushing;
1994
1995     /* retry if we got unscheduled, which means we did not reach the timeout
1996      * yet. if some other error occures, we continue. */
1997   } while (status == GST_CLOCK_UNSCHEDULED);
1998
1999   GST_DEBUG_OBJECT (sink, "end of stream");
2000
2001   return GST_FLOW_OK;
2002
2003   /* ERRORS */
2004 flushing:
2005   {
2006     GST_DEBUG_OBJECT (sink, "we are flushing");
2007     return GST_FLOW_WRONG_STATE;
2008   }
2009 }
2010
2011 /* with STREAM_LOCK, PREROLL_LOCK
2012  *
2013  * Make sure we are in PLAYING and synchronize an object to the clock.
2014  *
2015  * If we need preroll, we are not in PLAYING. We try to commit the state
2016  * if needed and then block if we still are not PLAYING.
2017  *
2018  * We start waiting on the clock in PLAYING. If we got interrupted, we
2019  * immediatly try to re-preroll.
2020  *
2021  * Some objects do not need synchronisation (most events) and so this function
2022  * immediatly returns GST_FLOW_OK.
2023  *
2024  * for objects that arrive later than max-lateness to be synchronized to the 
2025  * clock have the @late boolean set to TRUE.
2026  *
2027  * This function keeps a running average of the jitter (the diff between the
2028  * clock time and the requested sync time). The jitter is negative for
2029  * objects that arrive in time and positive for late buffers.
2030  *
2031  * does not take ownership of obj.
2032  */
2033 static GstFlowReturn
2034 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2035     GstMiniObject * obj, gboolean * late)
2036 {
2037   GstClockTimeDiff jitter;
2038   gboolean syncable;
2039   GstClockReturn status = GST_CLOCK_OK;
2040   GstClockTime rstart, rstop, sstart, sstop, stime;
2041   gboolean do_sync;
2042   GstBaseSinkPrivate *priv;
2043   GstFlowReturn ret;
2044   GstStepInfo *current, *pending;
2045   gboolean stepped;
2046
2047   priv = basesink->priv;
2048
2049 do_step:
2050   sstart = sstop = rstart = rstop = -1;
2051   do_sync = TRUE;
2052   stepped = FALSE;
2053
2054   priv->current_rstart = -1;
2055
2056   /* get stepping info */
2057   current = &priv->current_step;
2058   pending = &priv->pending_step;
2059
2060   /* get timing information for this object against the render segment */
2061   syncable = gst_base_sink_get_sync_times (basesink, obj,
2062       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2063       current);
2064
2065   if (G_UNLIKELY (stepped))
2066     goto step_skipped;
2067
2068   /* a syncable object needs to participate in preroll and
2069    * clocking. All buffers and EOS are syncable. */
2070   if (G_UNLIKELY (!syncable))
2071     goto not_syncable;
2072
2073   /* store timing info for current object */
2074   priv->current_rstart = rstart;
2075   priv->current_rstop = (rstop != -1 ? rstop : rstart);
2076
2077   /* save sync time for eos when the previous object needed sync */
2078   priv->eos_rtime = (do_sync ? priv->current_rstop : -1);
2079
2080 again:
2081   /* first do preroll, this makes sure we commit our state
2082    * to PAUSED and can continue to PLAYING. We cannot perform
2083    * any clock sync in PAUSED because there is no clock. */
2084   ret = gst_base_sink_do_preroll (basesink, obj);
2085   if (G_UNLIKELY (ret != GST_FLOW_OK))
2086     goto preroll_failed;
2087
2088   /* update the segment with a pending step if the current one is invalid and we
2089    * have a new pending one. We only accept new step updates after a preroll */
2090   if (G_UNLIKELY (pending->valid && !current->valid)) {
2091     start_stepping (basesink, &basesink->segment, pending, current);
2092     goto do_step;
2093   }
2094
2095   /* After rendering we store the position of the last buffer so that we can use
2096    * it to report the position. We need to take the lock here. */
2097   GST_OBJECT_LOCK (basesink);
2098   priv->current_sstart = sstart;
2099   priv->current_sstop = (sstop != -1 ? sstop : sstart);
2100   GST_OBJECT_UNLOCK (basesink);
2101
2102   if (!do_sync)
2103     goto done;
2104
2105   /* adjust for latency */
2106   stime = gst_base_sink_adjust_time (basesink, rstart);
2107
2108   /* adjust for render-delay, avoid underflows */
2109   if (stime != -1) {
2110     if (stime > priv->render_delay)
2111       stime -= priv->render_delay;
2112     else
2113       stime = 0;
2114   }
2115
2116   /* preroll done, we can sync since we are in PLAYING now. */
2117   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2118       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2119       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2120
2121   /* This function will return immediatly if start == -1, no clock
2122    * or sync is disabled with GST_CLOCK_BADTIME. */
2123   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2124
2125   GST_DEBUG_OBJECT (basesink, "clock returned %d", status);
2126
2127   /* invalid time, no clock or sync disabled, just render */
2128   if (status == GST_CLOCK_BADTIME)
2129     goto done;
2130
2131   /* waiting could have been interrupted and we can be flushing now */
2132   if (G_UNLIKELY (basesink->flushing))
2133     goto flushing;
2134
2135   /* check for unlocked by a state change, we are not flushing so
2136    * we can try to preroll on the current buffer. */
2137   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2138     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2139     priv->call_preroll = TRUE;
2140     goto again;
2141   }
2142
2143   /* successful syncing done, record observation */
2144   priv->current_jitter = jitter;
2145
2146   /* check if the object should be dropped */
2147   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2148       status, jitter);
2149
2150 done:
2151   return GST_FLOW_OK;
2152
2153   /* ERRORS */
2154 step_skipped:
2155   {
2156     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2157     *late = TRUE;
2158     return GST_FLOW_OK;
2159   }
2160 not_syncable:
2161   {
2162     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2163     return GST_FLOW_OK;
2164   }
2165 flushing:
2166   {
2167     GST_DEBUG_OBJECT (basesink, "we are flushing");
2168     return GST_FLOW_WRONG_STATE;
2169   }
2170 preroll_failed:
2171   {
2172     GST_DEBUG_OBJECT (basesink, "preroll failed");
2173     return ret;
2174   }
2175 }
2176
2177 static gboolean
2178 gst_base_sink_send_qos (GstBaseSink * basesink,
2179     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2180 {
2181   GstEvent *event;
2182   gboolean res;
2183
2184   /* generate Quality-of-Service event */
2185   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2186       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2187       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (time));
2188
2189   event = gst_event_new_qos (proportion, diff, time);
2190
2191   /* send upstream */
2192   res = gst_pad_push_event (basesink->sinkpad, event);
2193
2194   return res;
2195 }
2196
2197 static void
2198 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2199 {
2200   GstBaseSinkPrivate *priv;
2201   GstClockTime start, stop;
2202   GstClockTimeDiff jitter;
2203   GstClockTime pt, entered, left;
2204   GstClockTime duration;
2205   gdouble rate;
2206
2207   priv = sink->priv;
2208
2209   start = priv->current_rstart;
2210
2211   /* if Quality-of-Service disabled, do nothing */
2212   if (!g_atomic_int_get (&priv->qos_enabled) || start == -1)
2213     return;
2214
2215   stop = priv->current_rstop;
2216   jitter = priv->current_jitter;
2217
2218   if (jitter < 0) {
2219     /* this is the time the buffer entered the sink */
2220     if (start < -jitter)
2221       entered = 0;
2222     else
2223       entered = start + jitter;
2224     left = start;
2225   } else {
2226     /* this is the time the buffer entered the sink */
2227     entered = start + jitter;
2228     /* this is the time the buffer left the sink */
2229     left = start + jitter;
2230   }
2231
2232   /* calculate duration of the buffer */
2233   if (stop != -1)
2234     duration = stop - start;
2235   else
2236     duration = -1;
2237
2238   /* if we have the time when the last buffer left us, calculate
2239    * processing time */
2240   if (priv->last_left != -1) {
2241     if (entered > priv->last_left) {
2242       pt = entered - priv->last_left;
2243     } else {
2244       pt = 0;
2245     }
2246   } else {
2247     pt = priv->avg_pt;
2248   }
2249
2250   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2251       ", entered %" GST_TIME_FORMAT ", left %" GST_TIME_FORMAT ", pt: %"
2252       GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT ",jitter %"
2253       G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (entered),
2254       GST_TIME_ARGS (left), GST_TIME_ARGS (pt), GST_TIME_ARGS (duration),
2255       jitter);
2256
2257   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2258       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2259       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2260       priv->avg_rate);
2261
2262   /* collect running averages. for first observations, we copy the
2263    * values */
2264   if (priv->avg_duration == -1)
2265     priv->avg_duration = duration;
2266   else
2267     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2268
2269   if (priv->avg_pt == -1)
2270     priv->avg_pt = pt;
2271   else
2272     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2273
2274   if (priv->avg_duration != 0)
2275     rate =
2276         gst_guint64_to_gdouble (priv->avg_pt) /
2277         gst_guint64_to_gdouble (priv->avg_duration);
2278   else
2279     rate = 0.0;
2280
2281   if (priv->last_left != -1) {
2282     if (dropped || priv->avg_rate < 0.0) {
2283       priv->avg_rate = rate;
2284     } else {
2285       if (rate > 1.0)
2286         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2287       else
2288         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2289     }
2290   }
2291
2292   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2293       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2294       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2295       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2296
2297
2298   if (priv->avg_rate >= 0.0) {
2299     /* if we have a valid rate, start sending QoS messages */
2300     if (priv->current_jitter < 0) {
2301       /* make sure we never go below 0 when adding the jitter to the
2302        * timestamp. */
2303       if (priv->current_rstart < -priv->current_jitter)
2304         priv->current_jitter = -priv->current_rstart;
2305     }
2306     gst_base_sink_send_qos (sink, priv->avg_rate, priv->current_rstart,
2307         priv->current_jitter);
2308   }
2309
2310   /* record when this buffer will leave us */
2311   priv->last_left = left;
2312 }
2313
2314 /* reset all qos measuring */
2315 static void
2316 gst_base_sink_reset_qos (GstBaseSink * sink)
2317 {
2318   GstBaseSinkPrivate *priv;
2319
2320   priv = sink->priv;
2321
2322   priv->last_in_time = -1;
2323   priv->last_left = -1;
2324   priv->avg_duration = -1;
2325   priv->avg_pt = -1;
2326   priv->avg_rate = -1.0;
2327   priv->avg_render = -1;
2328   priv->rendered = 0;
2329   priv->dropped = 0;
2330
2331 }
2332
2333 /* Checks if the object was scheduled too late.
2334  *
2335  * start/stop contain the raw timestamp start and stop values
2336  * of the object.
2337  *
2338  * status and jitter contain the return values from the clock wait.
2339  *
2340  * returns TRUE if the buffer was too late.
2341  */
2342 static gboolean
2343 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2344     GstClockTime start, GstClockTime stop,
2345     GstClockReturn status, GstClockTimeDiff jitter)
2346 {
2347   gboolean late;
2348   gint64 max_lateness;
2349   GstBaseSinkPrivate *priv;
2350
2351   priv = basesink->priv;
2352
2353   late = FALSE;
2354
2355   /* only for objects that were too late */
2356   if (G_LIKELY (status != GST_CLOCK_EARLY))
2357     goto in_time;
2358
2359   max_lateness = basesink->abidata.ABI.max_lateness;
2360
2361   /* check if frame dropping is enabled */
2362   if (max_lateness == -1)
2363     goto no_drop;
2364
2365   /* only check for buffers */
2366   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2367     goto not_buffer;
2368
2369   /* can't do check if we don't have a timestamp */
2370   if (G_UNLIKELY (start == -1))
2371     goto no_timestamp;
2372
2373   /* we can add a valid stop time */
2374   if (stop != -1)
2375     max_lateness += stop;
2376   else
2377     max_lateness += start;
2378
2379   /* if the jitter bigger than duration and lateness we are too late */
2380   if ((late = start + jitter > max_lateness)) {
2381     GST_DEBUG_OBJECT (basesink, "buffer is too late %" GST_TIME_FORMAT
2382         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (start + jitter),
2383         GST_TIME_ARGS (max_lateness));
2384     /* !!emergency!!, if we did not receive anything valid for more than a 
2385      * second, render it anyway so the user sees something */
2386     if (priv->last_in_time != -1 && start - priv->last_in_time > GST_SECOND) {
2387       late = FALSE;
2388       GST_DEBUG_OBJECT (basesink,
2389           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2390           GST_TIME_ARGS (priv->last_in_time));
2391     }
2392   }
2393
2394 done:
2395   if (!late) {
2396     priv->last_in_time = start;
2397   }
2398   return late;
2399
2400   /* all is fine */
2401 in_time:
2402   {
2403     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2404     goto done;
2405   }
2406 no_drop:
2407   {
2408     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2409     goto done;
2410   }
2411 not_buffer:
2412   {
2413     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2414     return FALSE;
2415   }
2416 no_timestamp:
2417   {
2418     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2419     return FALSE;
2420   }
2421 }
2422
2423 /* called before and after calling the render vmethod. It keeps track of how
2424  * much time was spent in the render method and is used to check if we are
2425  * flooded */
2426 static void
2427 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2428 {
2429   GstBaseSinkPrivate *priv;
2430
2431   priv = basesink->priv;
2432
2433   if (start) {
2434     priv->start = gst_util_get_timestamp ();
2435   } else {
2436     GstClockTime elapsed;
2437
2438     priv->stop = gst_util_get_timestamp ();
2439
2440     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2441
2442     if (priv->avg_render == -1)
2443       priv->avg_render = elapsed;
2444     else
2445       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2446
2447     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2448         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2449   }
2450 }
2451
2452 /* with STREAM_LOCK, PREROLL_LOCK,
2453  *
2454  * Synchronize the object on the clock and then render it.
2455  *
2456  * takes ownership of obj.
2457  */
2458 static GstFlowReturn
2459 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2460     GstMiniObject * obj)
2461 {
2462   GstFlowReturn ret = GST_FLOW_OK;
2463   GstBaseSinkClass *bclass;
2464   gboolean late = FALSE;
2465   GstBaseSinkPrivate *priv;
2466
2467   priv = basesink->priv;
2468
2469   /* synchronize this object, non syncable objects return OK
2470    * immediatly. */
2471   ret = gst_base_sink_do_sync (basesink, pad, obj, &late);
2472   if (G_UNLIKELY (ret != GST_FLOW_OK))
2473     goto sync_failed;
2474
2475   /* and now render, event or buffer. */
2476   if (G_LIKELY (GST_IS_BUFFER (obj))) {
2477     GstBuffer *buf;
2478
2479     /* drop late buffers unconditionally, let's hope it's unlikely */
2480     if (G_UNLIKELY (late))
2481       goto dropped;
2482
2483     buf = GST_BUFFER_CAST (obj);
2484
2485     gst_base_sink_set_last_buffer (basesink, buf);
2486
2487     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2488
2489     if (G_LIKELY (bclass->render)) {
2490       gint do_qos;
2491
2492       /* read once, to get same value before and after */
2493       do_qos = g_atomic_int_get (&priv->qos_enabled);
2494
2495       GST_DEBUG_OBJECT (basesink, "rendering buffer %p", obj);
2496
2497       /* record rendering time for QoS and stats */
2498       if (do_qos)
2499         gst_base_sink_do_render_stats (basesink, TRUE);
2500
2501       ret = bclass->render (basesink, buf);
2502
2503       priv->rendered++;
2504
2505       if (do_qos)
2506         gst_base_sink_do_render_stats (basesink, FALSE);
2507     }
2508   } else {
2509     GstEvent *event = GST_EVENT_CAST (obj);
2510     gboolean event_res = TRUE;
2511     GstEventType type;
2512
2513     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2514
2515     type = GST_EVENT_TYPE (event);
2516
2517     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
2518         gst_event_type_get_name (type));
2519
2520     if (bclass->event)
2521       event_res = bclass->event (basesink, event);
2522
2523     /* when we get here we could be flushing again when the event handler calls
2524      * _wait_eos(). We have to ignore this object in that case. */
2525     if (G_UNLIKELY (basesink->flushing))
2526       goto flushing;
2527
2528     if (G_LIKELY (event_res)) {
2529       guint32 seqnum;
2530
2531       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
2532       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
2533
2534       switch (type) {
2535         case GST_EVENT_EOS:
2536         {
2537           GstMessage *message;
2538
2539           /* the EOS event is completely handled so we mark
2540            * ourselves as being in the EOS state. eos is also 
2541            * protected by the object lock so we can read it when 
2542            * answering the POSITION query. */
2543           GST_OBJECT_LOCK (basesink);
2544           basesink->eos = TRUE;
2545           GST_OBJECT_UNLOCK (basesink);
2546
2547           /* ok, now we can post the message */
2548           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
2549
2550           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
2551           gst_message_set_seqnum (message, seqnum);
2552           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
2553           break;
2554         }
2555         case GST_EVENT_NEWSEGMENT:
2556           /* configure the segment */
2557           gst_base_sink_configure_segment (basesink, pad, event,
2558               &basesink->segment);
2559           break;
2560         default:
2561           break;
2562       }
2563     }
2564   }
2565
2566 done:
2567   gst_base_sink_perform_qos (basesink, late);
2568
2569   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
2570   gst_mini_object_unref (obj);
2571
2572   return ret;
2573
2574   /* ERRORS */
2575 sync_failed:
2576   {
2577     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
2578     goto done;
2579   }
2580 dropped:
2581   {
2582     priv->dropped++;
2583     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
2584     goto done;
2585   }
2586 flushing:
2587   {
2588     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
2589     gst_mini_object_unref (obj);
2590     return GST_FLOW_WRONG_STATE;
2591   }
2592 }
2593
2594 /* with STREAM_LOCK, PREROLL_LOCK
2595  *
2596  * Perform preroll on the given object. For buffers this means 
2597  * calling the preroll subclass method. 
2598  * If that succeeds, the state will be commited.
2599  *
2600  * function does not take ownership of obj.
2601  */
2602 static GstFlowReturn
2603 gst_base_sink_preroll_object (GstBaseSink * basesink, GstMiniObject * obj)
2604 {
2605   GstFlowReturn ret;
2606
2607   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
2608
2609   /* if it's a buffer, we need to call the preroll method */
2610   if (G_LIKELY (GST_IS_BUFFER (obj)) && basesink->priv->call_preroll) {
2611     GstBaseSinkClass *bclass;
2612     GstBuffer *buf;
2613     GstClockTime timestamp;
2614
2615     buf = GST_BUFFER_CAST (obj);
2616     timestamp = GST_BUFFER_TIMESTAMP (buf);
2617
2618     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
2619         GST_TIME_ARGS (timestamp));
2620
2621     gst_base_sink_set_last_buffer (basesink, buf);
2622
2623     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2624     if (bclass->preroll)
2625       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
2626         goto preroll_failed;
2627
2628     basesink->priv->call_preroll = FALSE;
2629   }
2630
2631   /* commit state */
2632   if (G_LIKELY (basesink->playing_async)) {
2633     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
2634       goto stopping;
2635   }
2636
2637   return GST_FLOW_OK;
2638
2639   /* ERRORS */
2640 preroll_failed:
2641   {
2642     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
2643     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
2644     return ret;
2645   }
2646 stopping:
2647   {
2648     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
2649     return GST_FLOW_WRONG_STATE;
2650   }
2651 }
2652
2653 /* with STREAM_LOCK, PREROLL_LOCK 
2654  *
2655  * Queue an object for rendering.
2656  * The first prerollable object queued will complete the preroll. If the
2657  * preroll queue if filled, we render all the objects in the queue.
2658  *
2659  * This function takes ownership of the object.
2660  */
2661 static GstFlowReturn
2662 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
2663     GstMiniObject * obj, gboolean prerollable)
2664 {
2665   GstFlowReturn ret = GST_FLOW_OK;
2666   gint length;
2667   GQueue *q;
2668
2669   if (G_UNLIKELY (basesink->need_preroll)) {
2670     if (G_LIKELY (prerollable))
2671       basesink->preroll_queued++;
2672
2673     length = basesink->preroll_queued;
2674
2675     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
2676
2677     /* first prerollable item needs to finish the preroll */
2678     if (length == 1) {
2679       ret = gst_base_sink_preroll_object (basesink, obj);
2680       if (G_UNLIKELY (ret != GST_FLOW_OK))
2681         goto preroll_failed;
2682     }
2683     /* need to recheck if we need preroll, commmit state during preroll 
2684      * could have made us not need more preroll. */
2685     if (G_UNLIKELY (basesink->need_preroll)) {
2686       /* see if we can render now, if we can't add the object to the preroll
2687        * queue. */
2688       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
2689         goto more_preroll;
2690     }
2691   }
2692
2693   /* we can start rendering (or blocking) the queued object
2694    * if any. */
2695   q = basesink->preroll_queue;
2696   while (G_UNLIKELY (!g_queue_is_empty (q))) {
2697     GstMiniObject *o;
2698
2699     o = g_queue_pop_head (q);
2700     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
2701
2702     /* do something with the return value */
2703     ret = gst_base_sink_render_object (basesink, pad, o);
2704     if (ret != GST_FLOW_OK)
2705       goto dequeue_failed;
2706   }
2707
2708   /* now render the object */
2709   ret = gst_base_sink_render_object (basesink, pad, obj);
2710   basesink->preroll_queued = 0;
2711
2712   return ret;
2713
2714   /* special cases */
2715 preroll_failed:
2716   {
2717     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
2718         gst_flow_get_name (ret));
2719     gst_mini_object_unref (obj);
2720     return ret;
2721   }
2722 more_preroll:
2723   {
2724     /* add object to the queue and return */
2725     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
2726         length, basesink->preroll_queue_max_len);
2727     g_queue_push_tail (basesink->preroll_queue, obj);
2728     return GST_FLOW_OK;
2729   }
2730 dequeue_failed:
2731   {
2732     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
2733         gst_flow_get_name (ret));
2734     gst_mini_object_unref (obj);
2735     return ret;
2736   }
2737 }
2738
2739 /* with STREAM_LOCK
2740  *
2741  * This function grabs the PREROLL_LOCK and adds the object to
2742  * the queue.
2743  *
2744  * This function takes ownership of obj.
2745  */
2746 static GstFlowReturn
2747 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
2748     GstMiniObject * obj, gboolean prerollable)
2749 {
2750   GstFlowReturn ret;
2751
2752   GST_PAD_PREROLL_LOCK (pad);
2753   if (G_UNLIKELY (basesink->flushing))
2754     goto flushing;
2755
2756   if (G_UNLIKELY (basesink->priv->received_eos))
2757     goto was_eos;
2758
2759   ret = gst_base_sink_queue_object_unlocked (basesink, pad, obj, prerollable);
2760   GST_PAD_PREROLL_UNLOCK (pad);
2761
2762   return ret;
2763
2764   /* ERRORS */
2765 flushing:
2766   {
2767     GST_DEBUG_OBJECT (basesink, "sink is flushing");
2768     GST_PAD_PREROLL_UNLOCK (pad);
2769     gst_mini_object_unref (obj);
2770     return GST_FLOW_WRONG_STATE;
2771   }
2772 was_eos:
2773   {
2774     GST_DEBUG_OBJECT (basesink,
2775         "we are EOS, dropping object, return UNEXPECTED");
2776     GST_PAD_PREROLL_UNLOCK (pad);
2777     gst_mini_object_unref (obj);
2778     return GST_FLOW_UNEXPECTED;
2779   }
2780 }
2781
2782 static void
2783 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
2784 {
2785   /* make sure we are not blocked on the clock also clear any pending
2786    * eos state. */
2787   gst_base_sink_set_flushing (basesink, pad, TRUE);
2788
2789   /* we grab the stream lock but that is not needed since setting the
2790    * sink to flushing would make sure no state commit is being done
2791    * anymore */
2792   GST_PAD_STREAM_LOCK (pad);
2793   gst_base_sink_reset_qos (basesink);
2794   if (basesink->priv->async_enabled) {
2795     /* and we need to commit our state again on the next
2796      * prerolled buffer */
2797     basesink->playing_async = TRUE;
2798     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
2799   } else {
2800     basesink->priv->have_latency = TRUE;
2801     basesink->need_preroll = FALSE;
2802   }
2803   gst_base_sink_set_last_buffer (basesink, NULL);
2804   GST_PAD_STREAM_UNLOCK (pad);
2805 }
2806
2807 static void
2808 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
2809 {
2810   /* unset flushing so we can accept new data, this also flushes out any EOS
2811    * event. */
2812   gst_base_sink_set_flushing (basesink, pad, FALSE);
2813
2814   /* for position reporting */
2815   GST_OBJECT_LOCK (basesink);
2816   basesink->priv->current_sstart = -1;
2817   basesink->priv->current_sstop = -1;
2818   basesink->priv->eos_rtime = -1;
2819   basesink->priv->call_preroll = TRUE;
2820   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
2821     /* we need new segment info after the flush. */
2822     basesink->have_newsegment = FALSE;
2823     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
2824     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
2825   }
2826   GST_OBJECT_UNLOCK (basesink);
2827 }
2828
2829 static gboolean
2830 gst_base_sink_event (GstPad * pad, GstEvent * event)
2831 {
2832   GstBaseSink *basesink;
2833   gboolean result = TRUE;
2834   GstBaseSinkClass *bclass;
2835
2836   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
2837
2838   bclass = GST_BASE_SINK_GET_CLASS (basesink);
2839
2840   GST_DEBUG_OBJECT (basesink, "event %p (%s)", event,
2841       GST_EVENT_TYPE_NAME (event));
2842
2843   switch (GST_EVENT_TYPE (event)) {
2844     case GST_EVENT_EOS:
2845     {
2846       GstFlowReturn ret;
2847
2848       GST_PAD_PREROLL_LOCK (pad);
2849       if (G_UNLIKELY (basesink->flushing))
2850         goto flushing;
2851
2852       if (G_UNLIKELY (basesink->priv->received_eos)) {
2853         /* we can't accept anything when we are EOS */
2854         result = FALSE;
2855         gst_event_unref (event);
2856       } else {
2857         /* we set the received EOS flag here so that we can use it when testing if
2858          * we are prerolled and to refuse more buffers. */
2859         basesink->priv->received_eos = TRUE;
2860
2861         /* EOS is a prerollable object, we call the unlocked version because it
2862          * does not check the received_eos flag. */
2863         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
2864             GST_MINI_OBJECT_CAST (event), TRUE);
2865         if (G_UNLIKELY (ret != GST_FLOW_OK))
2866           result = FALSE;
2867       }
2868       GST_PAD_PREROLL_UNLOCK (pad);
2869       break;
2870     }
2871     case GST_EVENT_NEWSEGMENT:
2872     {
2873       GstFlowReturn ret;
2874
2875       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
2876
2877       GST_PAD_PREROLL_LOCK (pad);
2878       if (G_UNLIKELY (basesink->flushing))
2879         goto flushing;
2880
2881       if (G_UNLIKELY (basesink->priv->received_eos)) {
2882         /* we can't accept anything when we are EOS */
2883         result = FALSE;
2884         gst_event_unref (event);
2885       } else {
2886         /* the new segment is a non prerollable item and does not block anything,
2887          * we need to configure the current clipping segment and insert the event 
2888          * in the queue to serialize it with the buffers for rendering. */
2889         gst_base_sink_configure_segment (basesink, pad, event,
2890             basesink->abidata.ABI.clip_segment);
2891
2892         ret =
2893             gst_base_sink_queue_object_unlocked (basesink, pad,
2894             GST_MINI_OBJECT_CAST (event), FALSE);
2895         if (G_UNLIKELY (ret != GST_FLOW_OK))
2896           result = FALSE;
2897         else {
2898           GST_OBJECT_LOCK (basesink);
2899           basesink->have_newsegment = TRUE;
2900           GST_OBJECT_UNLOCK (basesink);
2901         }
2902       }
2903       GST_PAD_PREROLL_UNLOCK (pad);
2904       break;
2905     }
2906     case GST_EVENT_FLUSH_START:
2907       if (bclass->event)
2908         bclass->event (basesink, event);
2909
2910       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
2911
2912       gst_base_sink_flush_start (basesink, pad);
2913
2914       gst_event_unref (event);
2915       break;
2916     case GST_EVENT_FLUSH_STOP:
2917       if (bclass->event)
2918         bclass->event (basesink, event);
2919
2920       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
2921
2922       gst_base_sink_flush_stop (basesink, pad);
2923
2924       gst_event_unref (event);
2925       break;
2926     default:
2927       /* other events are sent to queue or subclass depending on if they
2928        * are serialized. */
2929       if (GST_EVENT_IS_SERIALIZED (event)) {
2930         gst_base_sink_queue_object (basesink, pad,
2931             GST_MINI_OBJECT_CAST (event), FALSE);
2932       } else {
2933         if (bclass->event)
2934           bclass->event (basesink, event);
2935         gst_event_unref (event);
2936       }
2937       break;
2938   }
2939 done:
2940   gst_object_unref (basesink);
2941
2942   return result;
2943
2944   /* ERRORS */
2945 flushing:
2946   {
2947     GST_DEBUG_OBJECT (basesink, "we are flushing");
2948     GST_PAD_PREROLL_UNLOCK (pad);
2949     result = FALSE;
2950     gst_event_unref (event);
2951     goto done;
2952   }
2953 }
2954
2955 /* default implementation to calculate the start and end
2956  * timestamps on a buffer, subclasses can override
2957  */
2958 static void
2959 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
2960     GstClockTime * start, GstClockTime * end)
2961 {
2962   GstClockTime timestamp, duration;
2963
2964   timestamp = GST_BUFFER_TIMESTAMP (buffer);
2965   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
2966
2967     /* get duration to calculate end time */
2968     duration = GST_BUFFER_DURATION (buffer);
2969     if (GST_CLOCK_TIME_IS_VALID (duration)) {
2970       *end = timestamp + duration;
2971     }
2972     *start = timestamp;
2973   }
2974 }
2975
2976 /* must be called with PREROLL_LOCK */
2977 static gboolean
2978 gst_base_sink_needs_preroll (GstBaseSink * basesink)
2979 {
2980   gboolean is_prerolled, res;
2981
2982   /* we have 2 cases where the PREROLL_LOCK is released:
2983    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
2984    *  2) we are syncing on the clock
2985    */
2986   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
2987   res = !is_prerolled;
2988
2989   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
2990       basesink->have_preroll, basesink->priv->received_eos, res);
2991
2992   return res;
2993 }
2994
2995 /* with STREAM_LOCK, PREROLL_LOCK 
2996  *
2997  * Takes a buffer and compare the timestamps with the last segment.
2998  * If the buffer falls outside of the segment boundaries, drop it.
2999  * Else queue the buffer for preroll and rendering.
3000  *
3001  * This function takes ownership of the buffer.
3002  */
3003 static GstFlowReturn
3004 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3005     GstBuffer * buf)
3006 {
3007   GstBaseSinkClass *bclass;
3008   GstFlowReturn result;
3009   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3010   GstSegment *clip_segment;
3011
3012   if (G_UNLIKELY (basesink->flushing))
3013     goto flushing;
3014
3015   if (G_UNLIKELY (basesink->priv->received_eos))
3016     goto was_eos;
3017
3018   /* for code clarity */
3019   clip_segment = basesink->abidata.ABI.clip_segment;
3020
3021   if (G_UNLIKELY (!basesink->have_newsegment)) {
3022     gboolean sync;
3023
3024     sync = gst_base_sink_get_sync (basesink);
3025     if (sync) {
3026       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3027           (_("Internal data flow problem.")),
3028           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3029     }
3030
3031     /* this means this sink will assume timestamps start from 0 */
3032     GST_OBJECT_LOCK (basesink);
3033     clip_segment->start = 0;
3034     clip_segment->stop = -1;
3035     basesink->segment.start = 0;
3036     basesink->segment.stop = -1;
3037     basesink->have_newsegment = TRUE;
3038     GST_OBJECT_UNLOCK (basesink);
3039   }
3040
3041   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3042
3043   /* check if the buffer needs to be dropped, we first ask the subclass for the
3044    * start and end */
3045   if (bclass->get_times)
3046     bclass->get_times (basesink, buf, &start, &end);
3047
3048   if (start == -1) {
3049     /* if the subclass does not want sync, we use our own values so that we at
3050      * least clip the buffer to the segment */
3051     gst_base_sink_get_times (basesink, buf, &start, &end);
3052   }
3053
3054   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3055       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3056
3057   /* a dropped buffer does not participate in anything */
3058   if (GST_CLOCK_TIME_IS_VALID (start) &&
3059       (clip_segment->format == GST_FORMAT_TIME)) {
3060     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3061                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3062       goto out_of_segment;
3063   }
3064
3065   /* now we can process the buffer in the queue, this function takes ownership
3066    * of the buffer */
3067   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3068       GST_MINI_OBJECT_CAST (buf), TRUE);
3069
3070   return result;
3071
3072   /* ERRORS */
3073 flushing:
3074   {
3075     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3076     gst_buffer_unref (buf);
3077     return GST_FLOW_WRONG_STATE;
3078   }
3079 was_eos:
3080   {
3081     GST_DEBUG_OBJECT (basesink,
3082         "we are EOS, dropping object, return UNEXPECTED");
3083     gst_buffer_unref (buf);
3084     return GST_FLOW_UNEXPECTED;
3085   }
3086 out_of_segment:
3087   {
3088     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3089     gst_buffer_unref (buf);
3090     return GST_FLOW_OK;
3091   }
3092 }
3093
3094 /* with STREAM_LOCK
3095  */
3096 static GstFlowReturn
3097 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3098 {
3099   GstBaseSink *basesink;
3100   GstFlowReturn result;
3101
3102   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3103
3104   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3105     goto wrong_mode;
3106
3107   GST_PAD_PREROLL_LOCK (pad);
3108   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
3109   GST_PAD_PREROLL_UNLOCK (pad);
3110
3111 done:
3112   return result;
3113
3114   /* ERRORS */
3115 wrong_mode:
3116   {
3117     GST_OBJECT_LOCK (pad);
3118     GST_WARNING_OBJECT (basesink,
3119         "Push on pad %s:%s, but it was not activated in push mode",
3120         GST_DEBUG_PAD_NAME (pad));
3121     GST_OBJECT_UNLOCK (pad);
3122     gst_buffer_unref (buf);
3123     /* we don't post an error message this will signal to the peer
3124      * pushing that EOS is reached. */
3125     result = GST_FLOW_UNEXPECTED;
3126     goto done;
3127   }
3128 }
3129
3130 static gboolean
3131 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3132 {
3133   gboolean res = TRUE;
3134
3135   /* update our offset if the start/stop position was updated */
3136   if (segment->format == GST_FORMAT_BYTES) {
3137     segment->time = segment->start;
3138   } else if (segment->start == 0) {
3139     /* seek to start, we can implement a default for this. */
3140     segment->time = 0;
3141   } else {
3142     res = FALSE;
3143     GST_INFO_OBJECT (sink, "Can't do a default seek");
3144   }
3145
3146   return res;
3147 }
3148
3149 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3150
3151 static gboolean
3152 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3153     GstEvent * event, GstSegment * segment)
3154 {
3155   /* By default, we try one of 2 things:
3156    *   - For absolute seek positions, convert the requested position to our 
3157    *     configured processing format and place it in the output segment \
3158    *   - For relative seek positions, convert our current (input) values to the
3159    *     seek format, adjust by the relative seek offset and then convert back to
3160    *     the processing format
3161    */
3162   GstSeekType cur_type, stop_type;
3163   gint64 cur, stop;
3164   GstSeekFlags flags;
3165   GstFormat seek_format, dest_format;
3166   gdouble rate;
3167   gboolean update;
3168   gboolean res = TRUE;
3169
3170   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3171       &cur_type, &cur, &stop_type, &stop);
3172   dest_format = segment->format;
3173
3174   if (seek_format == dest_format) {
3175     gst_segment_set_seek (segment, rate, seek_format, flags,
3176         cur_type, cur, stop_type, stop, &update);
3177     return TRUE;
3178   }
3179
3180   if (cur_type != GST_SEEK_TYPE_NONE) {
3181     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3182     res =
3183         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3184         &cur);
3185     cur_type = GST_SEEK_TYPE_SET;
3186   }
3187
3188   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3189     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3190     res =
3191         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3192         &stop);
3193     stop_type = GST_SEEK_TYPE_SET;
3194   }
3195
3196   /* And finally, configure our output segment in the desired format */
3197   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3198       stop_type, stop, &update);
3199
3200   if (!res)
3201     goto no_format;
3202
3203   return res;
3204
3205 no_format:
3206   {
3207     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3208     return FALSE;
3209   }
3210 }
3211
3212 /* perform a seek, only executed in pull mode */
3213 static gboolean
3214 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3215 {
3216   gboolean flush;
3217   gdouble rate;
3218   GstFormat seek_format, dest_format;
3219   GstSeekFlags flags;
3220   GstSeekType cur_type, stop_type;
3221   gboolean seekseg_configured = FALSE;
3222   gint64 cur, stop;
3223   gboolean update, res = TRUE;
3224   GstSegment seeksegment;
3225
3226   dest_format = sink->segment.format;
3227
3228   if (event) {
3229     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3230     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3231         &cur_type, &cur, &stop_type, &stop);
3232
3233     flush = flags & GST_SEEK_FLAG_FLUSH;
3234   } else {
3235     GST_DEBUG_OBJECT (sink, "performing seek without event");
3236     flush = FALSE;
3237   }
3238
3239   if (flush) {
3240     GST_DEBUG_OBJECT (sink, "flushing upstream");
3241     gst_pad_push_event (pad, gst_event_new_flush_start ());
3242     gst_base_sink_flush_start (sink, pad);
3243   } else {
3244     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3245   }
3246
3247   GST_PAD_STREAM_LOCK (pad);
3248
3249   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3250    * copy the current segment info into the temp segment that we can actually
3251    * attempt the seek with. We only update the real segment if the seek suceeds. */
3252   if (!seekseg_configured) {
3253     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3254
3255     /* now configure the final seek segment */
3256     if (event) {
3257       if (sink->segment.format != seek_format) {
3258         /* OK, here's where we give the subclass a chance to convert the relative
3259          * seek into an absolute one in the processing format. We set up any
3260          * absolute seek above, before taking the stream lock. */
3261         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3262                 &seeksegment)) {
3263           GST_DEBUG_OBJECT (sink,
3264               "Preparing the seek failed after flushing. " "Aborting seek");
3265           res = FALSE;
3266         }
3267       } else {
3268         /* The seek format matches our processing format, no need to ask the
3269          * the subclass to configure the segment. */
3270         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3271             cur_type, cur, stop_type, stop, &update);
3272       }
3273     }
3274     /* Else, no seek event passed, so we're just (re)starting the 
3275        current segment. */
3276   }
3277
3278   if (res) {
3279     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3280         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3281         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3282
3283     /* do the seek, segment.last_stop contains the new position. */
3284     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3285   }
3286
3287
3288   if (flush) {
3289     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3290     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3291     gst_base_sink_flush_stop (sink, pad);
3292   } else if (res && sink->abidata.ABI.running) {
3293     /* we are running the current segment and doing a non-flushing seek, 
3294      * close the segment first based on the last_stop. */
3295     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3296         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3297   }
3298
3299   /* The subclass must have converted the segment to the processing format 
3300    * by now */
3301   if (res && seeksegment.format != dest_format) {
3302     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3303         "in the correct format. Aborting seek.");
3304     res = FALSE;
3305   }
3306
3307   /* if successfull seek, we update our real segment and push
3308    * out the new segment. */
3309   if (res) {
3310     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3311
3312     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3313       gst_element_post_message (GST_ELEMENT (sink),
3314           gst_message_new_segment_start (GST_OBJECT (sink),
3315               sink->segment.format, sink->segment.last_stop));
3316     }
3317   }
3318
3319   sink->priv->discont = TRUE;
3320   sink->abidata.ABI.running = TRUE;
3321
3322   GST_PAD_STREAM_UNLOCK (pad);
3323
3324   return res;
3325 }
3326
3327 static gboolean
3328 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3329 {
3330   GstBaseSinkPrivate *priv;
3331   GstBaseSinkClass *bclass;
3332   gboolean flush, intermediate;
3333   gdouble rate;
3334   GstFormat format;
3335   guint64 amount;
3336   guint seqnum;
3337
3338   bclass = GST_BASE_SINK_GET_CLASS (sink);
3339   priv = sink->priv;
3340
3341   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
3342
3343   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
3344   seqnum = gst_event_get_seqnum (event);
3345
3346   if (flush) {
3347     /* we need to call ::unlock before locking PREROLL_LOCK
3348      * since we lock it before going into ::render */
3349     if (bclass->unlock)
3350       bclass->unlock (sink);
3351
3352     GST_PAD_PREROLL_LOCK (sink->sinkpad);
3353     /* update the segment */
3354     priv->pending_step.seqnum = seqnum;
3355     priv->pending_step.format = format;
3356     priv->pending_step.amount = amount;
3357     priv->pending_step.remaining = amount;
3358     priv->pending_step.rate = rate;
3359     priv->pending_step.intermediate = intermediate;
3360     priv->pending_step.valid = TRUE;
3361
3362     /* now that we have the PREROLL lock, clear our unlock request */
3363     if (bclass->unlock_stop)
3364       bclass->unlock_stop (sink);
3365
3366     if (sink->priv->async_enabled) {
3367       /* and we need to commit our state again on the next
3368        * prerolled buffer */
3369       sink->playing_async = TRUE;
3370       priv->pending_step.need_preroll = TRUE;
3371       sink->need_preroll = FALSE;
3372       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
3373     } else {
3374       sink->priv->have_latency = TRUE;
3375       sink->need_preroll = FALSE;
3376     }
3377     priv->call_preroll = TRUE;
3378     gst_base_sink_set_last_buffer (sink, NULL);
3379     gst_base_sink_reset_qos (sink);
3380
3381     if (sink->clock_id) {
3382       gst_clock_id_unschedule (sink->clock_id);
3383     }
3384
3385     GST_DEBUG_OBJECT (sink, "signal waiter");
3386     GST_PAD_PREROLL_SIGNAL (sink->sinkpad);
3387
3388     GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
3389   }
3390
3391   return TRUE;
3392 }
3393
3394 /* with STREAM_LOCK
3395  */
3396 static void
3397 gst_base_sink_loop (GstPad * pad)
3398 {
3399   GstBaseSink *basesink;
3400   GstBuffer *buf = NULL;
3401   GstFlowReturn result;
3402   guint blocksize;
3403   guint64 offset;
3404
3405   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3406
3407   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
3408
3409   if ((blocksize = basesink->priv->blocksize) == 0)
3410     blocksize = -1;
3411
3412   offset = basesink->segment.last_stop;
3413
3414   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
3415       offset, blocksize);
3416
3417   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
3418   if (G_UNLIKELY (result != GST_FLOW_OK))
3419     goto paused;
3420
3421   if (G_UNLIKELY (buf == NULL))
3422     goto no_buffer;
3423
3424   offset += GST_BUFFER_SIZE (buf);
3425
3426   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
3427
3428   GST_PAD_PREROLL_LOCK (pad);
3429   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
3430   GST_PAD_PREROLL_UNLOCK (pad);
3431   if (G_UNLIKELY (result != GST_FLOW_OK))
3432     goto paused;
3433
3434   return;
3435
3436   /* ERRORS */
3437 paused:
3438   {
3439     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
3440         gst_flow_get_name (result));
3441     gst_pad_pause_task (pad);
3442     /* fatal errors and NOT_LINKED cause EOS */
3443     if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) {
3444       if (result == GST_FLOW_UNEXPECTED) {
3445         /* perform EOS logic */
3446         if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3447           gst_element_post_message (GST_ELEMENT_CAST (basesink),
3448               gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
3449                   basesink->segment.format, basesink->segment.last_stop));
3450         } else {
3451           gst_base_sink_event (pad, gst_event_new_eos ());
3452         }
3453       } else {
3454         /* for fatal errors we post an error message, post the error
3455          * first so the app knows about the error first. */
3456         GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3457             (_("Internal data stream error.")),
3458             ("stream stopped, reason %s", gst_flow_get_name (result)));
3459         gst_base_sink_event (pad, gst_event_new_eos ());
3460       }
3461     }
3462     return;
3463   }
3464 no_buffer:
3465   {
3466     GST_LOG_OBJECT (basesink, "no buffer, pausing");
3467     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
3468         (_("Internal data flow error.")), ("element returned NULL buffer"));
3469     result = GST_FLOW_ERROR;
3470     goto paused;
3471   }
3472 }
3473
3474 static gboolean
3475 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
3476     gboolean flushing)
3477 {
3478   GstBaseSinkClass *bclass;
3479
3480   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3481
3482   if (flushing) {
3483     /* unlock any subclasses, we need to do this before grabbing the
3484      * PREROLL_LOCK since we hold this lock before going into ::render. */
3485     if (bclass->unlock)
3486       bclass->unlock (basesink);
3487   }
3488
3489   GST_PAD_PREROLL_LOCK (pad);
3490   basesink->flushing = flushing;
3491   if (flushing) {
3492     /* step 1, now that we have the PREROLL lock, clear our unlock request */
3493     if (bclass->unlock_stop)
3494       bclass->unlock_stop (basesink);
3495
3496     /* set need_preroll before we unblock the clock. If the clock is unblocked
3497      * before timing out, we can reuse the buffer for preroll. */
3498     basesink->need_preroll = TRUE;
3499
3500     /* step 2, unblock clock sync (if any) or any other blocking thing */
3501     if (basesink->clock_id) {
3502       gst_clock_id_unschedule (basesink->clock_id);
3503     }
3504
3505     /* flush out the data thread if it's locked in finish_preroll, this will
3506      * also flush out the EOS state */
3507     GST_DEBUG_OBJECT (basesink,
3508         "flushing out data thread, need preroll to TRUE");
3509     gst_base_sink_preroll_queue_flush (basesink, pad);
3510   }
3511   GST_PAD_PREROLL_UNLOCK (pad);
3512
3513   return TRUE;
3514 }
3515
3516 static gboolean
3517 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
3518 {
3519   gboolean result;
3520
3521   if (active) {
3522     /* start task */
3523     result = gst_pad_start_task (basesink->sinkpad,
3524         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
3525   } else {
3526     /* step 2, make sure streaming finishes */
3527     result = gst_pad_stop_task (basesink->sinkpad);
3528   }
3529
3530   return result;
3531 }
3532
3533 static gboolean
3534 gst_base_sink_pad_activate (GstPad * pad)
3535 {
3536   gboolean result = FALSE;
3537   GstBaseSink *basesink;
3538
3539   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3540
3541   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
3542
3543   gst_base_sink_set_flushing (basesink, pad, FALSE);
3544
3545   /* we need to have the pull mode enabled */
3546   if (!basesink->can_activate_pull) {
3547     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
3548     goto fallback;
3549   }
3550
3551   /* check if downstreams supports pull mode at all */
3552   if (!gst_pad_check_pull_range (pad)) {
3553     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
3554     goto fallback;
3555   }
3556
3557   /* set the pad mode before starting the task so that it's in the
3558    * correct state for the new thread. also the sink set_caps and get_caps
3559    * function checks this */
3560   basesink->pad_mode = GST_ACTIVATE_PULL;
3561
3562   /* we first try to negotiate a format so that when we try to activate
3563    * downstream, it knows about our format */
3564   if (!gst_base_sink_negotiate_pull (basesink)) {
3565     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
3566     goto fallback;
3567   }
3568
3569   /* ok activate now */
3570   if (!gst_pad_activate_pull (pad, TRUE)) {
3571     /* clear any pending caps */
3572     GST_OBJECT_LOCK (basesink);
3573     gst_caps_replace (&basesink->priv->pull_caps, NULL);
3574     GST_OBJECT_UNLOCK (basesink);
3575     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
3576     goto fallback;
3577   }
3578
3579   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
3580   result = TRUE;
3581   goto done;
3582
3583   /* push mode fallback */
3584 fallback:
3585   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
3586   if ((result = gst_pad_activate_push (pad, TRUE))) {
3587     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
3588   }
3589
3590 done:
3591   if (!result) {
3592     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
3593     gst_base_sink_set_flushing (basesink, pad, TRUE);
3594   }
3595
3596   gst_object_unref (basesink);
3597
3598   return result;
3599 }
3600
3601 static gboolean
3602 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
3603 {
3604   gboolean result;
3605   GstBaseSink *basesink;
3606
3607   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3608
3609   if (active) {
3610     if (!basesink->can_activate_push) {
3611       result = FALSE;
3612       basesink->pad_mode = GST_ACTIVATE_NONE;
3613     } else {
3614       result = TRUE;
3615       basesink->pad_mode = GST_ACTIVATE_PUSH;
3616     }
3617   } else {
3618     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
3619       g_warning ("Internal GStreamer activation error!!!");
3620       result = FALSE;
3621     } else {
3622       gst_base_sink_set_flushing (basesink, pad, TRUE);
3623       result = TRUE;
3624       basesink->pad_mode = GST_ACTIVATE_NONE;
3625     }
3626   }
3627
3628   gst_object_unref (basesink);
3629
3630   return result;
3631 }
3632
3633 static gboolean
3634 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
3635 {
3636   GstCaps *caps;
3637   gboolean result;
3638
3639   result = FALSE;
3640
3641   /* this returns the intersection between our caps and the peer caps. If there
3642    * is no peer, it returns NULL and we can't operate in pull mode so we can
3643    * fail the negotiation. */
3644   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
3645   if (caps == NULL || gst_caps_is_empty (caps))
3646     goto no_caps_possible;
3647
3648   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
3649
3650   caps = gst_caps_make_writable (caps);
3651   /* get the first (prefered) format */
3652   gst_caps_truncate (caps);
3653   /* try to fixate */
3654   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
3655
3656   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
3657
3658   if (gst_caps_is_any (caps)) {
3659     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
3660         "allowing pull()");
3661     /* neither side has template caps in this case, so they are prepared for
3662        pull() without setcaps() */
3663     result = TRUE;
3664   } else if (gst_caps_is_fixed (caps)) {
3665     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
3666       goto could_not_set_caps;
3667
3668     GST_OBJECT_LOCK (basesink);
3669     gst_caps_replace (&basesink->priv->pull_caps, caps);
3670     GST_OBJECT_UNLOCK (basesink);
3671
3672     result = TRUE;
3673   }
3674
3675   gst_caps_unref (caps);
3676
3677   return result;
3678
3679 no_caps_possible:
3680   {
3681     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
3682     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
3683     if (caps)
3684       gst_caps_unref (caps);
3685     return FALSE;
3686   }
3687 could_not_set_caps:
3688   {
3689     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
3690     gst_caps_unref (caps);
3691     return FALSE;
3692   }
3693 }
3694
3695 /* this won't get called until we implement an activate function */
3696 static gboolean
3697 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
3698 {
3699   gboolean result = FALSE;
3700   GstBaseSink *basesink;
3701   GstBaseSinkClass *bclass;
3702
3703   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3704   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3705
3706   if (active) {
3707     GstFormat format;
3708     gint64 duration;
3709
3710     /* we mark we have a newsegment here because pull based
3711      * mode works just fine without having a newsegment before the
3712      * first buffer */
3713     format = GST_FORMAT_BYTES;
3714
3715     gst_segment_init (&basesink->segment, format);
3716     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
3717     GST_OBJECT_LOCK (basesink);
3718     basesink->have_newsegment = TRUE;
3719     GST_OBJECT_UNLOCK (basesink);
3720
3721     /* get the peer duration in bytes */
3722     result = gst_pad_query_peer_duration (pad, &format, &duration);
3723     if (result) {
3724       GST_DEBUG_OBJECT (basesink,
3725           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
3726       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
3727           duration);
3728       gst_segment_set_duration (&basesink->segment, format, duration);
3729     } else {
3730       GST_DEBUG_OBJECT (basesink, "unknown duration");
3731     }
3732
3733     if (bclass->activate_pull)
3734       result = bclass->activate_pull (basesink, TRUE);
3735     else
3736       result = FALSE;
3737
3738     if (!result)
3739       goto activate_failed;
3740
3741   } else {
3742     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
3743       g_warning ("Internal GStreamer activation error!!!");
3744       result = FALSE;
3745     } else {
3746       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
3747       if (bclass->activate_pull)
3748         result &= bclass->activate_pull (basesink, FALSE);
3749       basesink->pad_mode = GST_ACTIVATE_NONE;
3750       /* clear any pending caps */
3751       GST_OBJECT_LOCK (basesink);
3752       gst_caps_replace (&basesink->priv->pull_caps, NULL);
3753       GST_OBJECT_UNLOCK (basesink);
3754     }
3755   }
3756   gst_object_unref (basesink);
3757
3758   return result;
3759
3760   /* ERRORS */
3761 activate_failed:
3762   {
3763     /* reset, as starting the thread failed */
3764     basesink->pad_mode = GST_ACTIVATE_NONE;
3765
3766     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
3767     return FALSE;
3768   }
3769 }
3770
3771 /* send an event to our sinkpad peer. */
3772 static gboolean
3773 gst_base_sink_send_event (GstElement * element, GstEvent * event)
3774 {
3775   GstPad *pad;
3776   GstBaseSink *basesink = GST_BASE_SINK (element);
3777   gboolean forward, result = TRUE;
3778   GstActivateMode mode;
3779
3780   GST_OBJECT_LOCK (element);
3781   /* get the pad and the scheduling mode */
3782   pad = gst_object_ref (basesink->sinkpad);
3783   mode = basesink->pad_mode;
3784   GST_OBJECT_UNLOCK (element);
3785
3786   /* only push UPSTREAM events upstream */
3787   forward = GST_EVENT_IS_UPSTREAM (event);
3788
3789   switch (GST_EVENT_TYPE (event)) {
3790     case GST_EVENT_LATENCY:
3791     {
3792       GstClockTime latency;
3793
3794       gst_event_parse_latency (event, &latency);
3795
3796       /* store the latency. We use this to adjust the running_time before syncing
3797        * it to the clock. */
3798       GST_OBJECT_LOCK (element);
3799       basesink->priv->latency = latency;
3800       if (!basesink->priv->have_latency)
3801         forward = FALSE;
3802       GST_OBJECT_UNLOCK (element);
3803       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
3804           GST_TIME_ARGS (latency));
3805
3806       /* We forward this event so that all elements know about the global pipeline
3807        * latency. This is interesting for an element when it wants to figure out
3808        * when a particular piece of data will be rendered. */
3809       break;
3810     }
3811     case GST_EVENT_SEEK:
3812       /* in pull mode we will execute the seek */
3813       if (mode == GST_ACTIVATE_PULL)
3814         result = gst_base_sink_perform_seek (basesink, pad, event);
3815       break;
3816     case GST_EVENT_STEP:
3817       result = gst_base_sink_perform_step (basesink, pad, event);
3818       forward = FALSE;
3819       break;
3820     default:
3821       break;
3822   }
3823
3824   if (forward) {
3825     result = gst_pad_push_event (pad, event);
3826   } else {
3827     /* not forwarded, unref the event */
3828     gst_event_unref (event);
3829   }
3830
3831   gst_object_unref (pad);
3832   return result;
3833 }
3834
3835 static gboolean
3836 gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query)
3837 {
3838   GstPad *peer;
3839   gboolean res = FALSE;
3840
3841   if ((peer = gst_pad_get_peer (sink->sinkpad))) {
3842     res = gst_pad_query (peer, query);
3843     gst_object_unref (peer);
3844   }
3845   return res;
3846 }
3847
3848 /* get the end position of the last seen object, this is used
3849  * for EOS and for making sure that we don't report a position we
3850  * have not reached yet. With LOCK. */
3851 static gboolean
3852 gst_base_sink_get_position_last (GstBaseSink * basesink, GstFormat format,
3853     gint64 * cur)
3854 {
3855   GstFormat oformat;
3856   GstSegment *segment;
3857   gboolean ret = TRUE;
3858
3859   segment = &basesink->segment;
3860   oformat = segment->format;
3861
3862   if (oformat == GST_FORMAT_TIME) {
3863     /* return last observed stream time, we keep the stream time around in the
3864      * time format. */
3865     *cur = basesink->priv->current_sstop;
3866   } else {
3867     /* convert last stop to stream time */
3868     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
3869   }
3870
3871   if (*cur != -1 && oformat != format) {
3872     GST_OBJECT_UNLOCK (basesink);
3873     /* convert to the target format if we need to, release lock first */
3874     ret =
3875         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
3876     if (!ret)
3877       *cur = -1;
3878     GST_OBJECT_LOCK (basesink);
3879   }
3880
3881   GST_DEBUG_OBJECT (basesink, "POSITION: %" GST_TIME_FORMAT,
3882       GST_TIME_ARGS (*cur));
3883
3884   return ret;
3885 }
3886
3887 /* get the position when we are PAUSED, this is the stream time of the buffer
3888  * that prerolled. If no buffer is prerolled (we are still flushing), this
3889  * value will be -1. With LOCK. */
3890 static gboolean
3891 gst_base_sink_get_position_paused (GstBaseSink * basesink, GstFormat format,
3892     gint64 * cur)
3893 {
3894   gboolean res;
3895   gint64 time;
3896   GstSegment *segment;
3897   GstFormat oformat;
3898
3899   /* we don't use the clip segment in pull mode, when seeking we update the
3900    * main segment directly with the new segment values without it having to be
3901    * activated by the rendering after preroll */
3902   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
3903     segment = basesink->abidata.ABI.clip_segment;
3904   else
3905     segment = &basesink->segment;
3906   oformat = segment->format;
3907
3908   if (oformat == GST_FORMAT_TIME) {
3909     *cur = basesink->priv->current_sstart;
3910   } else {
3911     *cur = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
3912   }
3913
3914   time = segment->time;
3915
3916   if (*cur != -1) {
3917     *cur = MAX (*cur, time);
3918     GST_DEBUG_OBJECT (basesink, "POSITION as max: %" GST_TIME_FORMAT
3919         ", time %" GST_TIME_FORMAT, GST_TIME_ARGS (*cur), GST_TIME_ARGS (time));
3920   } else {
3921     /* we have no buffer, use the segment times. */
3922     if (segment->rate >= 0.0) {
3923       /* forward, next position is always the time of the segment */
3924       *cur = time;
3925       GST_DEBUG_OBJECT (basesink, "POSITION as time: %" GST_TIME_FORMAT,
3926           GST_TIME_ARGS (*cur));
3927     } else {
3928       /* reverse, next expected timestamp is segment->stop. We use the function
3929        * to get things right for negative applied_rates. */
3930       *cur = gst_segment_to_stream_time (segment, oformat, segment->stop);
3931       GST_DEBUG_OBJECT (basesink, "reverse POSITION: %" GST_TIME_FORMAT,
3932           GST_TIME_ARGS (*cur));
3933     }
3934   }
3935
3936   res = (*cur != -1);
3937   if (res && oformat != format) {
3938     GST_OBJECT_UNLOCK (basesink);
3939     res =
3940         gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur);
3941     if (!res)
3942       *cur = -1;
3943     GST_OBJECT_LOCK (basesink);
3944   }
3945
3946   return res;
3947 }
3948
3949 static gboolean
3950 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
3951     gint64 * cur, gboolean * upstream)
3952 {
3953   GstClock *clock;
3954   gboolean res = FALSE;
3955   GstFormat oformat, tformat;
3956   GstClockTime now, base, latency;
3957   gint64 time, accum, duration;
3958   gdouble rate;
3959   gint64 last;
3960
3961   GST_OBJECT_LOCK (basesink);
3962   /* our intermediate time format */
3963   tformat = GST_FORMAT_TIME;
3964   /* get the format in the segment */
3965   oformat = basesink->segment.format;
3966
3967   /* can only give answer based on the clock if not EOS */
3968   if (G_UNLIKELY (basesink->eos))
3969     goto in_eos;
3970
3971   /* we can only get the segment when we are not NULL or READY */
3972   if (!basesink->have_newsegment)
3973     goto wrong_state;
3974
3975   /* when not in PLAYING or when we're busy with a state change, we
3976    * cannot read from the clock so we report time based on the
3977    * last seen timestamp. */
3978   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
3979       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING)
3980     goto in_pause;
3981
3982   /* we need to sync on the clock. */
3983   if (basesink->sync == FALSE)
3984     goto no_sync;
3985
3986   /* and we need a clock */
3987   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
3988     goto no_sync;
3989
3990   /* collect all data we need holding the lock */
3991   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time))
3992     time = basesink->segment.time;
3993   else
3994     time = 0;
3995
3996   if (GST_CLOCK_TIME_IS_VALID (basesink->segment.stop))
3997     duration = basesink->segment.stop - basesink->segment.start;
3998   else
3999     duration = 0;
4000
4001   base = GST_ELEMENT_CAST (basesink)->base_time;
4002   accum = basesink->segment.accum;
4003   rate = basesink->segment.rate * basesink->segment.applied_rate;
4004   latency = basesink->priv->latency;
4005
4006   gst_object_ref (clock);
4007
4008   /* this function might release the LOCK */
4009   gst_base_sink_get_position_last (basesink, format, &last);
4010
4011   /* need to release the object lock before we can get the time, 
4012    * a clock might take the LOCK of the provider, which could be
4013    * a basesink subclass. */
4014   GST_OBJECT_UNLOCK (basesink);
4015
4016   now = gst_clock_get_time (clock);
4017
4018   if (oformat != tformat) {
4019     /* convert accum, time and duration to time */
4020     if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4021             &accum))
4022       goto convert_failed;
4023     if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration, &tformat,
4024             &duration))
4025       goto convert_failed;
4026     if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4027             &time))
4028       goto convert_failed;
4029   }
4030
4031   /* subtract base time and accumulated time from the clock time. 
4032    * Make sure we don't go negative. This is the current time in
4033    * the segment which we need to scale with the combined 
4034    * rate and applied rate. */
4035   base += accum;
4036   base += latency;
4037   base = MIN (now, base);
4038
4039   /* for negative rates we need to count back from from the segment
4040    * duration. */
4041   if (rate < 0.0)
4042     time += duration;
4043
4044   *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4045
4046   /* never report more than last seen position */
4047   if (last != -1)
4048     *cur = MIN (last, *cur);
4049
4050   gst_object_unref (clock);
4051
4052   GST_DEBUG_OBJECT (basesink,
4053       "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4054       GST_TIME_FORMAT " + time %" GST_TIME_FORMAT,
4055       GST_TIME_ARGS (now), GST_TIME_ARGS (base),
4056       GST_TIME_ARGS (accum), GST_TIME_ARGS (time));
4057
4058   if (oformat != format) {
4059     /* convert time to final format */
4060     if (!gst_pad_query_convert (basesink->sinkpad, tformat, *cur, &format, cur))
4061       goto convert_failed;
4062   }
4063
4064   res = TRUE;
4065
4066 done:
4067   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4068       res, GST_TIME_ARGS (*cur));
4069   return res;
4070
4071   /* special cases */
4072 in_eos:
4073   {
4074     GST_DEBUG_OBJECT (basesink, "position in EOS");
4075     res = gst_base_sink_get_position_last (basesink, format, cur);
4076     GST_OBJECT_UNLOCK (basesink);
4077     goto done;
4078   }
4079 in_pause:
4080   {
4081     GST_DEBUG_OBJECT (basesink, "position in PAUSED");
4082     res = gst_base_sink_get_position_paused (basesink, format, cur);
4083     GST_OBJECT_UNLOCK (basesink);
4084     goto done;
4085   }
4086 wrong_state:
4087   {
4088     /* in NULL or READY we always return FALSE and -1 */
4089     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4090     res = FALSE;
4091     *cur = -1;
4092     GST_OBJECT_UNLOCK (basesink);
4093     goto done;
4094   }
4095 no_sync:
4096   {
4097     /* report last seen timestamp if any, else ask upstream to answer */
4098     if ((*cur = basesink->priv->current_sstart) != -1)
4099       res = TRUE;
4100     else
4101       *upstream = TRUE;
4102
4103     GST_DEBUG_OBJECT (basesink, "no sync, res %d, POSITION %" GST_TIME_FORMAT,
4104         res, GST_TIME_ARGS (*cur));
4105     GST_OBJECT_UNLOCK (basesink);
4106     return res;
4107   }
4108 convert_failed:
4109   {
4110     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4111     *upstream = TRUE;
4112     return FALSE;
4113   }
4114 }
4115
4116 static gboolean
4117 gst_base_sink_query (GstElement * element, GstQuery * query)
4118 {
4119   gboolean res = FALSE;
4120
4121   GstBaseSink *basesink = GST_BASE_SINK (element);
4122
4123   switch (GST_QUERY_TYPE (query)) {
4124     case GST_QUERY_POSITION:
4125     {
4126       gint64 cur = 0;
4127       GstFormat format;
4128       gboolean upstream = FALSE;
4129
4130       gst_query_parse_position (query, &format, NULL);
4131
4132       GST_DEBUG_OBJECT (basesink, "position format %d", format);
4133
4134       /* first try to get the position based on the clock */
4135       if ((res =
4136               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4137         gst_query_set_position (query, format, cur);
4138       } else if (upstream) {
4139         /* fallback to peer query */
4140         res = gst_base_sink_peer_query (basesink, query);
4141       }
4142       break;
4143     }
4144     case GST_QUERY_DURATION:
4145     {
4146       GstFormat format, uformat;
4147       gint64 duration, uduration;
4148
4149       gst_query_parse_duration (query, &format, NULL);
4150
4151       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4152           gst_format_get_name (format));
4153
4154       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4155         uformat = GST_FORMAT_BYTES;
4156
4157         /* get the duration in bytes, in pull mode that's all we are sure to
4158          * know. We have to explicitly get this value from upstream instead of
4159          * using our cached value because it might change. Duration caching
4160          * should be done at a higher level. */
4161         res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4162             &uduration);
4163         if (res) {
4164           gst_segment_set_duration (&basesink->segment, uformat, uduration);
4165           if (format != uformat) {
4166             /* convert to the requested format */
4167             res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4168                 &format, &duration);
4169           } else {
4170             duration = uduration;
4171           }
4172           if (res) {
4173             /* set the result */
4174             gst_query_set_duration (query, format, duration);
4175           }
4176         }
4177       } else {
4178         /* in push mode we simply forward upstream */
4179         res = gst_base_sink_peer_query (basesink, query);
4180       }
4181       break;
4182     }
4183     case GST_QUERY_LATENCY:
4184     {
4185       gboolean live, us_live;
4186       GstClockTime min, max;
4187
4188       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4189                   &max))) {
4190         gst_query_set_latency (query, live, min, max);
4191       }
4192       break;
4193     }
4194     case GST_QUERY_JITTER:
4195       break;
4196     case GST_QUERY_RATE:
4197       /* gst_query_set_rate (query, basesink->segment_rate); */
4198       res = TRUE;
4199       break;
4200     case GST_QUERY_SEGMENT:
4201     {
4202       /* FIXME, bring start/stop to stream time */
4203       gst_query_set_segment (query, basesink->segment.rate,
4204           GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4205       break;
4206     }
4207     case GST_QUERY_SEEKING:
4208     case GST_QUERY_CONVERT:
4209     case GST_QUERY_FORMATS:
4210     default:
4211       res = gst_base_sink_peer_query (basesink, query);
4212       break;
4213   }
4214   return res;
4215 }
4216
4217 static GstStateChangeReturn
4218 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4219 {
4220   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4221   GstBaseSink *basesink = GST_BASE_SINK (element);
4222   GstBaseSinkClass *bclass;
4223   GstBaseSinkPrivate *priv;
4224
4225   priv = basesink->priv;
4226
4227   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4228
4229   switch (transition) {
4230     case GST_STATE_CHANGE_NULL_TO_READY:
4231       if (bclass->start)
4232         if (!bclass->start (basesink))
4233           goto start_failed;
4234       break;
4235     case GST_STATE_CHANGE_READY_TO_PAUSED:
4236       /* need to complete preroll before this state change completes, there
4237        * is no data flow in READY so we can safely assume we need to preroll. */
4238       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4239       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4240       basesink->have_newsegment = FALSE;
4241       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4242       gst_segment_init (basesink->abidata.ABI.clip_segment,
4243           GST_FORMAT_UNDEFINED);
4244       basesink->offset = 0;
4245       basesink->have_preroll = FALSE;
4246       basesink->need_preroll = TRUE;
4247       basesink->playing_async = TRUE;
4248       priv->current_sstart = -1;
4249       priv->current_sstop = -1;
4250       priv->eos_rtime = -1;
4251       priv->latency = 0;
4252       basesink->eos = FALSE;
4253       priv->received_eos = FALSE;
4254       gst_base_sink_reset_qos (basesink);
4255       priv->commited = FALSE;
4256       priv->call_preroll = TRUE;
4257       priv->current_step.valid = FALSE;
4258       priv->pending_step.valid = FALSE;
4259       if (priv->async_enabled) {
4260         GST_DEBUG_OBJECT (basesink, "doing async state change");
4261         /* when async enabled, post async-start message and return ASYNC from
4262          * the state change function */
4263         ret = GST_STATE_CHANGE_ASYNC;
4264         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4265             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4266       } else {
4267         priv->have_latency = TRUE;
4268       }
4269       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4270       break;
4271     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4272       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4273       if (!gst_base_sink_needs_preroll (basesink)) {
4274         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4275         /* no preroll needed anymore now. */
4276         basesink->playing_async = FALSE;
4277         basesink->need_preroll = FALSE;
4278         if (basesink->eos) {
4279           GstMessage *message;
4280
4281           /* need to post EOS message here */
4282           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4283           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
4284           gst_message_set_seqnum (message, basesink->priv->seqnum);
4285           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
4286         } else {
4287           GST_DEBUG_OBJECT (basesink, "signal preroll");
4288           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
4289         }
4290       } else {
4291         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
4292         basesink->need_preroll = TRUE;
4293         basesink->playing_async = TRUE;
4294         priv->call_preroll = TRUE;
4295         priv->commited = FALSE;
4296         if (priv->async_enabled) {
4297           GST_DEBUG_OBJECT (basesink, "doing async state change");
4298           ret = GST_STATE_CHANGE_ASYNC;
4299           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4300               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4301         }
4302       }
4303       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4304       break;
4305     default:
4306       break;
4307   }
4308
4309   {
4310     GstStateChangeReturn bret;
4311
4312     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4313     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
4314       goto activate_failed;
4315   }
4316
4317   switch (transition) {
4318     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4319       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
4320       /* FIXME, make sure we cannot enter _render first */
4321
4322       /* we need to call ::unlock before locking PREROLL_LOCK
4323        * since we lock it before going into ::render */
4324       if (bclass->unlock)
4325         bclass->unlock (basesink);
4326
4327       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4328       /* now that we have the PREROLL lock, clear our unlock request */
4329       if (bclass->unlock_stop)
4330         bclass->unlock_stop (basesink);
4331
4332       /* we need preroll again and we set the flag before unlocking the clockid
4333        * because if the clockid is unlocked before a current buffer expired, we
4334        * can use that buffer to preroll with */
4335       basesink->need_preroll = TRUE;
4336
4337       if (basesink->clock_id) {
4338         gst_clock_id_unschedule (basesink->clock_id);
4339       }
4340
4341       /* if we don't have a preroll buffer we need to wait for a preroll and
4342        * return ASYNC. */
4343       if (!gst_base_sink_needs_preroll (basesink)) {
4344         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
4345         basesink->playing_async = FALSE;
4346       } else {
4347         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
4348           ret = GST_STATE_CHANGE_SUCCESS;
4349         } else {
4350           GST_DEBUG_OBJECT (basesink,
4351               "PLAYING to PAUSED, we are not prerolled");
4352           basesink->playing_async = TRUE;
4353           priv->commited = FALSE;
4354           priv->call_preroll = TRUE;
4355           if (priv->async_enabled) {
4356             GST_DEBUG_OBJECT (basesink, "doing async state change");
4357             ret = GST_STATE_CHANGE_ASYNC;
4358             gst_element_post_message (GST_ELEMENT_CAST (basesink),
4359                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
4360                     FALSE));
4361           }
4362         }
4363       }
4364       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
4365           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
4366
4367       gst_base_sink_reset_qos (basesink);
4368       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4369       break;
4370     case GST_STATE_CHANGE_PAUSED_TO_READY:
4371       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4372       /* start by reseting our position state with the object lock so that the
4373        * position query gets the right idea. We do this before we post the
4374        * messages so that the message handlers pick this up. */
4375       GST_OBJECT_LOCK (basesink);
4376       basesink->have_newsegment = FALSE;
4377       priv->current_sstart = -1;
4378       priv->current_sstop = -1;
4379       priv->have_latency = FALSE;
4380       GST_OBJECT_UNLOCK (basesink);
4381
4382       gst_base_sink_set_last_buffer (basesink, NULL);
4383       priv->call_preroll = FALSE;
4384
4385       if (!priv->commited) {
4386         if (priv->async_enabled) {
4387           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
4388
4389           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4390               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
4391                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
4392
4393           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4394               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
4395         }
4396         priv->commited = TRUE;
4397       } else {
4398         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
4399       }
4400       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4401       break;
4402     case GST_STATE_CHANGE_READY_TO_NULL:
4403       if (bclass->stop) {
4404         if (!bclass->stop (basesink)) {
4405           GST_WARNING_OBJECT (basesink, "failed to stop");
4406         }
4407       }
4408       gst_base_sink_set_last_buffer (basesink, NULL);
4409       priv->call_preroll = FALSE;
4410       break;
4411     default:
4412       break;
4413   }
4414
4415   return ret;
4416
4417   /* ERRORS */
4418 start_failed:
4419   {
4420     GST_DEBUG_OBJECT (basesink, "failed to start");
4421     return GST_STATE_CHANGE_FAILURE;
4422   }
4423 activate_failed:
4424   {
4425     GST_DEBUG_OBJECT (basesink,
4426         "element failed to change states -- activation problem?");
4427     return GST_STATE_CHANGE_FAILURE;
4428   }
4429 }