socketsrc: handle GstNetworkMessage events
authorWim Taymans <wtaymans@redhat.com>
Fri, 4 Dec 2015 10:17:37 +0000 (11:17 +0100)
committerWim Taymans <wtaymans@redhat.com>
Thu, 10 Dec 2015 11:44:42 +0000 (12:44 +0100)
Add a property to handle GstNetworkMessage events. These events contain
a buffer that is sent on the socket to allow for simple bidirectional
communication.

gst/tcp/gstsocketsrc.c
gst/tcp/gstsocketsrc.h

index 149ced1..bb85402 100644 (file)
@@ -65,11 +65,14 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
     GST_STATIC_CAPS_ANY);
 
 
+#define DEFAULT_SEND_MESSAGES FALSE
+
 enum
 {
   PROP_0,
   PROP_SOCKET,
   PROP_CAPS,
+  PROP_SEND_MESSAGES
 };
 
 enum
@@ -87,6 +90,7 @@ G_DEFINE_TYPE (GstSocketSrc, gst_socket_src, GST_TYPE_PUSH_SRC);
 static void gst_socket_src_finalize (GObject * gobject);
 
 static GstCaps *gst_socketsrc_getcaps (GstBaseSrc * src, GstCaps * filter);
+static gboolean gst_socketsrc_event (GstBaseSrc * src, GstEvent * event);
 static GstFlowReturn gst_socket_src_fill (GstPushSrc * psrc,
     GstBuffer * outbuf);
 static gboolean gst_socket_src_unlock (GstBaseSrc * bsrc);
@@ -126,6 +130,24 @@ gst_socket_src_class_init (GstSocketSrcClass * klass)
           "The caps of the source pad", GST_TYPE_CAPS,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  /**
+   * GstSocketSrc:send-messages:
+   *
+   * Control if the source will handle GstNetworkMessage events.
+   * The event is a CUSTOM event named 'GstNetworkMessage' and contains:
+   *
+   *   "buffer", GST_TYPE_BUFFER    : the buffer with data to send
+   *
+   * The buffer in the event will be sent on the socket. This allows
+   * for simple bidirectional communication.
+   *
+   * Since: 1.8.0
+   **/
+  g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
+      g_param_spec_boolean ("send-messages", "Send Messages",
+          "If GstNetworkMessage events should be handled",
+          DEFAULT_SEND_MESSAGES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   gst_socket_src_signals[CONNECTION_CLOSED_BY_PEER] =
       g_signal_new ("connection-closed-by-peer", G_TYPE_FROM_CLASS (klass),
       G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSocketSrcClass,
@@ -140,6 +162,7 @@ gst_socket_src_class_init (GstSocketSrcClass * klass)
       "Thomas Vander Stichele <thomas at apestaart dot org>, "
       "William Manley <will@williammanley.net>");
 
+  gstbasesrc_class->event = gst_socketsrc_event;
   gstbasesrc_class->get_caps = gst_socketsrc_getcaps;
   gstbasesrc_class->unlock = gst_socket_src_unlock;
   gstbasesrc_class->unlock_stop = gst_socket_src_unlock_stop;
@@ -154,6 +177,7 @@ gst_socket_src_init (GstSocketSrc * this)
 {
   this->socket = NULL;
   this->cancellable = g_cancellable_new ();
+  this->send_messages = DEFAULT_SEND_MESSAGES;
 }
 
 static void
@@ -169,6 +193,59 @@ gst_socket_src_finalize (GObject * gobject)
   G_OBJECT_CLASS (parent_class)->finalize (gobject);
 }
 
+static gboolean
+gst_socketsrc_event (GstBaseSrc * bsrc, GstEvent * event)
+{
+  GstSocketSrc *src;
+  gboolean res = FALSE;
+
+  src = GST_SOCKET_SRC (bsrc);
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_CUSTOM_UPSTREAM:
+      if (src->send_messages && gst_event_has_name (event, "GstNetworkMessage")) {
+        const GstStructure *str = gst_event_get_structure (event);
+        GSocket *socket;
+
+        GST_OBJECT_LOCK (src);
+        if ((socket = src->socket))
+          g_object_ref (socket);
+        GST_OBJECT_UNLOCK (src);
+
+        if (socket) {
+          GstBuffer *buf;
+          GstMapInfo map;
+          GError *err = NULL;
+          gssize ret;
+
+          gst_structure_get (str, "buffer", GST_TYPE_BUFFER, &buf, NULL);
+
+          if (buf) {
+            gst_buffer_map (buf, &map, GST_MAP_READ);
+            GST_LOG ("sending buffer of size %" G_GSIZE_FORMAT, map.size);
+            ret = g_socket_send_with_blocking (socket, (gchar *) map.data,
+                map.size, FALSE, src->cancellable, &err);
+            gst_buffer_unmap (buf, &map);
+
+            if (ret == -1) {
+              GST_WARNING ("could not send message: %s", err->message);
+              g_clear_error (&err);
+              res = FALSE;
+            } else
+              res = TRUE;
+            gst_buffer_unref (buf);
+          }
+          g_object_unref (socket);
+        }
+      }
+      break;
+    default:
+      res = GST_BASE_SRC_CLASS (parent_class)->event (bsrc, event);
+      break;
+  }
+  return res;
+}
+
 static GstCaps *
 gst_socketsrc_getcaps (GstBaseSrc * src, GstCaps * filter)
 {
@@ -344,6 +421,9 @@ gst_socket_src_set_property (GObject * object, guint prop_id,
       gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (socketsrc));
       break;
     }
+    case PROP_SEND_MESSAGES:
+      socketsrc->send_messages = g_value_get_boolean (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -365,6 +445,9 @@ gst_socket_src_get_property (GObject * object, guint prop_id,
       gst_value_set_caps (value, socketsrc->caps);
       GST_OBJECT_UNLOCK (socketsrc);
       break;
+    case PROP_SEND_MESSAGES:
+      g_value_set_boolean (value, socketsrc->send_messages);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
index 9529e12..02f56d6 100644 (file)
@@ -50,6 +50,7 @@ struct _GstSocketSrc {
  /*< private >*/
   GstCaps *caps;
   GSocket *socket;
+  gboolean send_messages;
   GCancellable *cancellable;
 };