Handle media bus messages
authorWim Taymans <wim.taymans@collabora.co.uk>
Fri, 13 Feb 2009 15:39:36 +0000 (16:39 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Fri, 13 Feb 2009 15:39:36 +0000 (16:39 +0100)
Handle media bus messages in a custom mainloop and dispatch them to the
RTSPMedia objects. Let the default implementation handle some common messages.

gst/rtsp-server/rtsp-media-factory.h
gst/rtsp-server/rtsp-media.c
gst/rtsp-server/rtsp-media.h

index f44192c..ef3f6e8 100644 (file)
@@ -77,7 +77,6 @@ struct _GstRTSPMediaFactory {
  *       pay%d to create the streams.
  * @configure: configure the media created with @construct. The default
  *       implementation will configure the 'shared' property of the media.
- * @handle_message: Handle a bus message for @media created from @factory.
  *
  * The #GstRTSPMediaFactory class structure.
  */
@@ -89,9 +88,6 @@ struct _GstRTSPMediaFactoryClass {
   GstElement *      (*get_element)    (GstRTSPMediaFactory *factory, const GstRTSPUrl *url);
   GstRTSPMedia *    (*construct)      (GstRTSPMediaFactory *factory, const GstRTSPUrl *url);
   void              (*configure)      (GstRTSPMediaFactory *factory, GstRTSPMedia *media);
-
-  void              (*handle_message) (GstRTSPMediaFactory *factory, GstRTSPMedia *media, 
-                                       GstMessage *message);
 };
 
 GType                 gst_rtsp_media_factory_get_type     (void);
index d0f8ba0..2768a80 100644 (file)
@@ -34,12 +34,16 @@ static void gst_rtsp_media_set_property (GObject *object, guint propid,
     const GValue *value, GParamSpec *pspec);
 static void gst_rtsp_media_finalize (GObject * obj);
 
+static gpointer do_loop (GstRTSPMediaClass *klass);
+static gboolean default_handle_message (GstRTSPMedia *media, GstMessage *message);
+
 G_DEFINE_TYPE (GstRTSPMedia, gst_rtsp_media, G_TYPE_OBJECT);
 
 static void
 gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
 {
   GObjectClass *gobject_class;
+  GError *error = NULL;
 
   gobject_class = G_OBJECT_CLASS (klass);
 
@@ -50,6 +54,15 @@ gst_rtsp_media_class_init (GstRTSPMediaClass * klass)
   g_object_class_install_property (gobject_class, PROP_SHARED,
       g_param_spec_boolean ("shared", "Shared", "If this media pipeline can be shared",
           DEFAULT_SHARED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  klass->context = g_main_context_new ();
+  klass->loop = g_main_loop_new (klass->context, TRUE);
+
+  klass->thread = g_thread_create ((GThreadFunc) do_loop, klass, TRUE, &error);
+  if (error != NULL) {
+    g_critical ("could not start bus thread: %s", error->message);
+  }
+  klass->handle_message = default_handle_message;
 }
 
 static void
@@ -57,6 +70,8 @@ gst_rtsp_media_init (GstRTSPMedia * media)
 {
   media->streams = g_array_new (FALSE, TRUE, sizeof (GstRTSPMediaStream *));
   media->complete = FALSE;
+  media->is_live = FALSE;
+  media->buffering = FALSE;
 }
 
 static void
@@ -87,6 +102,11 @@ gst_rtsp_media_finalize (GObject * obj)
   }
   g_array_free (media->streams, TRUE);
 
+  if (media->source) {
+    g_source_destroy (media->source);
+    g_source_unref (media->source);
+  }
+
   if (media->pipeline)
     gst_object_unref (media->pipeline);
 
@@ -123,6 +143,16 @@ gst_rtsp_media_set_property (GObject *object, guint propid,
   }
 }
 
+static gpointer
+do_loop (GstRTSPMediaClass *klass)
+{
+  g_message ("enter mainloop");
+  g_main_loop_run (klass->loop);
+  g_message ("exit mainloop");
+
+  return NULL;
+}
+
 /**
  * gst_rtsp_media_new:
  *
@@ -450,6 +480,23 @@ setup_stream (GstRTSPMediaStream *stream, guint idx, GstRTSPMedia *media)
 }
 
 static void
+unlock_streams (GstRTSPMedia *media)
+{
+  guint i, n_streams;
+
+  /* unlock the udp src elements */
+  n_streams = gst_rtsp_media_n_streams (media);
+  for (i = 0; i < n_streams; i++) {
+    GstRTSPMediaStream *stream;
+
+    stream = gst_rtsp_media_get_stream (media, i);
+
+    gst_element_set_locked_state (stream->udpsrc[0], FALSE);
+    gst_element_set_locked_state (stream->udpsrc[1], FALSE);
+  }
+}
+
+static void
 collect_media_stats (GstRTSPMedia *media)
 {
   GstFormat format;
@@ -474,6 +521,92 @@ collect_media_stats (GstRTSPMedia *media)
   }
 }
 
