static void gst_pipeline_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
+static GstBusSyncReply pipeline_bus_handler (GstBus * bus, GstMessage * message,
+ GstPipeline * pipeline);
+
static GstClock *gst_pipeline_get_clock_func (GstElement * element);
static GstElementStateReturn gst_pipeline_change_state (GstElement * element);
{
GstScheduler *scheduler;
GstPipeline *pipeline = GST_PIPELINE (instance);
-
- /* pipelines are managing bins */
- GST_FLAG_SET (pipeline, GST_BIN_FLAG_MANAGER);
+ GstBus *bus;
/* get an instance of the default scheduler */
scheduler = gst_scheduler_factory_make (NULL, GST_ELEMENT (pipeline));
"Are you sure you have a registry ?\n"
"Run gst-register as root if you haven't done so yet.", name);
}
+ bus = g_object_new (gst_bus_get_type (), NULL);
+ gst_bus_set_sync_handler (bus,
+ (GstBusSyncHandler) pipeline_bus_handler, pipeline);
+ pipeline->eosed = NULL;
+ pipeline->delay = DEFAULT_DELAY;
+ pipeline->play_timeout = DEFAULT_PLAY_TIMEOUT;
+ /* we are our own manager */
+ GST_ELEMENT_MANAGER (pipeline) = pipeline;
+ gst_element_set_bus (GST_ELEMENT (pipeline), bus);
+ gst_element_set_scheduler (GST_ELEMENT (pipeline), scheduler);
}
static void
GST_UNLOCK (pipeline);
}
+static gboolean
+is_eos (GstPipeline * pipeline)
+{
+ GstIterator *sinks;
+ gboolean result = TRUE;
+ gboolean done = FALSE;
+
+ sinks = gst_bin_iterate_sinks (GST_BIN (pipeline));
+ while (!done) {
+ gpointer data;
+
+ switch (gst_iterator_next (sinks, &data)) {
+ case GST_ITERATOR_OK:
+ {
+ GstElement *element = GST_ELEMENT (data);
+ GList *eosed;
+ GstElementState state, pending;
+ GstElementStateReturn complete;
+ gchar *name;
+
+ complete = gst_element_get_state (element, &state, &pending, NULL);
+ name = gst_element_get_name (element);
+
+ if (complete == GST_STATE_ASYNC) {
+ GST_DEBUG ("element %s still performing state change", name);
+ result = FALSE;
+ done = TRUE;
+ goto done;
+ } else if (state != GST_STATE_PLAYING) {
+ GST_DEBUG ("element %s not playing %d %d", name, state, pending);
+ goto done;
+ }
+ eosed = g_list_find (pipeline->eosed, element);
+ if (!eosed) {
+ result = FALSE;
+ done = TRUE;
+ }
+ done:
+ g_free (name);
+ gst_object_unref (GST_OBJECT (element));
+ break;
+ }
+ case GST_ITERATOR_RESYNC:
+ result = TRUE;
+ gst_iterator_resync (sinks);
+ break;
+ case GST_ITERATOR_DONE:
+ done = TRUE;
+ break;
+ default:
+ g_assert_not_reached ();
+ break;
+ }
+ }
+ gst_iterator_free (sinks);
+ return result;
+}
+
+/* FIXME, make me threadsafe */
+static GstBusSyncReply
+pipeline_bus_handler (GstBus * bus, GstMessage * message,
+ GstPipeline * pipeline)
+{
+ GstBusSyncReply result = GST_BUS_PASS;
+ gboolean posteos = FALSE;
+
+ /* we don't want messages from the streaming thread while we're doing the
+ * state change. We do want them from the state change functions. */
+
+ switch (GST_MESSAGE_TYPE (message)) {
+ case GST_MESSAGE_EOS:
+ if (GST_MESSAGE_SRC (message) != GST_OBJECT (pipeline)) {
+ GST_LOCK (bus);
+ pipeline->eosed =
+ g_list_prepend (pipeline->eosed, GST_MESSAGE_SRC (message));
+ GST_UNLOCK (bus);
+ if (is_eos (pipeline)) {
+ posteos = TRUE;
+ }
+ /* we drop all EOS messages */
+ result = GST_BUS_DROP;
+ gst_message_unref (message);
+ }
+ break;
+ case GST_MESSAGE_ERROR:
+ break;
+ default:
+ break;
+ }
+
+ if (posteos) {
+ gst_bus_post (bus, gst_message_new_eos (GST_OBJECT (pipeline)));
+ }
+
+ return result;
+}
+
/**
* gst_pipeline_new:
* @name: name of new pipeline
return gst_element_factory_make ("pipeline", name);
}
+/* MT safe */
static GstElementStateReturn
gst_pipeline_change_state (GstElement * element)
{
- switch (GST_STATE_TRANSITION (element)) {
- case GST_STATE_NULL_TO_READY:
- gst_scheduler_setup (GST_ELEMENT_SCHEDULER (element));
- break;
- case GST_STATE_READY_TO_PAUSED:
- case GST_STATE_PAUSED_TO_PLAYING:
- case GST_STATE_PLAYING_TO_PAUSED:
- case GST_STATE_PAUSED_TO_READY:
- case GST_STATE_READY_TO_NULL:
- break;
- }
+ /* implement me */
+ return GST_STATE_FAILURE;
+}
- if (GST_ELEMENT_CLASS (parent_class)->change_state)
- return GST_ELEMENT_CLASS (parent_class)->change_state (element);
+/**
+ * gst_pipeline_get_scheduler:
+ * @pipeline: the pipeline
+ *
+ * Gets the #GstScheduler of this pipeline.
+ *
+ * Returns: a GstScheduler.
+ *
+ * MT safe.
+ */
+GstScheduler *
+gst_pipeline_get_scheduler (GstPipeline * pipeline)
+{
+ return gst_element_get_scheduler (GST_ELEMENT (pipeline));
+}
- return GST_STATE_SUCCESS;
+/**
+ * gst_pipeline_get_bus:
+ * @pipeline: the pipeline
+ *
+ * Gets the #GstBus of this pipeline.
+ *
+ * Returns: a GstBus
+ *
+ * MT safe.
+ */
+GstBus *
+gst_pipeline_get_bus (GstPipeline * pipeline)
+{
+ return gst_element_get_bus (GST_ELEMENT (pipeline));
}
static GstClock *