multisocketsink: add GstNetworkMessage event
authorWim Taymans <wtaymans@redhat.com>
Thu, 10 Dec 2015 11:18:04 +0000 (12:18 +0100)
committerWim Taymans <wtaymans@redhat.com>
Thu, 10 Dec 2015 11:44:42 +0000 (12:44 +0100)
Add a property and logic to send a GstNetworkMessage event containing
the message that was received from a client. This can be used to
implement simply bidirectional communication.

gst/tcp/gstmultisocketsink.c
gst/tcp/gstmultisocketsink.h

index b9f22e0..f9acc13 100644 (file)
@@ -137,11 +137,13 @@ enum
 };
 
 #define DEFAULT_SEND_DISPATCHED FALSE
+#define DEFAULT_SEND_MESSAGES   FALSE
 
 enum
 {
   PROP_0,
   PROP_SEND_DISPATCHED,
+  PROP_SEND_MESSAGES,
   PROP_LAST
 };
 
@@ -223,11 +225,42 @@ gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
   gobject_class->get_property = gst_multi_socket_sink_get_property;
   gobject_class->finalize = gst_multi_socket_sink_finalize;
 
+  /**
+   * GstMultiSocketSink:send-dispatched:
+   *
+   * Sends a GstNetworkMessageDispatched event upstream whenever a buffer
+   * is sent to a client.
+   * The event is a CUSTOM event name GstNetworkMessageDispatched and
+   * contains:
+   *
+   *   "object"  G_TYPE_OBJECT     : the object identifying the client
+   *   "buffer"  GST_TYPE_BUFFER   : the buffer sent to the client
+   *
+   * Since: 1.8.0
+   */
   g_object_class_install_property (gobject_class, PROP_SEND_DISPATCHED,
       g_param_spec_boolean ("send-dispatched", "Send Dispatched",
           "If GstNetworkMessageDispatched events should be pushed",
           DEFAULT_SEND_DISPATCHED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   /**
+   * GstMultiSocketSink:send-messages:
+   *
+   * Sends a GstNetworkMessage event upstream whenever a buffer
+   * is received from a client.
+   * The event is a CUSTOM event name GstNetworkMessage and contains:
+   *
+   *   "object"  G_TYPE_OBJECT     : the object identifying the client
+   *   "buffer"  GST_TYPE_BUFFER   : the buffer with data received from the
+   *                                 client
+   *
+   * Since: 1.8.0
+   */
+  g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
+      g_param_spec_boolean ("send-messages", "Send Messages",
+          "If GstNetworkMessage events should be pushed", DEFAULT_SEND_MESSAGES,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
    * GstMultiSocketSink::add:
    * @gstmultisocketsink: the multisocketsink element to emit this signal on
    * @socket:             the socket to add to multisocketsink
@@ -416,6 +449,7 @@ gst_multi_socket_sink_init (GstMultiSocketSink * this)
 
   this->cancellable = g_cancellable_new ();
   this->send_dispatched = DEFAULT_SEND_DISPATCHED;
+  this->send_messages = DEFAULT_SEND_MESSAGES;
 }
 
 static void
@@ -569,38 +603,49 @@ static gboolean
 gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
     GstSocketClient * client)
 {
-  gboolean ret;
-  gchar dummy[256];
+  gboolean ret, do_event;
+  gchar dummy[256], *mem, *omem;
   gssize nread;
   GError *err = NULL;
   gboolean first = TRUE;
   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
+  gssize navail, maxmem;
 
   GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug);
 
   ret = TRUE;
 
+  navail = g_socket_get_available_bytes (mhclient->handle.socket);
+  if (navail <= 0)
+    return TRUE;
+
+  /* only collect the data in a buffer when we need to send it with an event */
+  do_event = sink->send_messages;
+  if (do_event) {
+    omem = mem = g_malloc (navail);
+    maxmem = navail;
+  } else {
+    mem = dummy;
+    maxmem = sizeof (dummy);
+  }
+
   /* just Read 'n' Drop, could also just drop the client as it's not supposed
    * to write to us except for closing the socket, I guess it's because we
    * like to listen to our customers. */
-  do {
-    gssize navail;
-
+  while (navail > 0) {
     GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug);
 
-    navail = g_socket_get_available_bytes (mhclient->handle.socket);
-    if (navail <= 0)
-      break;
-
     nread =
-        g_socket_receive (mhclient->handle.socket, dummy, MIN (navail,
-            sizeof (dummy)), sink->cancellable, &err);
+        g_socket_receive (mhclient->handle.socket, mem, MIN (navail,
+            maxmem), sink->cancellable, &err);
+
     if (first && nread == 0) {
       /* client sent close, so remove it */
       GST_DEBUG_OBJECT (sink, "%s client asked for close, removing",
           mhclient->debug);
       mhclient->status = GST_CLIENT_STATUS_CLOSED;
       ret = FALSE;
+      break;
     } else if (nread < 0) {
       GST_WARNING_OBJECT (sink, "%s could not read: %s",
           mhclient->debug, err->message);
@@ -608,10 +653,29 @@ gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
       ret = FALSE;
       break;
     }
+    navail -= nread;
+    if (do_event)
+      mem += nread;
     first = FALSE;
-  } while (nread > 0);
+  }
   g_clear_error (&err);
 
+  if (do_event) {
+    if (ret) {
+      GstBuffer *buf;
+      GstEvent *ev;
+
+      buf = gst_buffer_new_wrapped (omem, maxmem);
+      ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
+          gst_structure_new ("GstNetworkMessage",
+              "object", G_TYPE_OBJECT, mhclient->handle.socket,
+              "buffer", GST_TYPE_BUFFER, buf, NULL));
+      gst_buffer_unref (buf);
+
+      gst_pad_push_event (GST_BASE_SINK_PAD (sink), ev);
+    } else
+      g_free (omem);
+  }
   return ret;
 }
 
@@ -1114,6 +1178,9 @@ gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
     case PROP_SEND_DISPATCHED:
       sink->send_dispatched = g_value_get_boolean (value);
       break;
+    case PROP_SEND_MESSAGES:
+      sink->send_messages = g_value_get_boolean (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1130,6 +1197,9 @@ gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
     case PROP_SEND_DISPATCHED:
       g_value_set_boolean (value, sink->send_dispatched);
       break;
+    case PROP_SEND_MESSAGES:
+      g_value_set_boolean (value, sink->send_messages);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
index c68c9ab..c89844d 100644 (file)
@@ -68,6 +68,7 @@ struct _GstMultiSocketSink {
   /*< private >*/
   GMainContext *main_context;
   GCancellable *cancellable;
+  gboolean send_messages;
   gboolean send_dispatched;
 };