+static gboolean
+default_handle_message (GstRTSPMedia *media, GstMessage *message)
+{
+  GstMessageType type;
+
+  type = GST_MESSAGE_TYPE (message);
+
+  switch (type) {
+    case GST_MESSAGE_STATE_CHANGED:
+      break;
+    case GST_MESSAGE_BUFFERING:
+    {
+      gint percent;
+
+      gst_message_parse_buffering (message, &percent);
+
+      /* no state management needed for live pipelines */
+      if (media->is_live)
+        break;
+
+      if (percent == 100) {
+        /* a 100% message means buffering is done */
+        media->buffering = FALSE;
+        /* if the desired state is playing, go back */
+        if (media->target_state == GST_STATE_PLAYING) {
+          g_message ("Buffering done, setting pipeline to PLAYING");
+          gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
+        }
+       else {
+          g_message ("Buffering done");
+       }
+      } else {
+        /* buffering busy */
+        if (media->buffering == FALSE) {
+         if (media->target_state == GST_STATE_PLAYING) {
+            /* we were not buffering but PLAYING, PAUSE  the pipeline. */
+            g_message ("Buffering, setting pipeline to PAUSED ...");
+            gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
+         }
+         else {
+            g_message ("Buffering ...");
+         }
+        }
+        media->buffering = TRUE;
+      }
+      break;
+    }
+    case GST_MESSAGE_LATENCY:
+    {
+      gst_bin_recalculate_latency (GST_BIN_CAST (media->pipeline));
+      break;
+    }
+    case GST_MESSAGE_ERROR:
+    {
+      GError *gerror;
+      gchar *debug;
+
+      gst_message_parse_error (message, &gerror, &debug);
+      g_warning ("%p: got error %s (%s)", media, gerror->message, debug);
+      g_error_free (gerror);
+      g_free (debug);
+      break;
+    }
+    default:
+      g_message ("%p: got message type %s", media, gst_message_type_get_name (type));
+      break;
+  }
+  return TRUE;
+}
+
+static gboolean
+bus_message (GstBus *bus, GstMessage *message, GstRTSPMedia *media)
+{
+  GstRTSPMediaClass *klass;
+  gboolean ret;
+  
+  klass = GST_RTSP_MEDIA_GET_CLASS (media);
+
+  if (klass->handle_message)
+    ret = klass->handle_message (media, message);
+  else
+    ret = FALSE;
+
+  return ret;
+}
+
 /**
  * gst_rtsp_media_prepare:
  * @obj: a #GstRTSPMedia
@@ -488,6 +621,8 @@ gst_rtsp_media_prepare (GstRTSPMedia *media)
 {
   GstStateChangeReturn ret;
   guint i, n_streams;
+  GstRTSPMediaClass *klass;
+  GstBus *bus;
 
   if (media->prepared)
     goto was_prepared;
@@ -495,6 +630,7 @@ gst_rtsp_media_prepare (GstRTSPMedia *media)
   g_message ("preparing media %p", media);
 
   media->pipeline = gst_pipeline_new ("media-pipeline");
+  bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
 
   gst_bin_add (GST_BIN_CAST (media->pipeline), media->element);
 
@@ -515,6 +651,7 @@ gst_rtsp_media_prepare (GstRTSPMedia *media)
 
   /* first go to PAUSED */
   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
