7d75d4932b41cfc06dcad9f6d3043a9eb4351cf4
[platform/upstream/gstreamer.git] / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3  *
4  * gstbasesink.c: 
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #  include "config.h"
24 #endif
25
26 #include "gstbasesink.h"
27 #include <gst/gstmarshal.h>
28
29 GST_DEBUG_CATEGORY_STATIC (gst_basesink_debug);
30 #define GST_CAT_DEFAULT gst_basesink_debug
31
32 /* BaseSink signals and properties */
33 enum
34 {
35   /* FILL ME */
36   SIGNAL_HANDOFF,
37   LAST_SIGNAL
38 };
39
40 /* FIXME, need to figure out a better way to handle the pull mode */
41 #define DEFAULT_SIZE 1024
42 #define DEFAULT_HAS_LOOP FALSE
43 #define DEFAULT_HAS_CHAIN TRUE
44
45 enum
46 {
47   PROP_0,
48   PROP_HAS_LOOP,
49   PROP_HAS_CHAIN,
50   PROP_PREROLL_QUEUE_LEN
51 };
52
53 static GstElementClass *parent_class = NULL;
54
55 static void gst_basesink_base_init (gpointer g_class);
56 static void gst_basesink_class_init (GstBaseSinkClass * klass);
57 static void gst_basesink_init (GstBaseSink * trans, gpointer g_class);
58 static void gst_basesink_finalize (GObject * object);
59
60 GType
61 gst_basesink_get_type (void)
62 {
63   static GType basesink_type = 0;
64
65   if (!basesink_type) {
66     static const GTypeInfo basesink_info = {
67       sizeof (GstBaseSinkClass),
68       (GBaseInitFunc) gst_basesink_base_init,
69       NULL,
70       (GClassInitFunc) gst_basesink_class_init,
71       NULL,
72       NULL,
73       sizeof (GstBaseSink),
74       0,
75       (GInstanceInitFunc) gst_basesink_init,
76     };
77
78     basesink_type = g_type_register_static (GST_TYPE_ELEMENT,
79         "GstBaseSink", &basesink_info, G_TYPE_FLAG_ABSTRACT);
80   }
81   return basesink_type;
82 }
83
84 static void gst_basesink_set_clock (GstElement * element, GstClock * clock);
85
86 static void gst_basesink_set_property (GObject * object, guint prop_id,
87     const GValue * value, GParamSpec * pspec);
88 static void gst_basesink_get_property (GObject * object, guint prop_id,
89     GValue * value, GParamSpec * pspec);
90
91 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
92 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
93 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
94     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
95 static void gst_basesink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
96     GstClockTime * start, GstClockTime * end);
97
98 static GstElementStateReturn gst_basesink_change_state (GstElement * element);
99
100 static GstFlowReturn gst_basesink_chain (GstPad * pad, GstBuffer * buffer);
101 static void gst_basesink_loop (GstPad * pad);
102 static GstFlowReturn gst_basesink_chain (GstPad * pad, GstBuffer * buffer);
103 static gboolean gst_basesink_activate (GstPad * pad, GstActivateMode mode);
104 static gboolean gst_basesink_event (GstPad * pad, GstEvent * event);
105 static inline GstFlowReturn gst_basesink_handle_buffer (GstBaseSink * basesink,
106     GstBuffer * buf);
107 static inline gboolean gst_basesink_handle_event (GstBaseSink * basesink,
108     GstEvent * event);
109
110 static void
111 gst_basesink_base_init (gpointer g_class)
112 {
113   GST_DEBUG_CATEGORY_INIT (gst_basesink_debug, "basesink", 0,
114       "basesink element");
115 }
116
117 static void
118 gst_basesink_class_init (GstBaseSinkClass * klass)
119 {
120   GObjectClass *gobject_class;
121   GstElementClass *gstelement_class;
122
123   gobject_class = (GObjectClass *) klass;
124   gstelement_class = (GstElementClass *) klass;
125
126   parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
127
128   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_basesink_finalize);
129   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_basesink_set_property);
130   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_basesink_get_property);
131
132   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_HAS_LOOP,
133       g_param_spec_boolean ("has-loop", "has-loop",
134           "Enable loop-based operation", DEFAULT_HAS_LOOP,
135           G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
136   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_HAS_CHAIN,
137       g_param_spec_boolean ("has-chain", "has-chain",
138           "Enable chain-based operation", DEFAULT_HAS_CHAIN,
139           G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
140   /* FIXME, this next value should be configured using an event from the
141    * upstream element */
142   g_object_class_install_property (G_OBJECT_CLASS (klass),
143       PROP_PREROLL_QUEUE_LEN,
144       g_param_spec_uint ("preroll-queue-len", "preroll-queue-len",
145           "Number of buffers to queue during preroll", 0, G_MAXUINT, 0,
146           G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
147
148   gstelement_class->set_clock = GST_DEBUG_FUNCPTR (gst_basesink_set_clock);
149   gstelement_class->change_state =
150       GST_DEBUG_FUNCPTR (gst_basesink_change_state);
151
152   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
153   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
154   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
155   klass->get_times = GST_DEBUG_FUNCPTR (gst_basesink_get_times);
156 }
157
158 static GstCaps *
159 gst_basesink_pad_getcaps (GstPad * pad)
160 {
161   GstBaseSinkClass *bclass;
162   GstBaseSink *bsink;
163   GstCaps *caps = NULL;
164
165   bsink = GST_BASESINK (GST_PAD_PARENT (pad));
166   bclass = GST_BASESINK_GET_CLASS (bsink);
167   if (bclass->get_caps)
168     caps = bclass->get_caps (bsink);
169
170   if (caps == NULL) {
171     GstPadTemplate *pad_template;
172
173     pad_template =
174         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "sink");
175     if (pad_template != NULL) {
176       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
177     }
178   }
179
180   return caps;
181 }
182
183 static gboolean
184 gst_basesink_pad_setcaps (GstPad * pad, GstCaps * caps)
185 {
186   GstBaseSinkClass *bclass;
187   GstBaseSink *bsink;
188   gboolean res = FALSE;
189
190   bsink = GST_BASESINK (GST_PAD_PARENT (pad));
191   bclass = GST_BASESINK_GET_CLASS (bsink);
192
193   if (bclass->set_caps)
194     res = bclass->set_caps (bsink, caps);
195
196   return res;
197 }
198
199 static GstFlowReturn
200 gst_basesink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
201     GstCaps * caps, GstBuffer ** buf)
202 {
203   GstBaseSinkClass *bclass;
204   GstBaseSink *bsink;
205   GstFlowReturn result = GST_FLOW_OK;
206
207   bsink = GST_BASESINK (GST_PAD_PARENT (pad));
208   bclass = GST_BASESINK_GET_CLASS (bsink);
209
210   if (bclass->buffer_alloc)
211     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
212   else
213     *buf = NULL;
214
215   return result;
216 }
217
218 static void
219 gst_basesink_init (GstBaseSink * basesink, gpointer g_class)
220 {
221   GstPadTemplate *pad_template;
222
223   pad_template =
224       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
225   g_return_if_fail (pad_template != NULL);
226
227   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
228
229   gst_pad_set_getcaps_function (basesink->sinkpad,
230       GST_DEBUG_FUNCPTR (gst_basesink_pad_getcaps));
231   gst_pad_set_setcaps_function (basesink->sinkpad,
232       GST_DEBUG_FUNCPTR (gst_basesink_pad_setcaps));
233   gst_pad_set_bufferalloc_function (basesink->sinkpad,
234       GST_DEBUG_FUNCPTR (gst_basesink_pad_buffer_alloc));
235   gst_element_add_pad (GST_ELEMENT (basesink), basesink->sinkpad);
236
237   basesink->pad_mode = GST_ACTIVATE_NONE;
238   GST_PAD_TASK (basesink->sinkpad) = NULL;
239   basesink->preroll_queue = g_queue_new ();
240
241   GST_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
242 }
243
244 static void
245 gst_basesink_finalize (GObject * object)
246 {
247   GstBaseSink *basesink;
248
249   basesink = GST_BASESINK (object);
250
251   g_queue_free (basesink->preroll_queue);
252
253   G_OBJECT_CLASS (parent_class)->finalize (object);
254 }
255
256 static void
257 gst_basesink_set_pad_functions (GstBaseSink * this, GstPad * pad)
258 {
259   gst_pad_set_activate_function (pad,
260       GST_DEBUG_FUNCPTR (gst_basesink_activate));
261   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_basesink_event));
262
263   if (this->has_chain)
264     gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_basesink_chain));
265   else
266     gst_pad_set_chain_function (pad, NULL);
267
268   if (this->has_loop)
269     gst_pad_set_loop_function (pad, GST_DEBUG_FUNCPTR (gst_basesink_loop));
270   else
271     gst_pad_set_loop_function (pad, NULL);
272 }
273
274 static void
275 gst_basesink_set_all_pad_functions (GstBaseSink * this)
276 {
277   GList *l;
278
279   for (l = GST_ELEMENT_PADS (this); l; l = l->next)
280     gst_basesink_set_pad_functions (this, (GstPad *) l->data);
281 }
282
283 static void
284 gst_basesink_set_clock (GstElement * element, GstClock * clock)
285 {
286   GstBaseSink *sink;
287
288   sink = GST_BASESINK (element);
289
290   sink->clock = clock;
291 }
292
293 static void
294 gst_basesink_set_property (GObject * object, guint prop_id,
295     const GValue * value, GParamSpec * pspec)
296 {
297   GstBaseSink *sink;
298
299   sink = GST_BASESINK (object);
300
301   switch (prop_id) {
302     case PROP_HAS_LOOP:
303       GST_LOCK (sink);
304       sink->has_loop = g_value_get_boolean (value);
305       gst_basesink_set_all_pad_functions (sink);
306       GST_UNLOCK (sink);
307       break;
308     case PROP_HAS_CHAIN:
309       GST_LOCK (sink);
310       sink->has_chain = g_value_get_boolean (value);
311       gst_basesink_set_all_pad_functions (sink);
312       GST_UNLOCK (sink);
313       break;
314     case PROP_PREROLL_QUEUE_LEN:
315       /* preroll lock necessary to serialize with finish_preroll */
316       GST_PREROLL_LOCK (sink->sinkpad);
317       sink->preroll_queue_max_len = g_value_get_uint (value);
318       GST_PREROLL_UNLOCK (sink->sinkpad);
319       break;
320     default:
321       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
322       break;
323   }
324 }
325
326 static void
327 gst_basesink_get_property (GObject * object, guint prop_id, GValue * value,
328     GParamSpec * pspec)
329 {
330   GstBaseSink *sink;
331
332   sink = GST_BASESINK (object);
333
334   GST_LOCK (sink);
335   switch (prop_id) {
336     case PROP_HAS_LOOP:
337       g_value_set_boolean (value, sink->has_loop);
338       break;
339     case PROP_HAS_CHAIN:
340       g_value_set_boolean (value, sink->has_chain);
341       break;
342     case PROP_PREROLL_QUEUE_LEN:
343       g_value_set_uint (value, sink->preroll_queue_max_len);
344       break;
345     default:
346       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
347       break;
348   }
349   GST_UNLOCK (sink);
350 }
351
352 static GstCaps *
353 gst_base_sink_get_caps (GstBaseSink * sink)
354 {
355   return NULL;
356 }
357
358 static gboolean
359 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
360 {
361   return TRUE;
362 }
363
364 static GstFlowReturn
365 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
366     GstCaps * caps, GstBuffer ** buf)
367 {
368   *buf = NULL;
369   return GST_FLOW_OK;
370 }
371
372 /* with PREROLL_LOCK */
373 static GstFlowReturn
374 gst_basesink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad)
375 {
376   GstMiniObject *obj;
377   GQueue *q = basesink->preroll_queue;
378   GstFlowReturn ret;
379
380   ret = GST_FLOW_OK;
381
382   if (q) {
383     GST_DEBUG ("emptying queue");
384     while ((obj = g_queue_pop_head (q))) {
385       /* we release the preroll lock while pushing so that we
386        * can still flush it while blocking on the clock or
387        * inside the element. */
388       GST_PREROLL_UNLOCK (pad);
389
390       if (GST_IS_BUFFER (obj)) {
391         GST_DEBUG ("poped buffer %p", obj);
392         ret = gst_basesink_handle_buffer (basesink, GST_BUFFER (obj));
393       } else {
394         GST_DEBUG ("poped event %p", obj);
395         gst_basesink_handle_event (basesink, GST_EVENT (obj));
396         ret = GST_FLOW_OK;
397       }
398
399       GST_PREROLL_LOCK (pad);
400     }
401     GST_DEBUG ("queue empty");
402   }
403   return ret;
404 }
405
406 /* with PREROLL_LOCK */
407 static void
408 gst_basesink_preroll_queue_flush (GstBaseSink * basesink)
409 {
410   GstMiniObject *obj;
411   GQueue *q = basesink->preroll_queue;
412
413   GST_DEBUG ("flushing queue %p", basesink);
414   if (q) {
415     while ((obj = g_queue_pop_head (q))) {
416       GST_DEBUG ("poped %p", obj);
417       gst_mini_object_unref (obj);
418     }
419   }
420   /* we can't have EOS anymore now */
421   basesink->eos = FALSE;
422 }
423
424 /* with STREAM_LOCK */
425 static GstFlowReturn
426 gst_basesink_handle_object (GstBaseSink * basesink, GstPad * pad,
427     GstMiniObject * obj)
428 {
429   gint length;
430   gboolean have_event;
431
432   GST_PREROLL_LOCK (pad);
433   /* push object on the queue */
434   GST_DEBUG ("push on queue %p %p", basesink, obj);
435   g_queue_push_tail (basesink->preroll_queue, obj);
436
437   have_event = GST_IS_EVENT (obj);
438
439   if (have_event && GST_EVENT_TYPE (obj) == GST_EVENT_EOS) {
440     basesink->eos = TRUE;
441   }
442
443   /* check if we are prerolling */
444   if (!basesink->need_preroll)
445     goto no_preroll;
446
447   length = basesink->preroll_queue->length;
448   /* this is the first object we queued */
449   if (length == 1) {
450     GST_DEBUG ("do preroll %p", obj);
451
452     /* if it's a buffer, we need to call the preroll method */
453     if (GST_IS_BUFFER (obj)) {
454       GstBaseSinkClass *bclass;
455
456       bclass = GST_BASESINK_GET_CLASS (basesink);
457       if (bclass->preroll)
458         bclass->preroll (basesink, GST_BUFFER (obj));
459     }
460   }
461   /* we are prerolling */
462   GST_DEBUG ("finish preroll %p >", basesink);
463   GST_PREROLL_UNLOCK (pad);
464
465   /* have to release STREAM_LOCK as we cannot take the STATE_LOCK
466    * inside the STREAM_LOCK */
467   GST_STREAM_UNLOCK (pad);
468
469   /* now we commit our state */
470   GST_STATE_LOCK (basesink);
471   GST_DEBUG ("commit state %p >", basesink);
472   gst_element_commit_state (GST_ELEMENT (basesink));
473   GST_STATE_UNLOCK (basesink);
474
475   /* reacquire stream lock, pad could be flushing now */
476   GST_STREAM_LOCK (pad);
477
478   GST_LOCK (pad);
479   if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
480     goto flushing;
481   GST_UNLOCK (pad);
482
483   /* and wait if needed */
484   GST_PREROLL_LOCK (pad);
485   /* it is possible that the application set the state to PLAYING
486    * now in which case we don't need to block anymore. */
487   if (!basesink->need_preroll)
488     goto no_preroll;
489
490   length = basesink->preroll_queue->length;
491   GST_DEBUG ("prerolled length %d", length);
492   /* see if we need to block now. We cannot block on events, only
493    * on buffers, the reason is that events can be sent from the
494    * application thread and we don't want to block there. */
495   if (length > basesink->preroll_queue_max_len && !have_event) {
496     /* block until the state changes, or we get a flush, or something */
497     GST_DEBUG ("element %s waiting to finish preroll",
498         GST_ELEMENT_NAME (basesink));
499     basesink->have_preroll = TRUE;
500     GST_PREROLL_WAIT (pad);
501     GST_DEBUG ("done preroll");
502     basesink->have_preroll = FALSE;
503   }
504   GST_PREROLL_UNLOCK (pad);
505
506   GST_LOCK (pad);
507   if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
508     goto flushing;
509   GST_UNLOCK (pad);
510
511   return GST_FLOW_OK;
512
513 no_preroll:
514   {
515     GstFlowReturn ret;
516
517     GST_DEBUG ("no preroll needed");
518     /* maybe it was another sink that blocked in preroll, need to check for
519        buffers to drain */
520     ret = gst_basesink_preroll_queue_empty (basesink, pad);
521     GST_PREROLL_UNLOCK (pad);
522
523     return ret;
524   }
525 flushing:
526   {
527     GST_UNLOCK (pad);
528     GST_DEBUG ("pad is flushing");
529     return GST_FLOW_WRONG_STATE;
530   }
531 }
532
533 static gboolean
534 gst_basesink_event (GstPad * pad, GstEvent * event)
535 {
536   GstBaseSink *basesink;
537   gboolean result = TRUE;
538   GstBaseSinkClass *bclass;
539
540   basesink = GST_BASESINK (GST_OBJECT_PARENT (pad));
541
542   bclass = GST_BASESINK_GET_CLASS (basesink);
543
544   GST_DEBUG ("event %p", event);
545
546   switch (GST_EVENT_TYPE (event)) {
547     case GST_EVENT_EOS:
548     {
549       GstFlowReturn ret;
550
551       GST_STREAM_LOCK (pad);
552       /* EOS also finishes the preroll */
553       ret = gst_basesink_handle_object (basesink, pad, GST_MINI_OBJECT (event));
554       GST_STREAM_UNLOCK (pad);
555       break;
556     }
557     case GST_EVENT_DISCONTINUOUS:
558     {
559       GstFlowReturn ret;
560
561       GST_STREAM_LOCK (pad);
562       if (basesink->clock) {
563         //gint64 value = GST_EVENT_DISCONT_OFFSET (event, 0).value;
564       }
565       ret = gst_basesink_handle_object (basesink, pad, GST_MINI_OBJECT (event));
566       GST_STREAM_UNLOCK (pad);
567       break;
568     }
569     case GST_EVENT_FLUSH:
570       /* make sure we are not blocked on the clock also clear any pending
571        * eos state. */
572       if (bclass->event)
573         bclass->event (basesink, event);
574
575       if (!GST_EVENT_FLUSH_DONE (event)) {
576         GST_PREROLL_LOCK (pad);
577         /* we need preroll after the flush */
578         basesink->need_preroll = TRUE;
579         gst_basesink_preroll_queue_flush (basesink);
580         /* unlock from a possible state change/preroll */
581         GST_PREROLL_SIGNAL (pad);
582
583         GST_LOCK (basesink);
584         if (basesink->clock_id) {
585           gst_clock_id_unschedule (basesink->clock_id);
586         }
587         GST_UNLOCK (basesink);
588         GST_PREROLL_UNLOCK (pad);
589
590         /* and we need to commit our state again on the next
591          * prerolled buffer */
592         GST_STATE_LOCK (basesink);
593         GST_STREAM_LOCK (pad);
594         gst_element_lost_state (GST_ELEMENT (basesink));
595         GST_STREAM_UNLOCK (pad);
596         GST_STATE_UNLOCK (basesink);
597       } else {
598         /* now we are completely unblocked and the _chain method
599          * will return */
600         GST_STREAM_LOCK (pad);
601         GST_STREAM_UNLOCK (pad);
602       }
603
604       break;
605     default:
606       result = gst_pad_event_default (pad, event);
607       break;
608   }
609
610   return result;
611 }
612
613 /* default implementation to calculate the start and end
614  * timestamps on a buffer, subclasses cna override
615  */
616 static void
617 gst_basesink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
618     GstClockTime * start, GstClockTime * end)
619 {
620   GstClockTime timestamp, duration;
621
622   timestamp = GST_BUFFER_TIMESTAMP (buffer);
623   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
624     duration = GST_BUFFER_DURATION (buffer);
625     if (GST_CLOCK_TIME_IS_VALID (duration)) {
626       *end = timestamp + duration;
627     }
628     *start = timestamp;
629   }
630 }
631
632 /* perform synchronisation on a buffer
633  * 
634  * 1) check if we have a clock, if not, do nothing
635  * 2) calculate the start and end time of the buffer
636  * 3) create a single shot notification to wait on
637  *    the clock, save the entry so we can unlock it
638  * 4) wait on the clock, this blocks
639  * 5) unref the clockid again
640  */
641 static gboolean
642 gst_basesink_do_sync (GstBaseSink * basesink, GstBuffer * buffer)
643 {
644   gboolean result = TRUE;
645
646   if (basesink->clock) {
647     GstClockTime start, end;
648     GstBaseSinkClass *bclass;
649
650     bclass = GST_BASESINK_GET_CLASS (basesink);
651     start = end = -1;
652     if (bclass->get_times)
653       bclass->get_times (basesink, buffer, &start, &end);
654
655     GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
656         ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
657
658     if (GST_CLOCK_TIME_IS_VALID (start)) {
659       GstClockReturn ret;
660
661       /* save clock id so that we can unlock it if needed */
662       GST_LOCK (basesink);
663       basesink->clock_id = gst_clock_new_single_shot_id (basesink->clock,
664           start + GST_ELEMENT (basesink)->base_time);
665       basesink->end_time = end;
666       GST_UNLOCK (basesink);
667
668       ret = gst_clock_id_wait (basesink->clock_id, NULL);
669
670       GST_LOCK (basesink);
671       if (basesink->clock_id) {
672         gst_clock_id_unref (basesink->clock_id);
673         basesink->clock_id = NULL;
674       }
675       GST_UNLOCK (basesink);
676
677       GST_LOG_OBJECT (basesink, "clock entry done: %d", ret);
678       if (ret == GST_CLOCK_UNSCHEDULED)
679         result = FALSE;
680     }
681   }
682   return result;
683 }
684
685
686 /* handle an event
687  *
688  * 2) render the event
689  * 3) unref the event
690  */
691 static inline gboolean
692 gst_basesink_handle_event (GstBaseSink * basesink, GstEvent * event)
693 {
694   GstBaseSinkClass *bclass;
695   gboolean ret;
696
697   switch (GST_EVENT_TYPE (event)) {
698     case GST_EVENT_EOS:
699       GST_LOCK (basesink);
700       if (basesink->clock) {
701         /* wait for last buffer to finish if we have a valid end time */
702         if (GST_CLOCK_TIME_IS_VALID (basesink->end_time)) {
703           basesink->clock_id = gst_clock_new_single_shot_id (basesink->clock,
704               basesink->end_time + GST_ELEMENT (basesink)->base_time);
705           GST_UNLOCK (basesink);
706
707           gst_clock_id_wait (basesink->clock_id, NULL);
708
709           GST_LOCK (basesink);
710           if (basesink->clock_id) {
711             gst_clock_id_unref (basesink->clock_id);
712             basesink->clock_id = NULL;
713           }
714           basesink->end_time = GST_CLOCK_TIME_NONE;
715         }
716       }
717       GST_UNLOCK (basesink);
718       break;
719     default:
720       break;
721   }
722
723   bclass = GST_BASESINK_GET_CLASS (basesink);
724   if (bclass->event)
725     ret = bclass->event (basesink, event);
726   else
727     ret = TRUE;
728
729   switch (GST_EVENT_TYPE (event)) {
730     case GST_EVENT_EOS:
731       GST_PREROLL_LOCK (basesink->sinkpad);
732       /* if we are still EOS, we can post the EOS message */
733       if (basesink->eos) {
734         /* ok, now we can post the message */
735         gst_element_post_message (GST_ELEMENT (basesink),
736             gst_message_new_eos (GST_OBJECT (basesink)));
737       }
738       GST_PREROLL_UNLOCK (basesink->sinkpad);
739       break;
740     default:
741       break;
742   }
743
744   GST_DEBUG ("event unref %p %p", basesink, event);
745   gst_event_unref (event);
746
747   return ret;
748 }
749
750 /* handle a buffer
751  *
752  * 1) first sync on the buffer
753  * 2) render the buffer
754  * 3) unref the buffer
755  */
756 static inline GstFlowReturn
757 gst_basesink_handle_buffer (GstBaseSink * basesink, GstBuffer * buf)
758 {
759   GstBaseSinkClass *bclass;
760   GstFlowReturn ret;
761
762   gst_basesink_do_sync (basesink, buf);
763
764   bclass = GST_BASESINK_GET_CLASS (basesink);
765   if (bclass->render)
766     ret = bclass->render (basesink, buf);
767   else
768     ret = GST_FLOW_OK;
769
770   GST_DEBUG ("buffer unref after render %p", basesink, buf);
771   gst_buffer_unref (buf);
772
773   return ret;
774 }
775
776 static GstFlowReturn
777 gst_basesink_chain (GstPad * pad, GstBuffer * buf)
778 {
779   GstBaseSink *basesink;
780   GstFlowReturn result;
781
782   basesink = GST_BASESINK (GST_OBJECT_PARENT (pad));
783
784   result = gst_basesink_handle_object (basesink, pad, GST_MINI_OBJECT (buf));
785
786   return result;
787 }
788
789 /* FIXME, not all sinks can operate in pull mode 
790  */
791 static void
792 gst_basesink_loop (GstPad * pad)
793 {
794   GstBaseSink *basesink;
795   GstBuffer *buf = NULL;
796   GstFlowReturn result;
797
798   basesink = GST_BASESINK (GST_OBJECT_PARENT (pad));
799
800   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
801
802   result = gst_pad_pull_range (pad, basesink->offset, DEFAULT_SIZE, &buf);
803   if (result != GST_FLOW_OK)
804     goto paused;
805
806   result = gst_basesink_chain (pad, buf);
807   if (result != GST_FLOW_OK)
808     goto paused;
809
810   /* default */
811   return;
812
813 paused:
814   gst_pad_pause_task (pad);
815   return;
816 }
817
818 static gboolean
819 gst_basesink_activate (GstPad * pad, GstActivateMode mode)
820 {
821   gboolean result = FALSE;
822   GstBaseSink *basesink;
823   GstBaseSinkClass *bclass;
824
825   basesink = GST_BASESINK (GST_OBJECT_PARENT (pad));
826   bclass = GST_BASESINK_GET_CLASS (basesink);
827
828   switch (mode) {
829     case GST_ACTIVATE_PUSH:
830       g_return_val_if_fail (basesink->has_chain, FALSE);
831       result = TRUE;
832       break;
833     case GST_ACTIVATE_PULL:
834       /* if we have a scheduler we can start the task */
835       g_return_val_if_fail (basesink->has_loop, FALSE);
836       gst_pad_peer_set_active (pad, mode);
837       result =
838           gst_pad_start_task (pad, (GstTaskFunction) gst_basesink_loop, pad);
839       break;
840     case GST_ACTIVATE_NONE:
841       /* step 1, unblock clock sync (if any) or any other blocking thing */
842       GST_PREROLL_LOCK (pad);
843       GST_LOCK (basesink);
844       if (basesink->clock_id) {
845         gst_clock_id_unschedule (basesink->clock_id);
846       }
847       GST_UNLOCK (basesink);
848
849       /* unlock any subclasses */
850       if (bclass->unlock)
851         bclass->unlock (basesink);
852
853       /* flush out the data thread if it's locked in finish_preroll */
854       gst_basesink_preroll_queue_flush (basesink);
855       basesink->need_preroll = FALSE;
856       GST_PREROLL_SIGNAL (pad);
857       GST_PREROLL_UNLOCK (pad);
858
859       /* step 2, make sure streaming finishes */
860       result = gst_pad_stop_task (pad);
861       break;
862   }
863   basesink->pad_mode = mode;
864
865   return result;
866 }
867
868 static GstElementStateReturn
869 gst_basesink_change_state (GstElement * element)
870 {
871   GstElementStateReturn ret = GST_STATE_SUCCESS;
872   GstBaseSink *basesink = GST_BASESINK (element);
873   GstElementState transition = GST_STATE_TRANSITION (element);
874
875   switch (transition) {
876     case GST_STATE_NULL_TO_READY:
877       break;
878     case GST_STATE_READY_TO_PAUSED:
879       /* need to complete preroll before this state change completes, there
880        * is no data flow in READY so we can safely assume we need to preroll. */
881       basesink->offset = 0;
882       GST_PREROLL_LOCK (basesink->sinkpad);
883       basesink->have_preroll = FALSE;
884       basesink->need_preroll = TRUE;
885       GST_PREROLL_UNLOCK (basesink->sinkpad);
886       ret = GST_STATE_ASYNC;
887       break;
888     case GST_STATE_PAUSED_TO_PLAYING:
889     {
890       GST_PREROLL_LOCK (basesink->sinkpad);
891       /* if we have EOS, we should empty the queue now as there will
892        * be no more data received in the chain function.
893        * FIXME, this could block the state change function too long when
894        * we are pushing and syncing the buffers, better start a new
895        * thread to do this. */
896       if (basesink->eos) {
897         gst_basesink_preroll_queue_empty (basesink, basesink->sinkpad);
898       }
899       /* don't need the preroll anymore */
900       basesink->need_preroll = FALSE;
901       if (basesink->have_preroll) {
902         /* now let it play */
903         GST_PREROLL_SIGNAL (basesink->sinkpad);
904       }
905       GST_PREROLL_UNLOCK (basesink->sinkpad);
906       break;
907     }
908     default:
909       break;
910   }
911
912   GST_ELEMENT_CLASS (parent_class)->change_state (element);
913
914   switch (transition) {
915     case GST_STATE_PLAYING_TO_PAUSED:
916     {
917       GstBaseSinkClass *bclass;
918
919       bclass = GST_BASESINK_GET_CLASS (basesink);
920
921       GST_PREROLL_LOCK (basesink->sinkpad);
922       GST_LOCK (basesink);
923       /* unlock clock wait if any */
924       if (basesink->clock_id) {
925         gst_clock_id_unschedule (basesink->clock_id);
926       }
927       GST_UNLOCK (basesink);
928
929       /* unlock any subclasses */
930       if (bclass->unlock)
931         bclass->unlock (basesink);
932
933       /* if we don't have a preroll buffer and we have not received EOS,
934        * we need to wait for a preroll */
935       if (!basesink->have_preroll && !basesink->eos) {
936         basesink->need_preroll = TRUE;
937         ret = GST_STATE_ASYNC;
938       }
939       GST_PREROLL_UNLOCK (basesink->sinkpad);
940       break;
941     }
942     case GST_STATE_PAUSED_TO_READY:
943       break;
944     case GST_STATE_READY_TO_NULL:
945       break;
946     default:
947       break;
948   }
949
950   return ret;
951 }