Allow elements to post EOS in the state change function.
[platform/upstream/gstreamer.git] / gst / gstpipeline.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2004,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstpipeline.c: Overall pipeline management element
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #include "gst_private.h"
24
25 #include "gstpipeline.h"
26 #include "gstinfo.h"
27 #include "gstscheduler.h"
28 #include "gstsystemclock.h"
29
30 static GstElementDetails gst_pipeline_details =
31 GST_ELEMENT_DETAILS ("Pipeline object",
32     "Generic/Bin",
33     "Complete pipeline object",
34     "Erik Walthinsen <omega@cse.ogi.edu>, Wim Taymans <wim@fluendo.com>");
35
36 /* Pipeline signals and args */
37 enum
38 {
39   /* FILL ME */
40   LAST_SIGNAL
41 };
42
43 #define DEFAULT_DELAY 0
44 #define DEFAULT_PLAY_TIMEOUT  (2*GST_SECOND)
45 enum
46 {
47   ARG_0,
48   ARG_DELAY,
49   ARG_PLAY_TIMEOUT,
50   /* FILL ME */
51 };
52
53
54 static void gst_pipeline_base_init (gpointer g_class);
55 static void gst_pipeline_class_init (gpointer g_class, gpointer class_data);
56 static void gst_pipeline_init (GTypeInstance * instance, gpointer g_class);
57
58 static void gst_pipeline_dispose (GObject * object);
59 static void gst_pipeline_set_property (GObject * object, guint prop_id,
60     const GValue * value, GParamSpec * pspec);
61 static void gst_pipeline_get_property (GObject * object, guint prop_id,
62     GValue * value, GParamSpec * pspec);
63
64 static gboolean gst_pipeline_send_event (GstElement * element,
65     GstEvent * event);
66 static GstBusSyncReply pipeline_bus_handler (GstBus * bus, GstMessage * message,
67     GstPipeline * pipeline);
68
69 static GstClock *gst_pipeline_get_clock_func (GstElement * element);
70 static GstElementStateReturn gst_pipeline_change_state (GstElement * element);
71
72 static GstBinClass *parent_class = NULL;
73
74 /* static guint gst_pipeline_signals[LAST_SIGNAL] = { 0 }; */
75
76 GType
77 gst_pipeline_get_type (void)
78 {
79   static GType pipeline_type = 0;
80
81   if (!pipeline_type) {
82     static const GTypeInfo pipeline_info = {
83       sizeof (GstPipelineClass),
84       gst_pipeline_base_init,
85       NULL,
86       (GClassInitFunc) gst_pipeline_class_init,
87       NULL,
88       NULL,
89       sizeof (GstPipeline),
90       0,
91       gst_pipeline_init,
92       NULL
93     };
94
95     pipeline_type =
96         g_type_register_static (GST_TYPE_BIN, "GstPipeline", &pipeline_info, 0);
97   }
98   return pipeline_type;
99 }
100
101 static void
102 gst_pipeline_base_init (gpointer g_class)
103 {
104   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
105
106   gst_element_class_set_details (gstelement_class, &gst_pipeline_details);
107 }
108
109 static void
110 gst_pipeline_class_init (gpointer g_class, gpointer class_data)
111 {
112   GObjectClass *gobject_class = G_OBJECT_CLASS (g_class);
113   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
114   GstPipelineClass *klass = GST_PIPELINE_CLASS (g_class);
115
116   parent_class = g_type_class_peek_parent (klass);
117
118   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_pipeline_set_property);
119   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_pipeline_get_property);
120
121   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_DELAY,
122       g_param_spec_uint64 ("delay", "Delay",
123           "Expected delay needed for elements "
124           "to spin up to PLAYING in nanoseconds", 0, G_MAXUINT64, DEFAULT_DELAY,
125           G_PARAM_READWRITE));
126   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PLAY_TIMEOUT,
127       g_param_spec_uint64 ("play-timeout", "Play Timeout",
128           "Max timeout for going " "to PLAYING in nanoseconds", 0, G_MAXUINT64,
129           DEFAULT_PLAY_TIMEOUT, G_PARAM_READWRITE));
130
131   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_pipeline_dispose);
132
133   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_pipeline_send_event);
134   gstelement_class->change_state =
135       GST_DEBUG_FUNCPTR (gst_pipeline_change_state);
136   gstelement_class->get_clock = GST_DEBUG_FUNCPTR (gst_pipeline_get_clock_func);
137 }
138
139 static void
140 gst_pipeline_init (GTypeInstance * instance, gpointer g_class)
141 {
142   GstScheduler *scheduler;
143   GstPipeline *pipeline = GST_PIPELINE (instance);
144   GstBus *bus;
145
146   /* get an instance of the default scheduler */
147   scheduler = gst_scheduler_factory_make (NULL, GST_ELEMENT (pipeline));
148
149   /* FIXME need better error handling */
150   if (scheduler == NULL) {
151     const gchar *name = gst_scheduler_factory_get_default_name ();
152
153     g_error ("Critical error: could not get scheduler \"%s\"\n"
154         "Are you sure you have a registry ?\n"
155         "Run gst-register as root if you haven't done so yet.", name);
156   }
157   bus = g_object_new (gst_bus_get_type (), NULL);
158   gst_bus_set_sync_handler (bus,
159       (GstBusSyncHandler) pipeline_bus_handler, pipeline);
160   pipeline->eosed = NULL;
161   pipeline->delay = DEFAULT_DELAY;
162   pipeline->play_timeout = DEFAULT_PLAY_TIMEOUT;
163   /* we are our own manager */
164   GST_ELEMENT_MANAGER (pipeline) = pipeline;
165   gst_element_set_bus (GST_ELEMENT (pipeline), bus);
166   /* set_bus refs the bus via gst_object_replace, we drop our ref */
167   gst_object_unref ((GstObject *) bus);
168   gst_element_set_scheduler (GST_ELEMENT (pipeline), scheduler);
169 }
170
171 static void
172 gst_pipeline_dispose (GObject * object)
173 {
174   GstPipeline *pipeline = GST_PIPELINE (object);
175
176   gst_element_set_bus (GST_ELEMENT (pipeline), NULL);
177   gst_scheduler_reset (GST_ELEMENT_SCHEDULER (object));
178   gst_object_replace ((GstObject **) & pipeline->fixed_clock, NULL);
179
180   G_OBJECT_CLASS (parent_class)->dispose (object);
181 }
182
183 static void
184 gst_pipeline_set_property (GObject * object, guint prop_id,
185     const GValue * value, GParamSpec * pspec)
186 {
187   GstPipeline *pipeline = GST_PIPELINE (object);
188
189   GST_LOCK (pipeline);
190   switch (prop_id) {
191     case ARG_DELAY:
192       pipeline->delay = g_value_get_uint64 (value);
193       break;
194     case ARG_PLAY_TIMEOUT:
195       pipeline->play_timeout = g_value_get_uint64 (value);
196       break;
197     default:
198       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
199       break;
200   }
201   GST_UNLOCK (pipeline);
202 }
203
204 static void
205 gst_pipeline_get_property (GObject * object, guint prop_id,
206     GValue * value, GParamSpec * pspec)
207 {
208   GstPipeline *pipeline = GST_PIPELINE (object);
209
210   GST_LOCK (pipeline);
211   switch (prop_id) {
212     case ARG_DELAY:
213       g_value_set_uint64 (value, pipeline->delay);
214       break;
215     case ARG_PLAY_TIMEOUT:
216       g_value_set_uint64 (value, pipeline->play_timeout);
217       break;
218     default:
219       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
220       break;
221   }
222   GST_UNLOCK (pipeline);
223 }
224
225 static gboolean
226 is_eos (GstPipeline * pipeline)
227 {
228   GstIterator *sinks;
229   gboolean result = TRUE;
230   gboolean done = FALSE;
231
232   sinks = gst_bin_iterate_sinks (GST_BIN (pipeline));
233   while (!done) {
234     gpointer data;
235
236     switch (gst_iterator_next (sinks, &data)) {
237       case GST_ITERATOR_OK:
238       {
239         GstElement *element = GST_ELEMENT (data);
240         GList *eosed;
241         gchar *name;
242
243         name = gst_element_get_name (element);
244         eosed = g_list_find (pipeline->eosed, element);
245         if (!eosed) {
246           GST_DEBUG ("element %s did not post EOS yet", name);
247           result = FALSE;
248           done = TRUE;
249         } else {
250           GST_DEBUG ("element %s posted EOS", name);
251         }
252         g_free (name);
253         gst_object_unref (GST_OBJECT (element));
254         break;
255       }
256       case GST_ITERATOR_RESYNC:
257         result = TRUE;
258         gst_iterator_resync (sinks);
259         break;
260       case GST_ITERATOR_DONE:
261         done = TRUE;
262         break;
263       default:
264         g_assert_not_reached ();
265         break;
266     }
267   }
268   gst_iterator_free (sinks);
269   return result;
270 }
271
272 /* sending an event on the pipeline pauses the pipeline if it
273  * was playing.
274  */
275 static gboolean
276 gst_pipeline_send_event (GstElement * element, GstEvent * event)
277 {
278   gboolean was_playing;
279   gboolean res;
280   GstElementState state;
281   GstEventType event_type = GST_EVENT_TYPE (event);
282   GTimeVal timeout;
283
284   /* need to call _get_state() since a bin state is only updated
285    * with this call. */
286   GST_TIME_TO_TIMEVAL (0, timeout);
287
288   gst_element_get_state (element, &state, NULL, &timeout);
289   was_playing = state == GST_STATE_PLAYING;
290
291   if (was_playing && event_type == GST_EVENT_SEEK)
292     gst_element_set_state (element, GST_STATE_PAUSED);
293
294   res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
295
296   if (was_playing && event_type == GST_EVENT_SEEK)
297     gst_element_set_state (element, GST_STATE_PLAYING);
298
299   return res;
300 }
301
302 /* FIXME, make me threadsafe */
303 static GstBusSyncReply
304 pipeline_bus_handler (GstBus * bus, GstMessage * message,
305     GstPipeline * pipeline)
306 {
307   GstBusSyncReply result = GST_BUS_PASS;
308   gboolean posteos = FALSE;
309
310   /* we don't want messages from the streaming thread while we're doing the 
311    * state change. We do want them from the state change functions. */
312
313   switch (GST_MESSAGE_TYPE (message)) {
314     case GST_MESSAGE_EOS:
315       if (GST_MESSAGE_SRC (message) != GST_OBJECT (pipeline)) {
316         GST_DEBUG ("got EOS message");
317         GST_LOCK (bus);
318         pipeline->eosed =
319             g_list_prepend (pipeline->eosed, GST_MESSAGE_SRC (message));
320         GST_UNLOCK (bus);
321         if (is_eos (pipeline)) {
322           posteos = TRUE;
323           GST_DEBUG ("all sinks posted EOS");
324         }
325         /* we drop all EOS messages */
326         result = GST_BUS_DROP;
327         gst_message_unref (message);
328       }
329       break;
330     case GST_MESSAGE_ERROR:
331       break;
332     default:
333       break;
334   }
335
336   if (posteos) {
337     gst_bus_post (bus, gst_message_new_eos (GST_OBJECT (pipeline)));
338   }
339
340   return result;
341 }
342
343 /**
344  * gst_pipeline_new:
345  * @name: name of new pipeline
346  *
347  * Create a new pipeline with the given name.
348  *
349  * Returns: newly created GstPipeline
350  *
351  * MT safe.
352  */
353 GstElement *
354 gst_pipeline_new (const gchar * name)
355 {
356   return gst_element_factory_make ("pipeline", name);
357 }
358
359 /* MT safe */
360 static GstElementStateReturn
361 gst_pipeline_change_state (GstElement * element)
362 {
363   GstElementStateReturn result = GST_STATE_SUCCESS;
364   GstPipeline *pipeline = GST_PIPELINE (element);
365   gint transition = GST_STATE_TRANSITION (element);
366
367   switch (transition) {
368     case GST_STATE_NULL_TO_READY:
369       if (element->bus)
370         gst_bus_set_flushing (element->bus, FALSE);
371       gst_scheduler_setup (GST_ELEMENT_SCHEDULER (pipeline));
372       break;
373     case GST_STATE_READY_TO_PAUSED:
374     {
375       GstClock *clock;
376
377       clock = gst_element_get_clock (element);
378       gst_element_set_clock (element, clock);
379       pipeline->eosed = NULL;
380       break;
381     }
382     case GST_STATE_PAUSED_TO_PLAYING:
383       if (element->clock) {
384         GstClockTime start_time = gst_clock_get_time (element->clock);
385
386         element->base_time = start_time -
387             pipeline->stream_time + pipeline->delay;
388         GST_DEBUG ("stream_time=%" GST_TIME_FORMAT ", start_time=%"
389             GST_TIME_FORMAT ", base time %" GST_TIME_FORMAT,
390             GST_TIME_ARGS (pipeline->stream_time),
391             GST_TIME_ARGS (start_time), GST_TIME_ARGS (element->base_time));
392       } else {
393         element->base_time = 0;
394         GST_DEBUG ("no clock, using base time of 0");
395       }
396       break;
397     case GST_STATE_PLAYING_TO_PAUSED:
398     case GST_STATE_PAUSED_TO_READY:
399     case GST_STATE_READY_TO_NULL:
400       break;
401   }
402
403   result = GST_ELEMENT_CLASS (parent_class)->change_state (element);
404
405   switch (transition) {
406     case GST_STATE_READY_TO_PAUSED:
407       pipeline->stream_time = 0;
408       break;
409     case GST_STATE_PAUSED_TO_PLAYING:
410       break;
411     case GST_STATE_PLAYING_TO_PAUSED:
412       if (element->clock) {
413         GstClockTime now;
414
415         now = gst_clock_get_time (element->clock);
416         pipeline->stream_time = now - element->base_time;
417         GST_DEBUG ("stream_time=%" GST_TIME_FORMAT ", now=%" GST_TIME_FORMAT
418             ", base time %" GST_TIME_FORMAT,
419             GST_TIME_ARGS (pipeline->stream_time),
420             GST_TIME_ARGS (now), GST_TIME_ARGS (element->base_time));
421       }
422       break;
423     case GST_STATE_PAUSED_TO_READY:
424       break;
425     case GST_STATE_READY_TO_NULL:
426       if (element->bus) {
427         gst_bus_set_flushing (element->bus, TRUE);
428       }
429       break;
430   }
431
432   /* we wait for async state changes ourselves when we are in an
433    * intermediate state.
434    * FIXME this can block forever, better do this in a worker
435    * thread or use a timeout? */
436   if (result == GST_STATE_ASYNC &&
437       (GST_STATE_FINAL (pipeline) != GST_STATE_PENDING (pipeline))) {
438     GTimeVal *timeval, timeout;
439
440     GST_STATE_UNLOCK (pipeline);
441
442     GST_LOCK (pipeline);
443     if (pipeline->play_timeout > 0) {
444       GST_TIME_TO_TIMEVAL (pipeline->play_timeout, timeout);
445       timeval = &timeout;
446     } else {
447       timeval = NULL;
448     }
449     GST_UNLOCK (pipeline);
450
451     result = gst_element_get_state (element, NULL, NULL, timeval);
452     GST_STATE_LOCK (pipeline);
453   }
454
455   return result;
456 }
457
458 /**
459  * gst_pipeline_get_scheduler:
460  * @pipeline: the pipeline
461  *
462  * Gets the #GstScheduler of this pipeline.
463  *
464  * Returns: a GstScheduler.
465  *
466  * MT safe.
467  */
468 GstScheduler *
469 gst_pipeline_get_scheduler (GstPipeline * pipeline)
470 {
471   return gst_element_get_scheduler (GST_ELEMENT (pipeline));
472 }
473
474 /**
475  * gst_pipeline_get_bus:
476  * @pipeline: the pipeline
477  *
478  * Gets the #GstBus of this pipeline.
479  *
480  * Returns: a GstBus
481  *
482  * MT safe.
483  */
484 GstBus *
485 gst_pipeline_get_bus (GstPipeline * pipeline)
486 {
487   return gst_element_get_bus (GST_ELEMENT (pipeline));
488 }
489
490 static GstClock *
491 gst_pipeline_get_clock_func (GstElement * element)
492 {
493   GstClock *clock = NULL;
494   GstPipeline *pipeline = GST_PIPELINE (element);
495
496   /* if we have a fixed clock, use that one */
497   GST_LOCK (pipeline);
498   if (GST_FLAG_IS_SET (pipeline, GST_PIPELINE_FLAG_FIXED_CLOCK)) {
499     clock = pipeline->fixed_clock;
500     gst_object_ref (GST_OBJECT (clock));
501     GST_UNLOCK (pipeline);
502
503     GST_CAT_DEBUG (GST_CAT_CLOCK, "pipeline using fixed clock %p (%s)",
504         clock, clock ? GST_STR_NULL (GST_OBJECT_NAME (clock)) : "-");
505   } else {
506     GST_UNLOCK (pipeline);
507     clock =
508         GST_ELEMENT_CLASS (parent_class)->get_clock (GST_ELEMENT (pipeline));
509     /* no clock, use a system clock */
510     if (!clock) {
511       clock = gst_system_clock_obtain ();
512       /* we unref since this function is not supposed to increase refcount
513        * of clock object returned; this is ok since the systemclock always
514        * has a refcount of at least one in the current code. */
515       gst_object_unref (GST_OBJECT (clock));
516       GST_CAT_DEBUG (GST_CAT_CLOCK, "pipeline obtained system clock: %p (%s)",
517           clock, clock ? GST_STR_NULL (GST_OBJECT_NAME (clock)) : "-");
518     } else {
519       GST_CAT_DEBUG (GST_CAT_CLOCK, "pipeline obtained clock: %p (%s)",
520           clock, clock ? GST_STR_NULL (GST_OBJECT_NAME (clock)) : "-");
521     }
522   }
523   return clock;
524 }
525
526 /**
527  * gst_pipeline_get_clock:
528  * @pipeline: the pipeline
529  *
530  * Gets the current clock used by the pipeline.
531  *
532  * Returns: a GstClock
533  */
534 GstClock *
535 gst_pipeline_get_clock (GstPipeline * pipeline)
536 {
537   g_return_val_if_fail (GST_IS_PIPELINE (pipeline), NULL);
538
539   return gst_pipeline_get_clock_func (GST_ELEMENT (pipeline));
540 }
541
542
543 /**
544  * gst_pipeline_use_clock:
545  * @pipeline: the pipeline
546  * @clock: the clock to use
547  *
548  * Force the pipeline to use the given clock. The pipeline will
549  * always use the given clock even if new clock providers are added
550  * to this pipeline.
551  *
552  * MT safe.
553  */
554 void
555 gst_pipeline_use_clock (GstPipeline * pipeline, GstClock * clock)
556 {
557   g_return_if_fail (GST_IS_PIPELINE (pipeline));
558
559   GST_LOCK (pipeline);
560   GST_FLAG_SET (pipeline, GST_PIPELINE_FLAG_FIXED_CLOCK);
561
562   gst_object_replace ((GstObject **) & pipeline->fixed_clock,
563       (GstObject *) clock);
564   GST_UNLOCK (pipeline);
565
566   GST_CAT_DEBUG (GST_CAT_CLOCK, "pipeline using fixed clock %p (%s)", clock,
567       (clock ? GST_OBJECT_NAME (clock) : "nil"));
568 }
569
570 /**
571  * gst_pipeline_set_clock:
572  * @pipeline: the pipeline
573  * @clock: the clock to set
574  *
575  * Set the clock for the pipeline. The clock will be distributed
576  * to all the elements managed by the pipeline.
577  *
578  * MT safe.
579  */
580 void
581 gst_pipeline_set_clock (GstPipeline * pipeline, GstClock * clock)
582 {
583   g_return_if_fail (pipeline != NULL);
584   g_return_if_fail (GST_IS_PIPELINE (pipeline));
585
586   GST_ELEMENT_CLASS (parent_class)->set_clock (GST_ELEMENT (pipeline), clock);
587 }
588
589 /**
590  * gst_pipeline_auto_clock:
591  * @pipeline: the pipeline
592  *
593  * Let the pipeline select a clock automatically.
594  *
595  * MT safe.
596  */
597 void
598 gst_pipeline_auto_clock (GstPipeline * pipeline)
599 {
600   g_return_if_fail (pipeline != NULL);
601   g_return_if_fail (GST_IS_PIPELINE (pipeline));
602
603   GST_LOCK (pipeline);
604   GST_FLAG_UNSET (pipeline, GST_PIPELINE_FLAG_FIXED_CLOCK);
605
606   gst_object_replace ((GstObject **) & pipeline->fixed_clock, NULL);
607   GST_UNLOCK (pipeline);
608
609   GST_CAT_DEBUG (GST_CAT_CLOCK, "pipeline using automatic clock");
610 }