gst/gstutils.c: RPAD fixes all around.
[platform/upstream/gstreamer.git] / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3  *
4  * gstbasesink.c: 
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #  include "config.h"
24 #endif
25
26 #include "gstbasesink.h"
27 #include <gst/gstmarshal.h>
28
29 GST_DEBUG_CATEGORY_STATIC (gst_basesink_debug);
30 #define GST_CAT_DEFAULT gst_basesink_debug
31
32 /* #define DEBUGGING */
33 #ifdef DEBUGGING
34 #define DEBUG(str,args...) g_print (str,##args)
35 #else
36 #define DEBUG(str,args...)
37 #endif
38
39 /* BaseSink signals and properties */
40 enum
41 {
42   /* FILL ME */
43   SIGNAL_HANDOFF,
44   LAST_SIGNAL
45 };
46
47 /* FIXME, need to figure out a better way to handle the pull mode */
48 #define DEFAULT_SIZE 1024
49 #define DEFAULT_HAS_LOOP FALSE
50 #define DEFAULT_HAS_CHAIN TRUE
51
52 enum
53 {
54   PROP_0,
55   PROP_HAS_LOOP,
56   PROP_HAS_CHAIN,
57   PROP_PREROLL_QUEUE_LEN
58 };
59
60 static GstElementClass *parent_class = NULL;
61
62 static void gst_basesink_base_init (gpointer g_class);
63 static void gst_basesink_class_init (GstBaseSinkClass * klass);
64 static void gst_basesink_init (GstBaseSink * trans, gpointer g_class);
65 static void gst_basesink_finalize (GObject * object);
66
67 GType
68 gst_basesink_get_type (void)
69 {
70   static GType basesink_type = 0;
71
72   if (!basesink_type) {
73     static const GTypeInfo basesink_info = {
74       sizeof (GstBaseSinkClass),
75       (GBaseInitFunc) gst_basesink_base_init,
76       NULL,
77       (GClassInitFunc) gst_basesink_class_init,
78       NULL,
79       NULL,
80       sizeof (GstBaseSink),
81       0,
82       (GInstanceInitFunc) gst_basesink_init,
83     };
84
85     basesink_type = g_type_register_static (GST_TYPE_ELEMENT,
86         "GstBaseSink", &basesink_info, G_TYPE_FLAG_ABSTRACT);
87   }
88   return basesink_type;
89 }
90
91 static void gst_basesink_set_clock (GstElement * element, GstClock * clock);
92
93 static void gst_basesink_set_property (GObject * object, guint prop_id,
94     const GValue * value, GParamSpec * pspec);
95 static void gst_basesink_get_property (GObject * object, guint prop_id,
96     GValue * value, GParamSpec * pspec);
97
98 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
99 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
100 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
101     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
102 static void gst_basesink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
103     GstClockTime * start, GstClockTime * end);
104
105 static GstElementStateReturn gst_basesink_change_state (GstElement * element);
106
107 static GstFlowReturn gst_basesink_chain_unlocked (GstPad * pad,
108     GstBuffer * buffer);
109 static void gst_basesink_loop (GstPad * pad);
110 static GstFlowReturn gst_basesink_chain (GstPad * pad, GstBuffer * buffer);
111 static gboolean gst_basesink_activate (GstPad * pad, GstActivateMode mode);
112 static gboolean gst_basesink_event (GstPad * pad, GstEvent * event);
113 static inline GstFlowReturn gst_basesink_handle_buffer (GstBaseSink * basesink,
114     GstBuffer * buf);
115
116 static void
117 gst_basesink_base_init (gpointer g_class)
118 {
119   GST_DEBUG_CATEGORY_INIT (gst_basesink_debug, "basesink", 0,
120       "basesink element");
121 }
122
123 static void
124 gst_basesink_class_init (GstBaseSinkClass * klass)
125 {
126   GObjectClass *gobject_class;
127   GstElementClass *gstelement_class;
128
129   gobject_class = (GObjectClass *) klass;
130   gstelement_class = (GstElementClass *) klass;
131
132   parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
133
134   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_basesink_finalize);
135   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_basesink_set_property);
136   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_basesink_get_property);
137
138   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_HAS_LOOP,
139       g_param_spec_boolean ("has-loop", "has-loop",
140           "Enable loop-based operation", DEFAULT_HAS_LOOP,
141           G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
142   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_HAS_CHAIN,
143       g_param_spec_boolean ("has-chain", "has-chain",
144           "Enable chain-based operation", DEFAULT_HAS_CHAIN,
145           G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
146   /* FIXME, this next value should be configured using an event from the
147    * upstream element */
148   g_object_class_install_property (G_OBJECT_CLASS (klass),
149       PROP_PREROLL_QUEUE_LEN,
150       g_param_spec_uint ("preroll-queue-len", "preroll-queue-len",
151           "Number of buffers to queue during preroll", 0, G_MAXUINT, 0,
152           G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
153
154   gstelement_class->set_clock = GST_DEBUG_FUNCPTR (gst_basesink_set_clock);
155   gstelement_class->change_state =
156       GST_DEBUG_FUNCPTR (gst_basesink_change_state);
157
158   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
159   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
160   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
161   klass->get_times = GST_DEBUG_FUNCPTR (gst_basesink_get_times);
162 }
163
164 static GstCaps *
165 gst_basesink_pad_getcaps (GstPad * pad)
166 {
167   GstBaseSinkClass *bclass;
168   GstBaseSink *bsink;
169   GstCaps *caps = NULL;
170
171   bsink = GST_BASESINK (GST_PAD_PARENT (pad));
172   bclass = GST_BASESINK_GET_CLASS (bsink);
173   if (bclass->get_caps)
174     caps = bclass->get_caps (bsink);
175
176   if (caps == NULL) {
177     GstPadTemplate *pad_template;
178
179     pad_template =
180         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "sink");
181     if (pad_template != NULL) {
182       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
183     }
184   }
185
186   return caps;
187 }
188
189 static gboolean
190 gst_basesink_pad_setcaps (GstPad * pad, GstCaps * caps)
191 {
192   GstBaseSinkClass *bclass;
193   GstBaseSink *bsink;
194   gboolean res = FALSE;
195
196   bsink = GST_BASESINK (GST_PAD_PARENT (pad));
197   bclass = GST_BASESINK_GET_CLASS (bsink);
198
199   if (bclass->set_caps)
200     res = bclass->set_caps (bsink, caps);
201
202   return res;
203 }
204
205 static GstFlowReturn
206 gst_basesink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
207     GstCaps * caps, GstBuffer ** buf)
208 {
209   GstBaseSinkClass *bclass;
210   GstBaseSink *bsink;
211   GstFlowReturn result = GST_FLOW_OK;
212
213   bsink = GST_BASESINK (GST_PAD_PARENT (pad));
214   bclass = GST_BASESINK_GET_CLASS (bsink);
215
216   if (bclass->buffer_alloc)
217     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
218   else
219     *buf = NULL;
220
221   return result;
222 }
223
224 static void
225 gst_basesink_init (GstBaseSink * basesink, gpointer g_class)
226 {
227   GstPadTemplate *pad_template;
228
229   pad_template =
230       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
231   g_return_if_fail (pad_template != NULL);
232
233   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
234
235   gst_pad_set_getcaps_function (basesink->sinkpad,
236       GST_DEBUG_FUNCPTR (gst_basesink_pad_getcaps));
237   gst_pad_set_setcaps_function (basesink->sinkpad,
238       GST_DEBUG_FUNCPTR (gst_basesink_pad_setcaps));
239   gst_pad_set_bufferalloc_function (basesink->sinkpad,
240       GST_DEBUG_FUNCPTR (gst_basesink_pad_buffer_alloc));
241   gst_element_add_pad (GST_ELEMENT (basesink), basesink->sinkpad);
242
243   basesink->pad_mode = GST_ACTIVATE_NONE;
244   GST_PAD_TASK (basesink->sinkpad) = NULL;
245   basesink->preroll_queue = g_queue_new ();
246
247   GST_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
248 }
249
250 static void
251 gst_basesink_finalize (GObject * object)
252 {
253   GstBaseSink *basesink;
254
255   basesink = GST_BASESINK (object);
256
257   g_queue_free (basesink->preroll_queue);
258
259   G_OBJECT_CLASS (parent_class)->finalize (object);
260 }
261
262 static void
263 gst_basesink_set_pad_functions (GstBaseSink * this, GstPad * pad)
264 {
265   gst_pad_set_activate_function (pad,
266       GST_DEBUG_FUNCPTR (gst_basesink_activate));
267   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_basesink_event));
268
269   if (this->has_chain)
270     gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_basesink_chain));
271   else
272     gst_pad_set_chain_function (pad, NULL);
273
274   if (this->has_loop)
275     gst_pad_set_loop_function (pad, GST_DEBUG_FUNCPTR (gst_basesink_loop));
276   else
277     gst_pad_set_loop_function (pad, NULL);
278 }
279
280 static void
281 gst_basesink_set_all_pad_functions (GstBaseSink * this)
282 {
283   GList *l;
284
285   for (l = GST_ELEMENT_PADS (this); l; l = l->next)
286     gst_basesink_set_pad_functions (this, (GstPad *) l->data);
287 }
288
289 static void
290 gst_basesink_set_clock (GstElement * element, GstClock * clock)
291 {
292   GstBaseSink *sink;
293
294   sink = GST_BASESINK (element);
295
296   sink->clock = clock;
297 }
298
299 static void
300 gst_basesink_set_property (GObject * object, guint prop_id,
301     const GValue * value, GParamSpec * pspec)
302 {
303   GstBaseSink *sink;
304
305   sink = GST_BASESINK (object);
306
307   GST_LOCK (sink);
308   switch (prop_id) {
309     case PROP_HAS_LOOP:
310       sink->has_loop = g_value_get_boolean (value);
311       gst_basesink_set_all_pad_functions (sink);
312       break;
313     case PROP_HAS_CHAIN:
314       sink->has_chain = g_value_get_boolean (value);
315       gst_basesink_set_all_pad_functions (sink);
316       break;
317     case PROP_PREROLL_QUEUE_LEN:
318       /* preroll lock necessary to serialize with finish_preroll */
319       GST_PREROLL_LOCK (sink->sinkpad);
320       sink->preroll_queue_max_len = g_value_get_uint (value);
321       GST_PREROLL_UNLOCK (sink->sinkpad);
322       break;
323     default:
324       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
325       break;
326   }
327   GST_UNLOCK (sink);
328 }
329
330 static void
331 gst_basesink_get_property (GObject * object, guint prop_id, GValue * value,
332     GParamSpec * pspec)
333 {
334   GstBaseSink *sink;
335
336   sink = GST_BASESINK (object);
337
338   GST_LOCK (sink);
339   switch (prop_id) {
340     case PROP_HAS_LOOP:
341       g_value_set_boolean (value, sink->has_loop);
342       break;
343     case PROP_HAS_CHAIN:
344       g_value_set_boolean (value, sink->has_chain);
345       break;
346     case PROP_PREROLL_QUEUE_LEN:
347       g_value_set_uint (value, sink->preroll_queue_max_len);
348       break;
349     default:
350       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
351       break;
352   }
353   GST_UNLOCK (sink);
354 }
355
356 static GstCaps *
357 gst_base_sink_get_caps (GstBaseSink * sink)
358 {
359   return NULL;
360 }
361
362 static gboolean
363 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
364 {
365   return TRUE;
366 }
367
368 static GstFlowReturn
369 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
370     GstCaps * caps, GstBuffer ** buf)
371 {
372   *buf = NULL;
373   return GST_FLOW_OK;
374 }
375
376 /* with PREROLL_LOCK */
377 static void
378 gst_basesink_preroll_queue_push (GstBaseSink * basesink, GstPad * pad,
379     GstBuffer * buffer)
380 {
381   /* if we don't have a buffer we just start blocking */
382   if (buffer == NULL)
383     goto block;
384
385   /* first buffer */
386   if (basesink->preroll_queue->length == 0) {
387     GstBaseSinkClass *bclass = GST_BASESINK_GET_CLASS (basesink);
388
389     GST_DEBUG ("preroll buffer with TS: %" GST_TIME_FORMAT,
390         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
391     if (bclass->preroll)
392       bclass->preroll (basesink, buffer);
393   }
394
395   if (basesink->preroll_queue->length < basesink->preroll_queue_max_len) {
396     DEBUG ("push %p %p\n", basesink, buffer);
397     g_queue_push_tail (basesink->preroll_queue, buffer);
398     return;
399   }
400
401 block:
402   /* block until the state changes, or we get a flush, or something */
403   DEBUG ("block %p %p\n", basesink, buffer);
404   GST_DEBUG ("element %s waiting to finish preroll",
405       GST_ELEMENT_NAME (basesink));
406   basesink->need_preroll = FALSE;
407   basesink->have_preroll = TRUE;
408   GST_PREROLL_WAIT (pad);
409   GST_DEBUG ("done preroll");
410   basesink->have_preroll = FALSE;
411 }
412
413 /* with PREROLL_LOCK */
414 static GstFlowReturn
415 gst_basesink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad)
416 {
417   GstBuffer *buf;
418   GQueue *q = basesink->preroll_queue;
419   GstFlowReturn ret;
420
421   ret = GST_FLOW_OK;
422
423   if (q) {
424     DEBUG ("empty queue\n");
425     while ((buf = g_queue_pop_head (q))) {
426       DEBUG ("pop %p\n", buf);
427       ret = gst_basesink_handle_buffer (basesink, buf);
428     }
429     DEBUG ("queue len %p %d\n", basesink, q->length);
430   }
431   return ret;
432 }
433
434 /* with PREROLL_LOCK */
435 static void
436 gst_basesink_preroll_queue_flush (GstBaseSink * basesink)
437 {
438   GstBuffer *buf;
439   GQueue *q = basesink->preroll_queue;
440
441   DEBUG ("flush %p\n", basesink);
442   if (q) {
443     while ((buf = g_queue_pop_head (q))) {
444       DEBUG ("pop %p\n", buf);
445       gst_buffer_unref (buf);
446     }
447   }
448 }
449
450 typedef enum
451 {
452   PREROLL_QUEUEING,
453   PREROLL_PLAYING,
454   PREROLL_FLUSHING,
455   PREROLL_ERROR
456 } PrerollReturn;
457
458 /* with STREAM_LOCK */
459 PrerollReturn
460 gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad,
461     GstBuffer * buffer)
462 {
463   gboolean flushing;
464
465   DEBUG ("finish preroll %p <\n", basesink);
466   /* lock order is important */
467   GST_STATE_LOCK (basesink);
468   GST_PREROLL_LOCK (pad);
469   DEBUG ("finish preroll %p >\n", basesink);
470   if (!basesink->need_preroll)
471     goto no_preroll;
472
473   gst_element_commit_state (GST_ELEMENT (basesink));
474   GST_STATE_UNLOCK (basesink);
475
476   gst_basesink_preroll_queue_push (basesink, pad, buffer);
477
478   GST_LOCK (pad);
479   flushing = GST_PAD_IS_FLUSHING (pad);
480   GST_UNLOCK (pad);
481   if (flushing)
482     goto flushing;
483
484   if (basesink->need_preroll)
485     goto still_queueing;
486
487   GST_DEBUG ("done preroll");
488
489   gst_basesink_preroll_queue_empty (basesink, pad);
490
491   GST_PREROLL_UNLOCK (pad);
492
493   return PREROLL_PLAYING;
494
495 no_preroll:
496   {
497     /* maybe it was another sink that blocked in preroll, need to check for
498        buffers to drain */
499     if (basesink->preroll_queue->length)
500       gst_basesink_preroll_queue_empty (basesink, pad);
501     GST_PREROLL_UNLOCK (pad);
502     GST_STATE_UNLOCK (basesink);
503     return PREROLL_PLAYING;
504   }
505 flushing:
506   {
507     GST_DEBUG ("pad is flushing");
508     GST_PREROLL_UNLOCK (pad);
509     return PREROLL_FLUSHING;
510   }
511 still_queueing:
512   {
513     GST_PREROLL_UNLOCK (pad);
514     return PREROLL_QUEUEING;
515   }
516 }
517
518 static gboolean
519 gst_basesink_event (GstPad * pad, GstEvent * event)
520 {
521   GstBaseSink *basesink;
522   gboolean result = TRUE;
523   GstBaseSinkClass *bclass;
524
525   basesink = GST_BASESINK (GST_OBJECT_PARENT (pad));
526
527   bclass = GST_BASESINK_GET_CLASS (basesink);
528
529   DEBUG ("event %p\n", basesink);
530
531   if (bclass->event)
532     bclass->event (basesink, event);
533
534   switch (GST_EVENT_TYPE (event)) {
535     case GST_EVENT_EOS:
536     {
537       gboolean need_eos;
538
539       GST_STREAM_LOCK (pad);
540
541       /* EOS also finishes the preroll */
542       gst_basesink_finish_preroll (basesink, pad, NULL);
543
544       GST_LOCK (basesink);
545       need_eos = basesink->eos = TRUE;
546       if (basesink->clock) {
547         /* wait for last buffer to finish if we have a valid end time */
548         if (GST_CLOCK_TIME_IS_VALID (basesink->end_time)) {
549           basesink->clock_id = gst_clock_new_single_shot_id (basesink->clock,
550               basesink->end_time + GST_ELEMENT (basesink)->base_time);
551           GST_UNLOCK (basesink);
552
553           gst_clock_id_wait (basesink->clock_id, NULL);
554
555           GST_LOCK (basesink);
556           if (basesink->clock_id) {
557             gst_clock_id_unref (basesink->clock_id);
558             basesink->clock_id = NULL;
559           }
560           basesink->end_time = GST_CLOCK_TIME_NONE;
561           need_eos = basesink->eos;
562         }
563         GST_UNLOCK (basesink);
564
565         /* if we are still EOS, we can post the EOS message */
566         if (need_eos) {
567           /* ok, now we can post the message */
568           gst_element_post_message (GST_ELEMENT (basesink),
569               gst_message_new_eos (GST_OBJECT (basesink)));
570         }
571       }
572       GST_STREAM_UNLOCK (pad);
573       break;
574     }
575     case GST_EVENT_DISCONTINUOUS:
576       GST_STREAM_LOCK (pad);
577       if (basesink->clock) {
578         //gint64 value = GST_EVENT_DISCONT_OFFSET (event, 0).value;
579       }
580       GST_STREAM_UNLOCK (pad);
581       break;
582     case GST_EVENT_FLUSH:
583       /* make sure we are not blocked on the clock also clear any pending
584        * eos state. */
585       if (!GST_EVENT_FLUSH_DONE (event)) {
586         GST_LOCK (basesink);
587         basesink->eos = FALSE;
588         if (basesink->clock_id) {
589           gst_clock_id_unschedule (basesink->clock_id);
590         }
591         GST_UNLOCK (basesink);
592
593         /* unlock from a possible state change/preroll */
594         GST_PREROLL_LOCK (pad);
595         basesink->need_preroll = TRUE;
596         gst_basesink_preroll_queue_flush (basesink);
597         GST_PREROLL_SIGNAL (pad);
598         GST_PREROLL_UNLOCK (pad);
599       }
600       /* now we are completely unblocked and the _chain method
601        * will return */
602       break;
603     default:
604       result = gst_pad_event_default (pad, event);
605       break;
606   }
607
608   return result;
609 }
610
611 /* default implementation to calculate the start and end
612  * timestamps on a buffer, subclasses cna override
613  */
614 static void
615 gst_basesink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
616     GstClockTime * start, GstClockTime * end)
617 {
618   GstClockTime timestamp, duration;
619
620   timestamp = GST_BUFFER_TIMESTAMP (buffer);
621   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
622     duration = GST_BUFFER_DURATION (buffer);
623     if (GST_CLOCK_TIME_IS_VALID (duration)) {
624       *end = timestamp + duration;
625     }
626     *start = timestamp;
627   }
628 }
629
630 /* perform synchronisation on a buffer
631  * 
632  * 1) check if we have a clock, if not, do nothing
633  * 2) calculate the start and end time of the buffer
634  * 3) create a single shot notification to wait on
635  *    the clock, save the entry so we can unlock it
636  * 4) wait on the clock, this blocks
637  * 5) unref the clockid again
638  */
639 static void
640 gst_basesink_do_sync (GstBaseSink * basesink, GstBuffer * buffer)
641 {
642   if (basesink->clock) {
643     GstClockReturn ret;
644     GstClockTime start, end;
645     GstBaseSinkClass *bclass;
646
647     bclass = GST_BASESINK_GET_CLASS (basesink);
648     start = end = -1;
649     if (bclass->get_times)
650       bclass->get_times (basesink, buffer, &start, &end);
651
652     GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
653         ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
654
655     if (GST_CLOCK_TIME_IS_VALID (start)) {
656       /* save clock id so that we can unlock it if needed */
657       GST_LOCK (basesink);
658       basesink->clock_id = gst_clock_new_single_shot_id (basesink->clock,
659           start + GST_ELEMENT (basesink)->base_time);
660       basesink->end_time = end;
661       GST_UNLOCK (basesink);
662
663       ret = gst_clock_id_wait (basesink->clock_id, NULL);
664
665       GST_LOCK (basesink);
666       if (basesink->clock_id) {
667         gst_clock_id_unref (basesink->clock_id);
668         basesink->clock_id = NULL;
669       }
670       /* FIXME, don't mess with end_time here */
671       basesink->end_time = GST_CLOCK_TIME_NONE;
672       GST_UNLOCK (basesink);
673
674       GST_LOG_OBJECT (basesink, "clock entry done: %d", ret);
675     }
676   }
677 }
678
679 /* handle a buffer
680  *
681  * 1) first sync on the buffer
682  * 2) render the buffer
683  * 3) unref the buffer
684  */
685 static inline GstFlowReturn
686 gst_basesink_handle_buffer (GstBaseSink * basesink, GstBuffer * buf)
687 {
688   GstBaseSinkClass *bclass;
689   GstFlowReturn ret;
690
691   gst_basesink_do_sync (basesink, buf);
692
693   bclass = GST_BASESINK_GET_CLASS (basesink);
694   if (bclass->render)
695     ret = bclass->render (basesink, buf);
696   else
697     ret = GST_FLOW_OK;
698
699   DEBUG ("unref %p %p\n", basesink, buf);
700   gst_buffer_unref (buf);
701
702   return ret;
703 }
704
705 static GstFlowReturn
706 gst_basesink_chain_unlocked (GstPad * pad, GstBuffer * buf)
707 {
708   GstBaseSink *basesink;
709   PrerollReturn result;
710
711   basesink = GST_BASESINK (GST_OBJECT_PARENT (pad));
712
713   DEBUG ("chain_unlocked %p\n", basesink);
714
715   result = gst_basesink_finish_preroll (basesink, pad, buf);
716
717   DEBUG ("chain_unlocked %p after, result %d\n", basesink, result);
718
719   switch (result) {
720     case PREROLL_QUEUEING:
721       return GST_FLOW_OK;
722     case PREROLL_PLAYING:
723       return gst_basesink_handle_buffer (basesink, buf);
724     case PREROLL_FLUSHING:
725       gst_buffer_unref (buf);
726       return GST_FLOW_UNEXPECTED;
727     default:
728       g_assert_not_reached ();
729       return GST_FLOW_ERROR;
730   }
731 }
732
733 static GstFlowReturn
734 gst_basesink_chain (GstPad * pad, GstBuffer * buf)
735 {
736   GstFlowReturn result;
737
738   g_assert (GST_BASESINK (GST_OBJECT_PARENT (pad))->pad_mode ==
739       GST_ACTIVATE_PUSH);
740
741   result = gst_basesink_chain_unlocked (pad, buf);
742
743   return result;
744 }
745
746 /* FIXME, not all sinks can operate in pull mode 
747  */
748 static void
749 gst_basesink_loop (GstPad * pad)
750 {
751   GstBaseSink *basesink;
752   GstBuffer *buf = NULL;
753   GstFlowReturn result;
754
755   basesink = GST_BASESINK (GST_OBJECT_PARENT (pad));
756
757   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
758
759   result = gst_pad_pull_range (pad, basesink->offset, DEFAULT_SIZE, &buf);
760   if (result != GST_FLOW_OK)
761     goto paused;
762
763   result = gst_basesink_chain_unlocked (pad, buf);
764   if (result != GST_FLOW_OK)
765     goto paused;
766
767   /* default */
768   return;
769
770 paused:
771   gst_pad_pause_task (pad);
772   return;
773 }
774
775 static gboolean
776 gst_basesink_activate (GstPad * pad, GstActivateMode mode)
777 {
778   gboolean result = FALSE;
779   GstBaseSink *basesink;
780   GstBaseSinkClass *bclass;
781
782   basesink = GST_BASESINK (GST_OBJECT_PARENT (pad));
783   bclass = GST_BASESINK_GET_CLASS (basesink);
784
785   switch (mode) {
786     case GST_ACTIVATE_PUSH:
787       g_return_val_if_fail (basesink->has_chain, FALSE);
788       result = TRUE;
789       break;
790     case GST_ACTIVATE_PULL:
791       /* if we have a scheduler we can start the task */
792       g_return_val_if_fail (basesink->has_loop, FALSE);
793       gst_pad_peer_set_active (pad, mode);
794       result =
795           gst_pad_start_task (pad, (GstTaskFunction) gst_basesink_loop, pad);
796       break;
797     case GST_ACTIVATE_NONE:
798       /* step 1, unblock clock sync (if any) or any other blocking thing */
799       GST_LOCK (basesink);
800       if (basesink->clock_id) {
801         gst_clock_id_unschedule (basesink->clock_id);
802       }
803       GST_UNLOCK (basesink);
804
805       /* unlock any subclasses */
806       if (bclass->unlock)
807         bclass->unlock (basesink);
808
809       /* unlock preroll */
810       GST_PREROLL_LOCK (pad);
811       GST_PREROLL_SIGNAL (pad);
812       GST_PREROLL_UNLOCK (pad);
813
814       /* step 2, make sure streaming finishes */
815       result = gst_pad_stop_task (pad);
816       break;
817   }
818   basesink->pad_mode = mode;
819
820   return result;
821 }
822
823 static GstElementStateReturn
824 gst_basesink_change_state (GstElement * element)
825 {
826   GstElementStateReturn ret = GST_STATE_SUCCESS;
827   GstBaseSink *basesink = GST_BASESINK (element);
828   GstElementState transition = GST_STATE_TRANSITION (element);
829
830   DEBUG ("state change > %p %x\n", basesink, transition);
831
832   switch (transition) {
833     case GST_STATE_NULL_TO_READY:
834       break;
835     case GST_STATE_READY_TO_PAUSED:
836       /* need to complete preroll before this state change completes, there
837        * is no data flow in READY so we can safely assume we need to preroll. */
838       basesink->offset = 0;
839       GST_PREROLL_LOCK (basesink->sinkpad);
840       basesink->need_preroll = TRUE;
841       basesink->have_preroll = FALSE;
842       GST_PREROLL_UNLOCK (basesink->sinkpad);
843       ret = GST_STATE_ASYNC;
844       break;
845     case GST_STATE_PAUSED_TO_PLAYING:
846       GST_PREROLL_LOCK (basesink->sinkpad);
847       if (basesink->have_preroll) {
848         /* now let it play */
849         GST_PREROLL_SIGNAL (basesink->sinkpad);
850       } else {
851         /* FIXME. We do not have a preroll and we don't need it anymore 
852          * now, this is a case we want to avoid. One way would be to make
853          * a 'lost state' function that makes get_state return PAUSED with
854          * ASYNC to indicate that we are prerolling again. */
855         basesink->need_preroll = FALSE;
856       }
857       GST_PREROLL_UNLOCK (basesink->sinkpad);
858       break;
859     default:
860       break;
861   }
862
863   GST_ELEMENT_CLASS (parent_class)->change_state (element);
864
865   switch (transition) {
866     case GST_STATE_PLAYING_TO_PAUSED:
867     {
868       gboolean eos;
869
870       /* unlock clock wait if any */
871       GST_LOCK (basesink);
872       if (basesink->clock_id) {
873         gst_clock_id_unschedule (basesink->clock_id);
874       }
875       eos = basesink->eos;
876       GST_UNLOCK (basesink);
877
878       GST_PREROLL_LOCK (basesink->sinkpad);
879       /* if we don't have a preroll buffer and we have not received EOS,
880        * we need to wait for a preroll */
881       if (!basesink->have_preroll && !eos) {
882         basesink->need_preroll = TRUE;
883         ret = GST_STATE_ASYNC;
884       }
885       GST_PREROLL_UNLOCK (basesink->sinkpad);
886       break;
887     }
888     case GST_STATE_PAUSED_TO_READY:
889       /* flush out the data thread if it's locked in finish_preroll */
890       GST_PREROLL_LOCK (basesink->sinkpad);
891
892       gst_basesink_preroll_queue_flush (basesink);
893
894       if (basesink->have_preroll)
895         GST_PREROLL_SIGNAL (basesink->sinkpad);
896
897       basesink->need_preroll = FALSE;
898       basesink->have_preroll = FALSE;
899       GST_PREROLL_UNLOCK (basesink->sinkpad);
900
901       /* clear EOS state */
902       basesink->eos = FALSE;
903       break;
904     case GST_STATE_READY_TO_NULL:
905       break;
906     default:
907       break;
908   }
909
910   DEBUG ("state change < %p %x\n", basesink, transition);
911   return ret;
912 }