wpe: Relay messages from WPE internal pipelines
authorThibault Saunier <tsaunier@igalia.com>
Sun, 2 May 2021 01:48:23 +0000 (21:48 -0400)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 19 May 2021 13:41:15 +0000 (13:41 +0000)
It is based on a tracer as it allows us to very easily get
every message that are posted on any bus inside the process.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/2252>

ext/wpe/WPEThreadedView.cpp
ext/wpe/gstwpesrcbin.cpp
ext/wpe/wpe-extension/gstwpebusmsgforwarder.c [new file with mode: 0644]
ext/wpe/wpe-extension/gstwpeextension.c
ext/wpe/wpe-extension/gstwpeextension.h
ext/wpe/wpe-extension/meson.build

index 5dcb216..8f5902d 100644 (file)
@@ -174,6 +174,87 @@ initialize_web_extensions (WebKitWebContext *context)
     webkit_web_context_set_web_extensions_directory (context, gst_wpe_get_extension_path ());
 }
 
+static void
+webkit_extension_gerror_msg_received (GstWpeSrc *src, GVariant *params)
+{
+    GstMessage *message;
+    const gchar *message_type, *src_path, *error_domain, *msg, *debug_str, *details_str;
+    guint32 error_code;
+
+    g_variant_get (params, "(sssusss)",
+       &message_type,
+       &src_path,
+       &error_domain,
+       &error_code,
+       &msg,
+       &debug_str,
+       &details_str
+    );
+
+    GError *error = g_error_new(g_quark_from_string(error_domain), error_code, "%s", msg);
+    GstStructure *details = (details_str[0] != '\0') ? gst_structure_new_from_string(details_str) : NULL;
+    gchar * our_message = g_strdup_printf(
+        "`%s` posted from %s running inside the web page",
+        debug_str, src_path
+    );
+
+    if (!details)
+        details = gst_structure_new_empty("wpesrcdetails");
+    gst_structure_set(details,
+                      "wpesrc_original_src_path", G_TYPE_STRING, src_path,
+                      NULL);
+
+    if (!g_strcmp0(message_type, "error")) {
+        message =
+            gst_message_new_error_with_details(GST_OBJECT(src), error,
+                                               our_message, details);
+    } else if (!g_strcmp0(message_type, "warning")) {
+        message =
+            gst_message_new_warning_with_details(GST_OBJECT(src), error,
+                                                 our_message, details);
+    } else {
+        message =
+            gst_message_new_info_with_details(GST_OBJECT(src), error, our_message, details);
+    }
+
+    g_free (our_message);
+    gst_element_post_message(GST_ELEMENT(src), message);
+    g_error_free(error);
+}
+
+static void
+webkit_extension_bus_message_received (GstWpeSrc *src, GVariant *params)
+{
+    GstStructure *structure;
+    const gchar *message_type, *src_name, *src_type, *src_path, *struct_str;
+
+    g_variant_get (params, "(sssss)",
+       &message_type,
+       &src_name,
+       &src_type,
+       &src_path,
+       &struct_str
+    );
+
+    structure = (struct_str[0] != '\0') ? gst_structure_new_from_string(struct_str) : NULL;
+    if (!structure)
+    {
+        if (struct_str[0] != '\0')
+            GST_ERROR_OBJECT(src, "Could not deserialize: %s", struct_str);
+        structure = gst_structure_new_empty("wpesrc");
+    }
+
+    gst_structure_set(structure,
+                      "wpesrc_original_message_type", G_TYPE_STRING, message_type,
+                      "wpesrc_original_src_name", G_TYPE_STRING, src_name,
+                      "wpesrc_original_src_type", G_TYPE_STRING, src_type,
+                      "wpesrc_original_src_path", G_TYPE_STRING, src_path,
+                      NULL);
+
+    gst_element_post_message(GST_ELEMENT(src), gst_message_new_custom(GST_MESSAGE_ELEMENT,
+                                                                      GST_OBJECT(src), structure));
+}
+
 static gboolean
 webkit_extension_msg_received (WebKitWebContext  *context,
                WebKitUserMessage *message,
@@ -208,6 +289,10 @@ webkit_extension_msg_received (WebKitWebContext  *context,
         guint32 id = g_variant_get_uint32 (params);
 
         gst_wpe_src_stop_audio_stream (src, id);
+    } else if (!g_strcmp0(name, "gstwpe.bus_gerror_message")) {
+        webkit_extension_gerror_msg_received (src, params);
+    } else if (!g_strcmp0(name, "gstwpe.bus_message")) {
+        webkit_extension_bus_message_received (src, params);
     } else {
         res = FALSE;
         g_error("Unknown event: %s", name);
index 2b50aaf..b080b1f 100644 (file)
  *
  * Additionally, any audio stream created by WPE is exposed as "sometimes" audio
  * source pads.
+ *
+ * This source also relays GStreamer bus messages from the GStreamer pipelines
+ * running inside the web pages. Error, warning and info messages are made ours
+ * with the addition of the following fields into the GstMessage details (See
+ * gst_message_parse_error_details(), gst_message_parse_warning_details() and
+ * gst_message_parse_info_details()):
+ *   * `wpesrc_original_src_path`: [Path](gst_object_get_path_string) of the
+ *      original element posting the message
+ *
+ * Other message types are posted as [element custom](gst_message_new_custom)
+ * messages reusing the same GstStructure as the one from the message from the
+ * message posted in the web page with the addition of the following fields:
+ *    * `wpesrc_original_message_type`: Type of the original message from
+ *      gst_message_type_get_name().
+ *    * `wpesrc_original_src_name`: Name of the original element posting the
+ *      message
+ *    * `wpesrc_original_src_type`: Name of the GType of the original element
+ *      posting the message
+ *    * `wpesrc_original_src_path`: [Path](gst_object_get_path_string) of the
+ *      original element positing the message
  */
 
 #include "gstwpesrcbin.h"
diff --git a/ext/wpe/wpe-extension/gstwpebusmsgforwarder.c b/ext/wpe/wpe-extension/gstwpebusmsgforwarder.c
new file mode 100644 (file)
index 0000000..2d82510
--- /dev/null
@@ -0,0 +1,175 @@
+/* Copyright (C) <2021> Thibault Saunier <tsaunier@igalia.com>
+ *
+ * This library is free software; you can redistribute it and/or modify it under the terms of the
+ * GNU Library General Public License as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
+ * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public License along with this
+ * library; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+#include "gstwpeextension.h"
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+GST_DEBUG_CATEGORY (wpe_bus_msg_forwarder_debug);
+#define GST_CAT_DEFAULT wpe_bus_msg_forwarder_debug
+
+struct _GstWpeBusMsgForwarder
+{
+  GstTracer parent;
+  GCancellable *cancellable;
+};
+
+G_DEFINE_TYPE_WITH_CODE (GstWpeBusMsgForwarder, gst_wpe_bus_msg_forwarder,
+    GST_TYPE_TRACER, GST_DEBUG_CATEGORY_INIT (wpe_bus_msg_forwarder_debug,
+        "wpebusmsgforwarder", 0, "WPE message forwarder"););
+
+static void
+dispose (GObject * object)
+{
+  GstWpeBusMsgForwarder *self = GST_WPE_BUS_MSG_FORWARDER (object);
+
+  g_clear_object (&self->cancellable);
+}
+
+static WebKitUserMessage *
+create_gerror_bus_msg (GstElement * element, GstMessage * message)
+{
+  GError *error;
+  gchar *debug_str, *details_structure, *src_path;
+  const GstStructure *details = NULL;
+
+  if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) {
+    gst_message_parse_error (message, &error, &debug_str);
+    gst_message_parse_error_details (message, &details);
+  } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING) {
+    gst_message_parse_warning (message, &error, &debug_str);
+    gst_message_parse_warning_details (message, &details);
+  } else {
+    gst_message_parse_info (message, &error, &debug_str);
+    gst_message_parse_info_details (message, &details);
+  }
+
+  details_structure =
+      details ? gst_structure_to_string (details) : g_strdup ("");
+  src_path = gst_object_get_path_string (GST_MESSAGE_SRC (message));
+  return webkit_user_message_new ("gstwpe.bus_gerror_message",
+      /* (message_type, src_path, error_domain, error_code, msg, debug_str, details_structure) */
+      g_variant_new ("(sssusss)",
+          gst_message_type_get_name (GST_MESSAGE_TYPE (message)),
+          src_path,
+          g_quark_to_string (error->domain),
+          error->code, error->message, debug_str, details_structure)
+      );
+  g_free (src_path);
+}
+
+/* Those types can't be deserialized on the receiver
+ * side, so we just ignore them for now */
+#define IS_UNDESERIALIZABLE_TYPE(value) \
+  (g_type_is_a ((G_VALUE_TYPE (value)), G_TYPE_OBJECT)  || \
+   g_type_is_a ((G_VALUE_TYPE (value)), G_TYPE_ERROR) || \
+   g_type_is_a ((G_VALUE_TYPE (value)), GST_TYPE_CONTEXT) || \
+   g_type_is_a ((G_VALUE_TYPE (value)), G_TYPE_POINTER))
+
+static gboolean
+cleanup_structure (GQuark field_id, GValue * value, gpointer self)
+{
+  /* We need soome API in core to make that happen cleanly */
+  if (IS_UNDESERIALIZABLE_TYPE (value)) {
+    return FALSE;
+  }
+
+  if (GST_VALUE_HOLDS_LIST (value)) {
+    gint i;
+
+    for (i = 0; i < gst_value_list_get_size (value); i++) {
+      if (IS_UNDESERIALIZABLE_TYPE (gst_value_list_get_value (value, i)))
+        return FALSE;
+    }
+  }
+
+  if (GST_VALUE_HOLDS_ARRAY (value)) {
+    gint i;
+
+    for (i = 0; i < gst_value_array_get_size (value); i++) {
+      if (IS_UNDESERIALIZABLE_TYPE (gst_value_array_get_value (value, i)))
+        return FALSE;
+    }
+  }
+
+  return TRUE;
+}
+
+static void
+gst_message_post_cb (GObject * object, GstClockTime ts, GstElement * element,
+    GstMessage * message)
+{
+  gchar *str;
+  WebKitUserMessage *msg = NULL;
+  GstStructure *structure;
+  GstWpeBusMsgForwarder *self;
+  const GstStructure *message_struct;
+
+  if (!GST_IS_PIPELINE (element))
+    return;
+
+  self = GST_WPE_BUS_MSG_FORWARDER (object);
+  message_struct = gst_message_get_structure (message);
+  structure = message_struct ? gst_structure_copy (message_struct) : NULL;
+
+  if (structure) {
+    gst_structure_filter_and_map_in_place (structure, cleanup_structure, self);
+    str = gst_structure_to_string (structure);
+  } else {
+    str = g_strdup ("");
+  }
+
+  /* we special case error as gst can't serialize/de-serialize it */
+  if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR
+      || GST_MESSAGE_TYPE (message) == GST_MESSAGE_WARNING
+      || GST_MESSAGE_TYPE (message) == GST_MESSAGE_INFO) {
+    msg = create_gerror_bus_msg (element, message);
+  } else {
+    gchar *src_path = gst_object_get_path_string (GST_MESSAGE_SRC (message));
+    msg = webkit_user_message_new ("gstwpe.bus_message",
+        g_variant_new ("(sssss)",
+            gst_message_type_get_name (GST_MESSAGE_TYPE (message)),
+            GST_MESSAGE_SRC_NAME (message),
+            G_OBJECT_TYPE_NAME (GST_MESSAGE_SRC (message)), src_path, str));
+    g_free (src_path);
+  }
+  if (msg)
+    gst_wpe_extension_send_message (msg, self->cancellable, NULL, NULL);
+  g_free (str);
+}
+
+static void
+constructed (GObject * object)
+{
+  GstTracer *tracer = GST_TRACER (object);
+  gst_tracing_register_hook (tracer, "element-post-message-pre",
+      G_CALLBACK (gst_message_post_cb));
+}
+
+static void
+gst_wpe_bus_msg_forwarder_init (GstWpeBusMsgForwarder * self)
+{
+  self->cancellable = g_cancellable_new ();
+}
+
+static void
+gst_wpe_bus_msg_forwarder_class_init (GstWpeBusMsgForwarderClass * klass)
+{
+  GObjectClass *object_class = G_OBJECT_CLASS (klass);
+
+  object_class->dispose = dispose;
+  object_class->constructed = constructed;
+}
index f73a7e5..7bd7d72 100644 (file)
@@ -45,6 +45,7 @@ webkit_web_extension_initialize (WebKitWebExtension * extension)
   /* Register our own audio sink to */
   gst_element_register (NULL, "gstwpeaudiosink", GST_RANK_PRIMARY + 500,
       gst_wpe_audio_sink_get_type ());
