gst/gstevent.h (gst_event_new_new_segment) (gst_event_parse_new_segment, gst_event_ne...
[platform/upstream/gstreamer.git] / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSource
26  *
27  * GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. GstBaseSink handles many details for
30  * you, for example preroll, clock synchronization, state changes, activation in
31  * push or pull mode, and queries. In most cases, when writing sink elements,
32  * there is no need to implement class methods from #GstElement or to set
33  * functions on pads, because the GstBaseSink infrastructure is sufficient.
34  *
35  * There is only support in GstBaseSink for one sink pad, which should be named
36  * "sink". A sink implementation (subclass of GstBaseSink) should install a pad
37  * template in its base_init function, like so:
38  * <programlisting>
39  * static void
40  * my_element_base_init (gpointer g_class)
41  * {
42  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
43  *   
44  *   // sinktemplate should be a #GstStaticPadTemplate with direction
45  *   // #GST_PAD_SINK and name "sink"
46  *   gst_element_class_add_pad_template (gstelement_class,
47  *       gst_static_pad_template_get (&amp;sinktemplate));
48  *   // see #GstElementDetails
49  *   gst_element_class_set_details (gstelement_class, &amp;details);
50  * }
51  * </programlisting>
52  *
53  * The one method which all subclasses of GstBaseSink must implement is
54  * GstBaseSink::render. This method will be called...
55  * 
56  * preroll()
57  *
58  * event(): mostly useful for file-like sinks (seeking or flushing)
59  *
60  * get_caps/set_caps/buffer_alloc
61  *
62  * start/stop for resource allocation
63  *
64  * unlock if you block on an fd, for example
65  *
66  * get_times i'm sure is for something :P
67  *
68  * provide example of textsink
69  *
70  * admonishment not to try to implement your own sink with prerolling...
71  *
72  * extending via subclassing, setting pad functions, gstelement vmethods.
73  */
74
75 #ifdef HAVE_CONFIG_H
76 #  include "config.h"
77 #endif
78
79 #include "gstbasesink.h"
80 #include <gst/gstmarshal.h>
81 #include <gst/gst-i18n-lib.h>
82
83 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
84 #define GST_CAT_DEFAULT gst_base_sink_debug
85
86 /* BaseSink signals and properties */
87 enum
88 {
89   /* FILL ME */
90   SIGNAL_HANDOFF,
91   LAST_SIGNAL
92 };
93
94 #define DEFAULT_SIZE 1024
95 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
96 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
97
98 #define DEFAULT_SYNC TRUE
99
100 enum
101 {
102   PROP_0,
103   PROP_PREROLL_QUEUE_LEN,
104   PROP_SYNC
105 };
106
107 static GstElementClass *parent_class = NULL;
108
109 static void gst_base_sink_base_init (gpointer g_class);
110 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
111 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
112 static void gst_base_sink_finalize (GObject * object);
113
114 GType
115 gst_base_sink_get_type (void)
116 {
117   static GType base_sink_type = 0;
118
119   if (!base_sink_type) {
120     static const GTypeInfo base_sink_info = {
121       sizeof (GstBaseSinkClass),
122       (GBaseInitFunc) gst_base_sink_base_init,
123       NULL,
124       (GClassInitFunc) gst_base_sink_class_init,
125       NULL,
126       NULL,
127       sizeof (GstBaseSink),
128       0,
129       (GInstanceInitFunc) gst_base_sink_init,
130     };
131
132     base_sink_type = g_type_register_static (GST_TYPE_ELEMENT,
133         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
134   }
135   return base_sink_type;
136 }
137
138 static void gst_base_sink_set_clock (GstElement * element, GstClock * clock);
139
140 static void gst_base_sink_set_property (GObject * object, guint prop_id,
141     const GValue * value, GParamSpec * pspec);
142 static void gst_base_sink_get_property (GObject * object, guint prop_id,
143     GValue * value, GParamSpec * pspec);
144
145 static gboolean gst_base_sink_send_event (GstElement * element,
146     GstEvent * event);
147 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
148
149 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
150 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
151 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
152     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
153 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
154     GstClockTime * start, GstClockTime * end);
155
156 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
157     GstStateChange transition);
158
159 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
160 static void gst_base_sink_loop (GstPad * pad);
161 static gboolean gst_base_sink_activate (GstPad * pad);
162 static gboolean gst_base_sink_activate_push (GstPad * pad, gboolean active);
163 static gboolean gst_base_sink_activate_pull (GstPad * pad, gboolean active);
164 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
165 static inline GstFlowReturn gst_base_sink_handle_buffer (GstBaseSink * basesink,
166     GstBuffer * buf);
167 static inline gboolean gst_base_sink_handle_event (GstBaseSink * basesink,
168     GstEvent * event);
169
170 static void
171 gst_base_sink_base_init (gpointer g_class)
172 {
173   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
174       "basesink element");
175 }
176
177 static void
178 gst_base_sink_class_init (GstBaseSinkClass * klass)
179 {
180   GObjectClass *gobject_class;
181   GstElementClass *gstelement_class;
182
183   gobject_class = (GObjectClass *) klass;
184   gstelement_class = (GstElementClass *) klass;
185
186   parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
187
188   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_sink_finalize);
189   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_sink_set_property);
190   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_sink_get_property);
191
192   /* FIXME, this next value should be configured using an event from the
193    * upstream element */
194   g_object_class_install_property (G_OBJECT_CLASS (klass),
195       PROP_PREROLL_QUEUE_LEN,
196       g_param_spec_uint ("preroll-queue-len", "preroll-queue-len",
197           "Number of buffers to queue during preroll", 0, G_MAXUINT, 0,
198           G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
199   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SYNC,
200       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
201           G_PARAM_READWRITE));
202
203   gstelement_class->set_clock = GST_DEBUG_FUNCPTR (gst_base_sink_set_clock);
204   gstelement_class->change_state =
205       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
206   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
207   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
208
209   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
210   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
211   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
212   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
213 }
214
215 static GstCaps *
216 gst_base_sink_pad_getcaps (GstPad * pad)
217 {
218   GstBaseSinkClass *bclass;
219   GstBaseSink *bsink;
220   GstCaps *caps = NULL;
221
222   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
223   bclass = GST_BASE_SINK_GET_CLASS (bsink);
224   if (bclass->get_caps)
225     caps = bclass->get_caps (bsink);
226
227   if (caps == NULL) {
228     GstPadTemplate *pad_template;
229
230     pad_template =
231         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "sink");
232     if (pad_template != NULL) {
233       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
234     }
235   }
236   gst_object_unref (bsink);
237
238   return caps;
239 }
240
241 static gboolean
242 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
243 {
244   GstBaseSinkClass *bclass;
245   GstBaseSink *bsink;
246   gboolean res = FALSE;
247
248   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
249   bclass = GST_BASE_SINK_GET_CLASS (bsink);
250
251   if (bclass->set_caps)
252     res = bclass->set_caps (bsink, caps);
253
254   gst_object_unref (bsink);
255
256   return res;
257 }
258
259 static GstFlowReturn
260 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
261     GstCaps * caps, GstBuffer ** buf)
262 {
263   GstBaseSinkClass *bclass;
264   GstBaseSink *bsink;
265   GstFlowReturn result = GST_FLOW_OK;
266
267   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
268   bclass = GST_BASE_SINK_GET_CLASS (bsink);
269
270   if (bclass->buffer_alloc)
271     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
272   else
273     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
274
275   gst_object_unref (bsink);
276
277   return result;
278 }
279
280 static void
281 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
282 {
283   GstPadTemplate *pad_template;
284
285   pad_template =
286       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
287   g_return_if_fail (pad_template != NULL);
288
289   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
290
291   gst_pad_set_getcaps_function (basesink->sinkpad,
292       GST_DEBUG_FUNCPTR (gst_base_sink_pad_getcaps));
293   gst_pad_set_setcaps_function (basesink->sinkpad,
294       GST_DEBUG_FUNCPTR (gst_base_sink_pad_setcaps));
295   gst_pad_set_bufferalloc_function (basesink->sinkpad,
296       GST_DEBUG_FUNCPTR (gst_base_sink_pad_buffer_alloc));
297   gst_pad_set_activate_function (basesink->sinkpad,
298       GST_DEBUG_FUNCPTR (gst_base_sink_activate));
299   gst_pad_set_activatepush_function (basesink->sinkpad,
300       GST_DEBUG_FUNCPTR (gst_base_sink_activate_push));
301   gst_pad_set_activatepull_function (basesink->sinkpad,
302       GST_DEBUG_FUNCPTR (gst_base_sink_activate_pull));
303   gst_pad_set_event_function (basesink->sinkpad,
304       GST_DEBUG_FUNCPTR (gst_base_sink_event));
305   gst_pad_set_chain_function (basesink->sinkpad,
306       GST_DEBUG_FUNCPTR (gst_base_sink_chain));
307   gst_element_add_pad (GST_ELEMENT (basesink), basesink->sinkpad);
308
309   basesink->pad_mode = GST_ACTIVATE_NONE;
310   GST_PAD_TASK (basesink->sinkpad) = NULL;
311   basesink->preroll_queue = g_queue_new ();
312
313   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
314   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
315
316   basesink->sync = DEFAULT_SYNC;
317
318   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
319 }
320
321 static void
322 gst_base_sink_finalize (GObject * object)
323 {
324   GstBaseSink *basesink;
325
326   basesink = GST_BASE_SINK (object);
327
328   g_queue_free (basesink->preroll_queue);
329
330   G_OBJECT_CLASS (parent_class)->finalize (object);
331 }
332
333 static void
334 gst_base_sink_set_clock (GstElement * element, GstClock * clock)
335 {
336   GstBaseSink *sink;
337
338   sink = GST_BASE_SINK (element);
339
340   sink->clock = clock;
341 }
342
343 static void
344 gst_base_sink_set_property (GObject * object, guint prop_id,
345     const GValue * value, GParamSpec * pspec)
346 {
347   GstBaseSink *sink = GST_BASE_SINK (object);
348
349   switch (prop_id) {
350     case PROP_PREROLL_QUEUE_LEN:
351       /* preroll lock necessary to serialize with finish_preroll */
352       GST_PAD_PREROLL_LOCK (sink->sinkpad);
353       sink->preroll_queue_max_len = g_value_get_uint (value);
354       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
355       break;
356     case PROP_SYNC:
357       sink->sync = g_value_get_boolean (value);
358       break;
359     default:
360       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
361       break;
362   }
363 }
364
365 static void
366 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
367     GParamSpec * pspec)
368 {
369   GstBaseSink *sink = GST_BASE_SINK (object);
370
371   GST_OBJECT_LOCK (sink);
372   switch (prop_id) {
373     case PROP_PREROLL_QUEUE_LEN:
374       g_value_set_uint (value, sink->preroll_queue_max_len);
375       break;
376     case PROP_SYNC:
377       g_value_set_boolean (value, sink->sync);
378       break;
379     default:
380       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
381       break;
382   }
383   GST_OBJECT_UNLOCK (sink);
384 }
385
386 static GstCaps *
387 gst_base_sink_get_caps (GstBaseSink * sink)
388 {
389   return NULL;
390 }
391
392 static gboolean
393 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
394 {
395   return TRUE;
396 }
397
398 static GstFlowReturn
399 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
400     GstCaps * caps, GstBuffer ** buf)
401 {
402   *buf = NULL;
403   return GST_FLOW_OK;
404 }
405
406 /* with PREROLL_LOCK */
407 static GstFlowReturn
408 gst_base_sink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad)
409 {
410   GstMiniObject *obj;
411   GQueue *q = basesink->preroll_queue;
412   GstFlowReturn ret;
413
414   ret = GST_FLOW_OK;
415
416   if (q) {
417     GST_DEBUG_OBJECT (basesink, "emptying queue");
418     while ((obj = g_queue_pop_head (q))) {
419       gboolean is_buffer;
420
421       is_buffer = GST_IS_BUFFER (obj);
422       if (G_LIKELY (is_buffer)) {
423         basesink->preroll_queued--;
424         basesink->buffers_queued--;
425       } else {
426         switch (GST_EVENT_TYPE (obj)) {
427           case GST_EVENT_EOS:
428             basesink->preroll_queued--;
429             break;
430           default:
431             break;
432         }
433         basesink->events_queued--;
434       }
435       /* we release the preroll lock while pushing so that we
436        * can still flush it while blocking on the clock or
437        * inside the element. */
438       GST_PAD_PREROLL_UNLOCK (pad);
439
440       if (G_LIKELY (is_buffer)) {
441         GST_DEBUG_OBJECT (basesink, "popped buffer %p", obj);
442         ret = gst_base_sink_handle_buffer (basesink, GST_BUFFER_CAST (obj));
443       } else {
444         GST_DEBUG_OBJECT (basesink, "popped event %p", obj);
445         gst_base_sink_handle_event (basesink, GST_EVENT_CAST (obj));
446         ret = GST_FLOW_OK;
447       }
448
449       GST_PAD_PREROLL_LOCK (pad);
450     }
451     GST_DEBUG_OBJECT (basesink, "queue empty");
452   }
453   return ret;
454 }
455
456 /* with PREROLL_LOCK */
457 static void
458 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
459 {
460   GstMiniObject *obj;
461   GQueue *q = basesink->preroll_queue;
462
463   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
464   if (q) {
465     while ((obj = g_queue_pop_head (q))) {
466       GST_DEBUG_OBJECT (basesink, "popped %p", obj);
467       gst_mini_object_unref (obj);
468     }
469   }
470   /* we can't have EOS anymore now */
471   basesink->eos = FALSE;
472   basesink->eos_queued = FALSE;
473   basesink->preroll_queued = 0;
474   basesink->buffers_queued = 0;
475   basesink->events_queued = 0;
476   basesink->have_preroll = FALSE;
477   /* and signal any waiters now */
478   GST_PAD_PREROLL_SIGNAL (pad);
479 }
480
481 /* with PREROLL_LOCK */
482 static gboolean
483 gst_base_sink_commit_state (GstBaseSink * basesink)
484 {
485   /* commit state and proceed to next pending state */
486   {
487     GstState current, next, pending, post_pending;
488     GstMessage *message;
489     gboolean post_paused = FALSE;
490     gboolean post_playing = FALSE;
491
492     GST_OBJECT_LOCK (basesink);
493     current = GST_STATE (basesink);
494     next = GST_STATE_NEXT (basesink);
495     pending = GST_STATE_PENDING (basesink);
496     post_pending = pending;
497
498     switch (pending) {
499       case GST_STATE_PLAYING:
500         basesink->need_preroll = FALSE;
501         post_playing = TRUE;
502         /* post PAUSED too when we were READY */
503         if (current == GST_STATE_READY) {
504           post_paused = TRUE;
505         }
506         break;
507       case GST_STATE_PAUSED:
508         basesink->need_preroll = TRUE;
509         post_paused = TRUE;
510         post_pending = GST_STATE_VOID_PENDING;
511         break;
512       case GST_STATE_READY:
513         goto stopping;
514       default:
515         break;
516     }
517
518     if (pending != GST_STATE_VOID_PENDING) {
519       GST_STATE (basesink) = pending;
520       GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
521       GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
522       GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
523     }
524     GST_OBJECT_UNLOCK (basesink);
525
526     if (post_paused) {
527       message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
528           current, next, post_pending);
529       gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
530     }
531     if (post_playing) {
532       message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
533           next, pending, GST_STATE_VOID_PENDING);
534       gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
535     }
536     /* and mark dirty */
537     if (post_paused || post_playing) {
538       gst_element_post_message (GST_ELEMENT_CAST (basesink),
539           gst_message_new_state_dirty (GST_OBJECT_CAST (basesink)));
540     }
541
542     GST_STATE_BROADCAST (basesink);
543   }
544   return TRUE;
545
546 stopping:
547   {
548     /* app is going to READY */
549     GST_OBJECT_UNLOCK (basesink);
550     return FALSE;
551   }
552 }
553
554 /* with STREAM_LOCK */
555 static GstFlowReturn
556 gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad,
557     GstMiniObject * obj)
558 {
559   gint length;
560   gboolean have_event;
561   GstFlowReturn ret;
562
563   GST_PAD_PREROLL_LOCK (pad);
564   /* push object on the queue */
565   GST_DEBUG_OBJECT (basesink, "push %p on preroll_queue", obj);
566   g_queue_push_tail (basesink->preroll_queue, obj);
567
568   have_event = GST_IS_EVENT (obj);
569   if (have_event) {
570     GstEvent *event = GST_EVENT (obj);
571
572     switch (GST_EVENT_TYPE (obj)) {
573       case GST_EVENT_EOS:
574         basesink->preroll_queued++;
575         basesink->eos = TRUE;
576         basesink->eos_queued = TRUE;
577         break;
578       case GST_EVENT_NEWSEGMENT:
579       {
580         gboolean update;
581         gdouble rate;
582         GstFormat format;
583         gint64 start;
584         gint64 stop;
585         gint64 time;
586
587         /* the newsegment event is needed to bring the buffer timestamps to the
588          * stream time and to drop samples outside of the playback segment. */
589         gst_event_parse_new_segment (event, &update, &rate, &format,
590             &start, &stop, &time);
591
592         basesink->have_newsegment = TRUE;
593
594         gst_segment_set_newsegment (&basesink->segment, update, rate, format,
595             start, stop, time);
596
597         GST_DEBUG_OBJECT (basesink,
598             "received NEWSEGMENT %" GST_TIME_FORMAT " -- %"
599             GST_TIME_FORMAT ", time %" GST_TIME_FORMAT ", accum %"
600             GST_TIME_FORMAT,
601             GST_TIME_ARGS (basesink->segment.start),
602             GST_TIME_ARGS (basesink->segment.stop),
603             GST_TIME_ARGS (basesink->segment.time),
604             GST_TIME_ARGS (basesink->segment.accum));
605         break;
606       }
607       default:
608         break;
609     }
610     basesink->events_queued++;
611   } else {
612     GstBuffer *buf = GST_BUFFER (obj);
613
614     if (!basesink->have_newsegment) {
615       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
616           (_("Internal data flow problem.")),
617           ("Received buffer without a new-segment. Cannot sync to clock."));
618       basesink->have_newsegment = TRUE;
619       /* this means this sink will not be able to sync to the clock */
620       basesink->segment.start = -1;
621       basesink->segment.stop = -1;
622     }
623
624     /* check if the buffer needs to be dropped */
625     if (TRUE) {
626       GstClockTime start = -1, end = -1;
627
628       /* we don't use the subclassed method as it may not return
629        * valid values for our purpose here */
630       gst_base_sink_get_times (basesink, buf, &start, &end);
631
632       GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
633           ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
634           GST_TIME_ARGS (end));
635
636       if (GST_CLOCK_TIME_IS_VALID (start)) {
637         if (!gst_segment_clip (&basesink->segment, GST_FORMAT_TIME,
638                 (gint64) start, (gint64) end, NULL, NULL))
639           goto dropping;
640       }
641     }
642     basesink->preroll_queued++;
643     basesink->buffers_queued++;
644   }
645
646   GST_DEBUG_OBJECT (basesink,
647       "now %d preroll, %d buffers, %d events on queue",
648       basesink->preroll_queued,
649       basesink->buffers_queued, basesink->events_queued);
650
651   /* check if we are prerolling */
652   if (!basesink->need_preroll)
653     goto no_preroll;
654
655   /* there is a buffer queued */
656   if (basesink->buffers_queued == 1) {
657     GST_DEBUG_OBJECT (basesink, "do preroll %p", obj);
658
659     /* if it's a buffer, we need to call the preroll method */
660     if (GST_IS_BUFFER (obj)) {
661       GstBaseSinkClass *bclass;
662       GstBuffer *buf = GST_BUFFER (obj);
663
664       GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
665           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
666
667       bclass = GST_BASE_SINK_GET_CLASS (basesink);
668       if (bclass->preroll)
669         if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
670           goto preroll_failed;
671     }
672   }
673   length = basesink->preroll_queued;
674   GST_DEBUG_OBJECT (basesink, "prerolled length %d", length);
675
676   if (length == 1) {
677
678     basesink->have_preroll = TRUE;
679
680     /* commit state */
681     if (!gst_base_sink_commit_state (basesink))
682       goto stopping;
683
684     GST_OBJECT_LOCK (pad);
685     if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
686       goto flushing;
687     GST_OBJECT_UNLOCK (pad);
688
689     /* it is possible that commiting the state made us go to PLAYING
690      * now in which case we don't need to block anymore. */
691     if (!basesink->need_preroll)
692       goto no_preroll;
693
694     length = basesink->preroll_queued;
695
696     /* FIXME: a pad probe could have made us lose the buffer, according
697      * to one of the python tests */
698     if (length == 0) {
699       GST_ERROR_OBJECT (basesink,
700           "preroll_queued dropped from 1 to 0 while committing state change");
701     }
702     g_assert (length <= 1);
703   }
704
705   /* see if we need to block now. We cannot block on events, only
706    * on buffers, the reason is that events can be sent from the
707    * application thread and we don't want to block there. */
708   if (length > basesink->preroll_queue_max_len && !have_event) {
709     /* block until the state changes, or we get a flush, or something */
710     GST_DEBUG_OBJECT (basesink, "waiting to finish preroll");
711     GST_PAD_PREROLL_WAIT (pad);
712     GST_DEBUG_OBJECT (basesink, "done preroll");
713     GST_OBJECT_LOCK (pad);
714     if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
715       goto flushing;
716     GST_OBJECT_UNLOCK (pad);
717   }
718   GST_PAD_PREROLL_UNLOCK (pad);
719
720   return GST_FLOW_OK;
721
722 no_preroll:
723   {
724     GST_DEBUG_OBJECT (basesink, "no preroll needed");
725     /* maybe it was another sink that blocked in preroll, need to check for
726        buffers to drain */
727     basesink->have_preroll = FALSE;
728     ret = gst_base_sink_preroll_queue_empty (basesink, pad);
729     GST_PAD_PREROLL_UNLOCK (pad);
730
731     return ret;
732   }
733 dropping:
734   {
735     GstBuffer *buf;
736
737     buf = GST_BUFFER (g_queue_pop_tail (basesink->preroll_queue));
738
739     gst_buffer_unref (buf);
740     GST_PAD_PREROLL_UNLOCK (pad);
741
742     return GST_FLOW_OK;
743   }
744 flushing:
745   {
746     GST_OBJECT_UNLOCK (pad);
747     gst_base_sink_preroll_queue_flush (basesink, pad);
748     GST_PAD_PREROLL_UNLOCK (pad);
749     GST_DEBUG_OBJECT (basesink, "pad is flushing");
750
751     return GST_FLOW_WRONG_STATE;
752   }
753 stopping:
754   {
755     GST_PAD_PREROLL_UNLOCK (pad);
756     GST_DEBUG_OBJECT (basesink, "stopping");
757
758     return GST_FLOW_WRONG_STATE;
759   }
760 preroll_failed:
761   {
762     GST_DEBUG_OBJECT (basesink, "preroll failed");
763     gst_base_sink_preroll_queue_flush (basesink, pad);
764
765     GST_DEBUG_OBJECT (basesink, "abort state");
766     gst_element_abort_state (GST_ELEMENT (basesink));
767
768     return ret;
769   }
770 }
771
772 static gboolean
773 gst_base_sink_event (GstPad * pad, GstEvent * event)
774 {
775   GstBaseSink *basesink;
776   gboolean result = TRUE;
777   GstBaseSinkClass *bclass;
778
779   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
780
781   bclass = GST_BASE_SINK_GET_CLASS (basesink);
782
783   GST_DEBUG_OBJECT (basesink, "event %p", event);
784
785   switch (GST_EVENT_TYPE (event)) {
786     case GST_EVENT_EOS:
787     {
788       GstFlowReturn ret;
789
790       /* EOS also finishes the preroll */
791       ret =
792           gst_base_sink_handle_object (basesink, pad, GST_MINI_OBJECT (event));
793       break;
794     }
795     case GST_EVENT_NEWSEGMENT:
796     {
797       GstFlowReturn ret;
798
799       ret =
800           gst_base_sink_handle_object (basesink, pad, GST_MINI_OBJECT (event));
801       break;
802     }
803     case GST_EVENT_FLUSH_START:
804       /* make sure we are not blocked on the clock also clear any pending
805        * eos state. */
806       if (bclass->event)
807         bclass->event (basesink, event);
808
809       GST_OBJECT_LOCK (basesink);
810       basesink->flushing = TRUE;
811       if (basesink->clock_id) {
812         gst_clock_id_unschedule (basesink->clock_id);
813       }
814       GST_OBJECT_UNLOCK (basesink);
815
816       GST_PAD_PREROLL_LOCK (pad);
817       /* we need preroll after the flush */
818       GST_DEBUG_OBJECT (basesink, "flushing, need preroll after flush");
819       basesink->need_preroll = TRUE;
820       /* unlock from a possible state change/preroll */
821       gst_base_sink_preroll_queue_flush (basesink, pad);
822       GST_PAD_PREROLL_UNLOCK (pad);
823
824       /* and we need to commit our state again on the next
825        * prerolled buffer */
826       GST_PAD_STREAM_LOCK (pad);
827       gst_element_lost_state (GST_ELEMENT (basesink));
828       GST_PAD_STREAM_UNLOCK (pad);
829       GST_DEBUG_OBJECT (basesink, "event unref %p %p", basesink, event);
830       gst_event_unref (event);
831       break;
832     case GST_EVENT_FLUSH_STOP:
833       if (bclass->event)
834         bclass->event (basesink, event);
835
836       /* now we are completely unblocked and the _chain method
837        * will return */
838       GST_OBJECT_LOCK (basesink);
839       basesink->flushing = FALSE;
840       GST_OBJECT_UNLOCK (basesink);
841       /* we need new segment info after the flush. */
842       gst_segment_init (&basesink->segment, GST_FORMAT_TIME);
843
844       GST_DEBUG_OBJECT (basesink, "event unref %p %p", basesink, event);
845       gst_event_unref (event);
846       break;
847     default:
848       gst_event_unref (event);
849       break;
850   }
851   gst_object_unref (basesink);
852
853   return result;
854 }
855
856 /* default implementation to calculate the start and end
857  * timestamps on a buffer, subclasses can override
858  */
859 static void
860 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
861     GstClockTime * start, GstClockTime * end)
862 {
863   GstClockTime timestamp, duration;
864
865   timestamp = GST_BUFFER_TIMESTAMP (buffer);
866   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
867
868     /* get duration to calculate end time */
869     duration = GST_BUFFER_DURATION (buffer);
870     if (GST_CLOCK_TIME_IS_VALID (duration)) {
871       *end = timestamp + duration;
872     }
873     *start = timestamp;
874   }
875 }
876
877 /* with STREAM_LOCK and LOCK*/
878 static GstClockReturn
879 gst_base_sink_wait (GstBaseSink * basesink, GstClockTime time)
880 {
881   GstClockReturn ret;
882   GstClockID id;
883
884   /* no need to attempt a clock wait if we are flushing */
885   if (basesink->flushing) {
886     return GST_CLOCK_UNSCHEDULED;
887   }
888
889   /* clock_id should be NULL outside of this function */
890   g_assert (basesink->clock_id == NULL);
891   g_assert (GST_CLOCK_TIME_IS_VALID (time));
892
893   id = gst_clock_new_single_shot_id (basesink->clock, time);
894
895   basesink->clock_id = id;
896   /* release the object lock while waiting */
897   GST_OBJECT_UNLOCK (basesink);
898
899   ret = gst_clock_id_wait (id, NULL);
900
901   GST_OBJECT_LOCK (basesink);
902   gst_clock_id_unref (id);
903   basesink->clock_id = NULL;
904
905   return ret;
906 }
907
908 /* perform synchronisation on a buffer
909  *
910  * 1) check if we have a clock, if not, do nothing
911  * 2) calculate the start and end time of the buffer
912  * 3) create a single shot notification to wait on
913  *    the clock, save the entry so we can unlock it
914  * 4) wait on the clock, this blocks
915  * 5) unref the clockid again
916  */
917 static GstClockReturn
918 gst_base_sink_do_sync (GstBaseSink * basesink, GstBuffer * buffer)
919 {
920   GstClockReturn result = GST_CLOCK_OK;
921   GstClockTime start, end;
922   gint64 cstart, cend;
923   GstBaseSinkClass *bclass;
924
925   bclass = GST_BASE_SINK_GET_CLASS (basesink);
926
927   start = end = -1;
928   if (bclass->get_times)
929     bclass->get_times (basesink, buffer, &start, &end);
930
931   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
932       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
933
934   /* if we don't have a timestamp, we don't sync */
935   if (!GST_CLOCK_TIME_IS_VALID (start)) {
936     GST_DEBUG_OBJECT (basesink, "start not valid");
937     goto done;
938   }
939
940   /* save last times seen. */
941   if (GST_CLOCK_TIME_IS_VALID (end))
942     gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_TIME,
943         (gint64) end);
944   else
945     gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_TIME,
946         (gint64) start);
947
948   /* clip */
949   if (!gst_segment_clip (&basesink->segment, GST_FORMAT_TIME,
950           (gint64) start, (gint64) end, &cstart, &cend))
951     goto out_of_segment;
952
953   if (!basesink->sync) {
954     GST_DEBUG_OBJECT (basesink, "no need to sync");
955     goto done;
956   }
957
958   /* now do clocking */
959   if (basesink->clock) {
960     GstClockTime base_time;
961     GstClockTimeDiff stream_start, stream_end;
962
963     stream_start =
964         gst_segment_to_running_time (&basesink->segment, GST_FORMAT_TIME,
965         cstart);
966     stream_end =
967         gst_segment_to_running_time (&basesink->segment, GST_FORMAT_TIME, cend);
968
969     GST_OBJECT_LOCK (basesink);
970
971     base_time = GST_ELEMENT_CAST (basesink)->base_time;
972
973     GST_LOG_OBJECT (basesink,
974         "waiting for clock, base time %" GST_TIME_FORMAT
975         " stream_start %" GST_TIME_FORMAT,
976         GST_TIME_ARGS (base_time), GST_TIME_ARGS (stream_start));
977
978     /* also save end_time of this buffer so that we can wait
979      * to signal EOS */
980     if (GST_CLOCK_TIME_IS_VALID (stream_end))
981       basesink->end_time = stream_end + base_time;
982     else
983       basesink->end_time = GST_CLOCK_TIME_NONE;
984
985     result = gst_base_sink_wait (basesink, stream_start + base_time);
986
987     GST_OBJECT_UNLOCK (basesink);
988
989     GST_LOG_OBJECT (basesink, "clock entry done: %d", result);
990   } else {
991     GST_DEBUG_OBJECT (basesink, "no clock, not syncing");
992   }
993
994 done:
995   return result;
996
997 out_of_segment:
998   {
999     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
1000     return GST_CLOCK_UNSCHEDULED;
1001   }
1002 }
1003
1004
1005 /* handle an event
1006  *
1007  * 2) render the event
1008  * 3) unref the event
1009  */
1010 static inline gboolean
1011 gst_base_sink_handle_event (GstBaseSink * basesink, GstEvent * event)
1012 {
1013   GstBaseSinkClass *bclass;
1014   gboolean ret;
1015
1016   switch (GST_EVENT_TYPE (event)) {
1017     case GST_EVENT_EOS:
1018       GST_OBJECT_LOCK (basesink);
1019       if (basesink->clock) {
1020         /* wait for last buffer to finish if we have a valid end time */
1021         if (GST_CLOCK_TIME_IS_VALID (basesink->end_time)) {
1022           gst_base_sink_wait (basesink, basesink->end_time);
1023           basesink->end_time = GST_CLOCK_TIME_NONE;
1024         }
1025       }
1026       GST_OBJECT_UNLOCK (basesink);
1027       break;
1028     default:
1029       break;
1030   }
1031
1032   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1033   if (bclass->event)
1034     ret = bclass->event (basesink, event);
1035   else
1036     ret = TRUE;
1037
1038   switch (GST_EVENT_TYPE (event)) {
1039     case GST_EVENT_EOS:
1040       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
1041       /* if we are still EOS, we can post the EOS message */
1042       if (basesink->eos) {
1043         /* ok, now we can post the message */
1044         GST_DEBUG_OBJECT (basesink, "Now posting EOS");
1045         gst_element_post_message (GST_ELEMENT_CAST (basesink),
1046             gst_message_new_eos (GST_OBJECT_CAST (basesink)));
1047         basesink->eos_queued = FALSE;
1048       }
1049       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
1050       break;
1051     default:
1052       break;
1053   }
1054
1055   GST_DEBUG_OBJECT (basesink, "event unref %p %p", basesink, event);
1056   gst_event_unref (event);
1057
1058   return ret;
1059 }
1060
1061 /* handle a buffer
1062  *
1063  * 1) first sync on the buffer
1064  * 2) render the buffer
1065  * 3) unref the buffer
1066  */
1067 static inline GstFlowReturn
1068 gst_base_sink_handle_buffer (GstBaseSink * basesink, GstBuffer * buf)
1069 {
1070   GstFlowReturn ret = GST_FLOW_OK;
1071   GstClockReturn status;
1072
1073   status = gst_base_sink_do_sync (basesink, buf);
1074   switch (status) {
1075     case GST_CLOCK_EARLY:
1076       GST_DEBUG_OBJECT (basesink, "buffer too late!, rendering anyway");
1077       /* fallthrough for now */
1078     case GST_CLOCK_OK:
1079     {
1080       GstBaseSinkClass *bclass;
1081
1082       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1083       if (bclass->render)
1084         ret = bclass->render (basesink, buf);
1085       break;
1086     }
1087     default:
1088       GST_DEBUG_OBJECT (basesink, "clock returned %d, not rendering", status);
1089       break;
1090   }
1091
1092   GST_DEBUG_OBJECT (basesink, "buffer unref after render %p", basesink, buf);
1093   gst_buffer_unref (buf);
1094
1095   return ret;
1096 }
1097
1098 static GstFlowReturn
1099 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
1100 {
1101   GstBaseSink *basesink;
1102   GstFlowReturn result;
1103
1104   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
1105
1106   if (!(basesink->pad_mode == GST_ACTIVATE_PUSH)) {
1107     GST_OBJECT_LOCK (pad);
1108     g_warning ("Push on pad %s:%s, but it was not activated in push mode",
1109         GST_DEBUG_PAD_NAME (pad));
1110     GST_OBJECT_UNLOCK (pad);
1111     result = GST_FLOW_UNEXPECTED;
1112     goto done;
1113   }
1114
1115   result =
1116       gst_base_sink_handle_object (basesink, pad, GST_MINI_OBJECT_CAST (buf));
1117
1118 done:
1119   gst_object_unref (basesink);
1120
1121   return result;
1122 }
1123
1124 static void
1125 gst_base_sink_loop (GstPad * pad)
1126 {
1127   GstBaseSink *basesink;
1128   GstBuffer *buf = NULL;
1129   GstFlowReturn result;
1130
1131   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
1132
1133   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
1134
1135   result = gst_pad_pull_range (pad, basesink->offset, DEFAULT_SIZE, &buf);
1136   if (result != GST_FLOW_OK)
1137     goto paused;
1138
1139   result = gst_base_sink_handle_object (basesink, pad, GST_MINI_OBJECT (buf));
1140   if (result != GST_FLOW_OK)
1141     goto paused;
1142
1143   gst_object_unref (basesink);
1144
1145   /* default */
1146   return;
1147
1148 paused:
1149   {
1150     gst_base_sink_event (pad, gst_event_new_eos ());
1151     gst_object_unref (basesink);
1152     gst_pad_pause_task (pad);
1153     return;
1154   }
1155 }
1156
1157 static gboolean
1158 gst_base_sink_deactivate (GstBaseSink * basesink, GstPad * pad)
1159 {
1160   gboolean result = FALSE;
1161   GstBaseSinkClass *bclass;
1162
1163   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1164
1165   /* step 1, unblock clock sync (if any) or any other blocking thing */
1166   GST_PAD_PREROLL_LOCK (pad);
1167   GST_OBJECT_LOCK (basesink);
1168   if (basesink->clock_id) {
1169     gst_clock_id_unschedule (basesink->clock_id);
1170   }
1171   GST_OBJECT_UNLOCK (basesink);
1172
1173   /* unlock any subclasses */
1174   if (bclass->unlock)
1175     bclass->unlock (basesink);
1176
1177   /* flush out the data thread if it's locked in finish_preroll */
1178   GST_DEBUG_OBJECT (basesink,
1179       "flushing out data thread, need preroll to FALSE");
1180   basesink->need_preroll = FALSE;
1181   gst_base_sink_preroll_queue_flush (basesink, pad);
1182   GST_PAD_PREROLL_SIGNAL (pad);
1183   GST_PAD_PREROLL_UNLOCK (pad);
1184
1185   /* step 2, make sure streaming finishes */
1186   result = gst_pad_stop_task (pad);
1187
1188   return result;
1189 }
1190
1191 static gboolean
1192 gst_base_sink_activate (GstPad * pad)
1193 {
1194   gboolean result = FALSE;
1195   GstBaseSink *basesink;
1196
1197   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
1198
1199   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
1200
1201   if (basesink->can_activate_pull && gst_pad_check_pull_range (pad)
1202       && gst_pad_activate_pull (pad, TRUE)) {
1203     GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
1204     result = TRUE;
1205   } else {
1206     GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
1207     if (gst_pad_activate_push (pad, TRUE)) {
1208       GST_DEBUG_OBJECT (basesink, "Success activating push mode");
1209       result = TRUE;
1210     }
1211   }
1212
1213   if (!result) {
1214     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
1215   }
1216
1217   gst_object_unref (basesink);
1218
1219   return result;
1220 }
1221
1222 static gboolean
1223 gst_base_sink_activate_push (GstPad * pad, gboolean active)
1224 {
1225   gboolean result;
1226   GstBaseSink *basesink;
1227
1228   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
1229
1230   if (active) {
1231     if (!basesink->can_activate_push) {
1232       result = FALSE;
1233       basesink->pad_mode = GST_ACTIVATE_NONE;
1234     } else {
1235       result = TRUE;
1236       basesink->pad_mode = GST_ACTIVATE_PUSH;
1237     }
1238   } else {
1239     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
1240       g_warning ("Internal GStreamer activation error!!!");
1241       result = FALSE;
1242     } else {
1243       result = gst_base_sink_deactivate (basesink, pad);
1244       basesink->pad_mode = GST_ACTIVATE_NONE;
1245     }
1246   }
1247
1248   gst_object_unref (basesink);
1249
1250   return result;
1251 }
1252
1253 /* this won't get called until we implement an activate function */
1254 static gboolean
1255 gst_base_sink_activate_pull (GstPad * pad, gboolean active)
1256 {
1257   gboolean result = FALSE;
1258   GstBaseSink *basesink;
1259
1260   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
1261
1262   if (active) {
1263     if (!basesink->can_activate_pull) {
1264       result = FALSE;
1265       basesink->pad_mode = GST_ACTIVATE_NONE;
1266     } else {
1267       GstPad *peer = gst_pad_get_peer (pad);
1268
1269       if (G_UNLIKELY (peer == NULL)) {
1270         g_warning ("Trying to activate pad in pull mode, but no peer");
1271         result = FALSE;
1272         basesink->pad_mode = GST_ACTIVATE_NONE;
1273       } else {
1274         if (gst_pad_activate_pull (peer, TRUE)) {
1275           basesink->have_newsegment = TRUE;
1276           gst_segment_init (&basesink->segment, GST_FORMAT_TIME);
1277
1278           /* set the pad mode before starting the task so that it's in the
1279              correct state for the new thread... */
1280           basesink->pad_mode = GST_ACTIVATE_PULL;
1281           result =
1282               gst_pad_start_task (pad, (GstTaskFunction) gst_base_sink_loop,
1283               pad);
1284           /* but if starting the thread fails, set it back */
1285           if (!result)
1286             basesink->pad_mode = GST_ACTIVATE_NONE;
1287         } else {
1288           GST_DEBUG_OBJECT (pad, "Failed to activate peer in pull mode");
1289           result = FALSE;
1290           basesink->pad_mode = GST_ACTIVATE_NONE;
1291         }
1292         gst_object_unref (peer);
1293       }
1294     }
1295   } else {
1296     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
1297       g_warning ("Internal GStreamer activation error!!!");
1298       result = FALSE;
1299     } else {
1300       basesink->have_newsegment = FALSE;
1301       result = gst_base_sink_deactivate (basesink, pad);
1302       basesink->pad_mode = GST_ACTIVATE_NONE;
1303     }
1304   }
1305
1306   gst_object_unref (basesink);
1307
1308   return result;
1309 }
1310
1311 static gboolean
1312 gst_base_sink_send_event (GstElement * element, GstEvent * event)
1313 {
1314   GstPad *pad;
1315   GstBaseSink *basesink = GST_BASE_SINK (element);
1316   gboolean result;
1317
1318   GST_OBJECT_LOCK (element);
1319   pad = basesink->sinkpad;
1320   gst_object_ref (pad);
1321   GST_OBJECT_UNLOCK (element);
1322
1323   result = gst_pad_push_event (pad, event);
1324
1325   gst_object_unref (pad);
1326
1327   return result;
1328 }
1329
1330 static gboolean
1331 gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query)
1332 {
1333   GstPad *peer;
1334   gboolean res = FALSE;
1335
1336   if ((peer = gst_pad_get_peer (sink->sinkpad))) {
1337     res = gst_pad_query (peer, query);
1338     gst_object_unref (peer);
1339   }
1340   return res;
1341 }
1342
1343 static gboolean
1344 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
1345     gint64 * cur)
1346 {
1347   GstClock *clock;
1348   gboolean res = FALSE;
1349
1350   switch (format) {
1351     case GST_FORMAT_TIME:
1352     {
1353       /* we can answer time format */
1354       GST_OBJECT_LOCK (basesink);
1355       if ((clock = GST_ELEMENT_CLOCK (basesink))) {
1356         GstClockTime now;
1357         gint64 time;
1358
1359         gst_object_ref (clock);
1360         GST_OBJECT_UNLOCK (basesink);
1361
1362         now = gst_clock_get_time (clock);
1363
1364         GST_OBJECT_LOCK (basesink);
1365         if (GST_CLOCK_TIME_IS_VALID (basesink->segment.time))
1366           time = basesink->segment.time;
1367         else
1368           time = 0;
1369
1370         *cur = now - GST_ELEMENT_CAST (basesink)->base_time -
1371             basesink->segment.accum + time;
1372
1373         GST_DEBUG_OBJECT (basesink,
1374             "now %" GST_TIME_FORMAT " + segment_time %" GST_TIME_FORMAT " = %"
1375             GST_TIME_FORMAT, GST_TIME_ARGS (now),
1376             GST_TIME_ARGS (time), GST_TIME_ARGS (*cur));
1377
1378         gst_object_unref (clock);
1379
1380         res = TRUE;
1381       }
1382       GST_OBJECT_UNLOCK (basesink);
1383     }
1384     default:
1385       break;
1386   }
1387   return res;
1388 }
1389
1390 static gboolean
1391 gst_base_sink_query (GstElement * element, GstQuery * query)
1392 {
1393   gboolean res = FALSE;
1394
1395   GstBaseSink *basesink = GST_BASE_SINK (element);
1396
1397   switch (GST_QUERY_TYPE (query)) {
1398     case GST_QUERY_POSITION:
1399     {
1400       gint64 cur = 0;
1401       GstFormat format;
1402       gboolean eos;
1403
1404       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
1405       eos = basesink->eos;
1406       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
1407
1408       if (eos) {
1409         res = gst_base_sink_peer_query (basesink, query);
1410       } else {
1411         gst_query_parse_position (query, &format, NULL);
1412
1413         GST_DEBUG_OBJECT (basesink, "current position format %d", format);
1414
1415         if ((res = gst_base_sink_get_position (basesink, format, &cur))) {
1416           gst_query_set_position (query, format, cur);
1417         } else {
1418           res = gst_base_sink_peer_query (basesink, query);
1419         }
1420       }
1421       break;
1422     }
1423     case GST_QUERY_DURATION:
1424       res = gst_base_sink_peer_query (basesink, query);
1425       break;
1426     case GST_QUERY_LATENCY:
1427       break;
1428     case GST_QUERY_JITTER:
1429       break;
1430     case GST_QUERY_RATE:
1431       //gst_query_set_rate (query, basesink->segment_rate);
1432       res = TRUE;
1433       break;
1434     case GST_QUERY_SEGMENT:
1435     {
1436       /* FIXME, bring start/stop to stream time */
1437       gst_query_set_segment (query, basesink->segment.rate,
1438           GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
1439       break;
1440     }
1441     case GST_QUERY_SEEKING:
1442     case GST_QUERY_CONVERT:
1443     case GST_QUERY_FORMATS:
1444     default:
1445       res = gst_base_sink_peer_query (basesink, query);
1446       break;
1447   }
1448   return res;
1449 }
1450
1451 static GstStateChangeReturn
1452 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
1453 {
1454   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1455   GstBaseSink *basesink = GST_BASE_SINK (element);
1456   GstBaseSinkClass *bclass;
1457
1458   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1459
1460   switch (transition) {
1461     case GST_STATE_CHANGE_NULL_TO_READY:
1462       if (bclass->start)
1463         if (!bclass->start (basesink))
1464           goto start_failed;
1465       break;
1466     case GST_STATE_CHANGE_READY_TO_PAUSED:
1467       /* need to complete preroll before this state change completes, there
1468        * is no data flow in READY so we can safely assume we need to preroll. */
1469       basesink->offset = 0;
1470       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
1471       basesink->have_preroll = FALSE;
1472       GST_DEBUG_OBJECT (basesink, "READY to PAUSED, need preroll to FALSE");
1473       basesink->need_preroll = TRUE;
1474       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
1475       gst_segment_init (&basesink->segment, GST_FORMAT_TIME);
1476       basesink->have_newsegment = FALSE;
1477       ret = GST_STATE_CHANGE_ASYNC;
1478       break;
1479     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1480       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
1481       /* no preroll needed */
1482       basesink->need_preroll = FALSE;
1483
1484       /* if we have EOS, we should empty the queue now as there will
1485        * be no more data received in the chain function.
1486        * FIXME, this could block the state change function too long when
1487        * we are pushing and syncing the buffers, better start a new
1488        * thread to do this. */
1489       if (basesink->eos) {
1490         gboolean do_eos = !basesink->eos_queued;
1491
1492         gst_base_sink_preroll_queue_empty (basesink, basesink->sinkpad);
1493
1494         /* need to post EOS message here if it was not in the preroll queue we
1495          * just emptied. */
1496         if (do_eos) {
1497           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
1498           gst_element_post_message (GST_ELEMENT_CAST (basesink),
1499               gst_message_new_eos (GST_OBJECT_CAST (basesink)));
1500         }
1501       } else if (!basesink->have_preroll) {
1502         /* queue a commit_state */
1503         basesink->need_preroll = TRUE;
1504         GST_DEBUG_OBJECT (basesink,
1505             "PAUSED to PLAYING, !eos, !have_preroll, need preroll to FALSE");
1506         ret = GST_STATE_CHANGE_ASYNC;
1507         /* we know it's not waiting, no need to signal */
1508       } else {
1509         GST_DEBUG_OBJECT (basesink,
1510             "PAUSED to PLAYING, !eos, have_preroll, need preroll to FALSE");
1511         /* now let it play */
1512         GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
1513       }
1514       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
1515       break;
1516     default:
1517       break;
1518   }
1519
1520   {
1521     GstStateChangeReturn bret;
1522
1523     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1524     if (bret == GST_STATE_CHANGE_FAILURE)
1525       goto activate_failed;
1526   }
1527
1528   switch (transition) {
1529     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1530     {
1531       GstBaseSinkClass *bclass;
1532
1533       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1534
1535       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
1536       GST_OBJECT_LOCK (basesink);
1537       /* unlock clock wait if any */
1538       if (basesink->clock_id) {
1539         gst_clock_id_unschedule (basesink->clock_id);
1540       }
1541       GST_OBJECT_UNLOCK (basesink);
1542
1543       /* unlock any subclasses */
1544       if (bclass->unlock)
1545         bclass->unlock (basesink);
1546
1547       /* if we don't have a preroll buffer and we have not received EOS,
1548        * we need to wait for a preroll */
1549       GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d",
1550           basesink->have_preroll, basesink->eos);
1551       if (!basesink->have_preroll && !basesink->eos
1552           && GST_STATE_PENDING (basesink) == GST_STATE_PAUSED) {
1553         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, need preroll to TRUE");
1554         basesink->need_preroll = TRUE;
1555         ret = GST_STATE_CHANGE_ASYNC;
1556       }
1557       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
1558       break;
1559     }
1560     case GST_STATE_CHANGE_PAUSED_TO_READY:
1561       break;
1562     case GST_STATE_CHANGE_READY_TO_NULL:
1563       if (bclass->stop)
1564         if (!bclass->stop (basesink)) {
1565           GST_WARNING ("failed to stop");
1566         }
1567       break;
1568     default:
1569       break;
1570   }
1571
1572   return ret;
1573
1574   /* ERRORS */
1575 start_failed:
1576   {
1577     GST_DEBUG_OBJECT (basesink, "failed to start");
1578     return GST_STATE_CHANGE_FAILURE;
1579   }
1580 activate_failed:
1581   {
1582     GST_DEBUG_OBJECT (basesink,
1583         "element failed to change states -- activation problem?");
1584     return GST_STATE_CHANGE_FAILURE;
1585   }
1586 }