+  media->target_state = GST_STATE_PAUSED;
 
   switch (ret) {
     case GST_STATE_CHANGE_SUCCESS:
@@ -524,6 +661,7 @@ gst_rtsp_media_prepare (GstRTSPMedia *media)
     case GST_STATE_CHANGE_NO_PREROLL:
       /* we need to go to PLAYING */
       g_message ("live media %p", media);
+      media->is_live = TRUE;
       ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
       break;
     case GST_STATE_CHANGE_FAILURE:
@@ -539,18 +677,21 @@ gst_rtsp_media_prepare (GstRTSPMedia *media)
   /* collect stats about the media */
   collect_media_stats (media);
 
-  /* unlock the udp src elements */
-  n_streams = gst_rtsp_media_n_streams (media);
-  for (i = 0; i < n_streams; i++) {
-    GstRTSPMediaStream *stream;
+  /* unlock the streams so that they follow the state changes from now on */
+  unlock_streams (media);
 
-    stream = gst_rtsp_media_get_stream (media, i);
+  g_message ("object %p is prerolled", media);
 
-    gst_element_set_locked_state (stream->udpsrc[0], FALSE);
-    gst_element_set_locked_state (stream->udpsrc[1], FALSE);
-  }
+  /* add the pipeline bus to our custom mainloop */
+  bus = gst_pipeline_get_bus (GST_PIPELINE_CAST (media->pipeline));
+  media->source = gst_bus_create_watch (bus);
+  gst_object_unref (bus);
+
+  g_source_set_callback (media->source, (GSourceFunc) bus_message, media, NULL);
+
+  klass = GST_RTSP_MEDIA_GET_CLASS (media);
+  media->id = g_source_attach (media->source, klass->context);
 
-  g_message ("object %p is prerolled", media);
   media->prepared = TRUE;
 
   return TRUE;
@@ -563,7 +704,33 @@ was_prepared:
   /* ERRORS */
 state_failed:
   {
+    GstMessage *message;
+
     g_message ("state change failed for media %p", media);
+    while ((message = gst_bus_pop (bus))) {
+      GstMessageType type;
+
+      type = GST_MESSAGE_TYPE (message);
+      switch (type) {
+        case GST_MESSAGE_ERROR:
+        {
+          GError *gerror;
+          gchar *debug;
+
+          gst_message_parse_error (message, &gerror, &debug);
+          g_warning ("%p: got error %s (%s)", media, gerror->message, debug);
+          g_error_free (gerror);
+          g_free (debug);
+          break;
+       }
+       default:
+         break;
+      }
+      gst_message_unref (message);
+    }
+    unlock_streams (media);
+    gst_element_set_state (media->pipeline, GST_STATE_NULL);
+    gst_object_unref (bus);
     return FALSE;
   }
 }
@@ -611,6 +778,7 @@ gst_rtsp_media_play (GstRTSPMedia *media, GArray *transports)
   }
 
   g_message ("playing");
+  media->target_state = GST_STATE_PLAYING;
   ret = gst_element_set_state (media->pipeline, GST_STATE_PLAYING);
 
   return TRUE;
@@ -659,6 +827,7 @@ gst_rtsp_media_pause (GstRTSPMedia *media, GArray *transports)
   }
 
   g_message ("pause");
+  media->target_state = GST_STATE_PAUSED;
   ret = gst_element_set_state (media->pipeline, GST_STATE_PAUSED);
 
   return TRUE;
@@ -685,6 +854,7 @@ gst_rtsp_media_stop (GstRTSPMedia *media, GArray *transports)
   gst_rtsp_media_pause (media, transports);
 
   g_message ("stop");
+  media->target_state = GST_STATE_NULL;
   ret = gst_element_set_state (media->pipeline, GST_STATE_NULL);
 
   return TRUE;
index c00b165..798ceef 100644 (file)
@@ -124,6 +124,12 @@ struct _GstRTSPMedia {
 
   /* the pipeline for the media */
   GstElement   *pipeline;
+  GSource      *source;
+  guint         id;
+
+  gboolean      is_live;
+  gboolean      buffering;
+  GstState      target_state;
 
   /* RTP session manager */
   GstElement   *rtpbin;
@@ -135,8 +141,24 @@ struct _GstRTSPMedia {
   GstRTSPTimeRange range;
 };
 
+/**
+ * GstRTSPMediaClass:
+ * @context: the main context for dispatching messages
+ * @loop: the mainloop for message.
+ * @thread: the thread dispatching messages.
+ * @handle_message: handle a message
+ *
+ * The RTSP media class
+ */
 struct _GstRTSPMediaClass {
   GObjectClass  parent_class;
+
+  /* thread for the mainloop */
+  GMainContext *context;
+  GMainLoop    *loop;
+  GThread      *thread;
+
+  gboolean     (*handle_message)  (GstRTSPMedia *media, GstMessage *message);
 };
 
 GType                 gst_rtsp_media_get_type         (void);