+  gst_object_unref (g_object_new (gst_wpe_bus_msg_forwarder_get_type (), NULL));
 
   GST_INFO ("Mark processus as WebProcess");
   if (!g_setenv ("GST_WPE_ID", "1", TRUE))
index 9f3ec57..3cd62cd 100644 (file)
@@ -22,4 +22,5 @@
 
 void gst_wpe_extension_send_message (WebKitUserMessage *msg, GCancellable *cancellable, GAsyncReadyCallback cb, gpointer udata);
 
-G_DECLARE_FINAL_TYPE (GstWpeAudioSink, gst_wpe_audio_sink, GST, WPE_AUDIO_SINK, GstBaseSink);
\ No newline at end of file
+G_DECLARE_FINAL_TYPE (GstWpeAudioSink, gst_wpe_audio_sink, GST, WPE_AUDIO_SINK, GstBaseSink);
+G_DECLARE_FINAL_TYPE (GstWpeBusMsgForwarder, gst_wpe_bus_msg_forwarder, GST, WPE_BUS_MSG_FORWARDER, GstTracer);
\ No newline at end of file
index 9e159c7..7bf98d6 100644 (file)
@@ -1,5 +1,5 @@
 library('gstwpeextension',
-  ['gstwpeextension.c', 'gstwpeaudiosink.c'],
+  ['gstwpeextension.c', 'gstwpeaudiosink.c', 'gstwpebusmsgforwarder.c'],
   dependencies : [wpe_dep, gst_dep, gstbase_dep, giounix_dep],
   c_args : ['-DHAVE_CONFIG_H=1'],
   include_directories : [configinc],