};
#define DEFAULT_SEND_DISPATCHED FALSE
+#define DEFAULT_SEND_MESSAGES FALSE
enum
{
PROP_0,
PROP_SEND_DISPATCHED,
+ PROP_SEND_MESSAGES,
PROP_LAST
};
gobject_class->get_property = gst_multi_socket_sink_get_property;
gobject_class->finalize = gst_multi_socket_sink_finalize;
+ /**
+ * GstMultiSocketSink:send-dispatched:
+ *
+ * Sends a GstNetworkMessageDispatched event upstream whenever a buffer
+ * is sent to a client.
+ * The event is a CUSTOM event name GstNetworkMessageDispatched and
+ * contains:
+ *
+ * "object" G_TYPE_OBJECT : the object identifying the client
+ * "buffer" GST_TYPE_BUFFER : the buffer sent to the client
+ *
+ * Since: 1.8.0
+ */
g_object_class_install_property (gobject_class, PROP_SEND_DISPATCHED,
g_param_spec_boolean ("send-dispatched", "Send Dispatched",
"If GstNetworkMessageDispatched events should be pushed",
DEFAULT_SEND_DISPATCHED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
+ * GstMultiSocketSink:send-messages:
+ *
+ * Sends a GstNetworkMessage event upstream whenever a buffer
+ * is received from a client.
+ * The event is a CUSTOM event name GstNetworkMessage and contains:
+ *
+ * "object" G_TYPE_OBJECT : the object identifying the client
+ * "buffer" GST_TYPE_BUFFER : the buffer with data received from the
+ * client
+ *
+ * Since: 1.8.0
+ */
+ g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
+ g_param_spec_boolean ("send-messages", "Send Messages",
+ "If GstNetworkMessage events should be pushed", DEFAULT_SEND_MESSAGES,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
* GstMultiSocketSink::add:
* @gstmultisocketsink: the multisocketsink element to emit this signal on
* @socket: the socket to add to multisocketsink
this->cancellable = g_cancellable_new ();
this->send_dispatched = DEFAULT_SEND_DISPATCHED;
+ this->send_messages = DEFAULT_SEND_MESSAGES;
}
static void
gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
GstSocketClient * client)
{
- gboolean ret;
- gchar dummy[256];
+ gboolean ret, do_event;
+ gchar dummy[256], *mem, *omem;
gssize nread;
GError *err = NULL;
gboolean first = TRUE;
GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
+ gssize navail, maxmem;
GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug);
ret = TRUE;
+ navail = g_socket_get_available_bytes (mhclient->handle.socket);
+ if (navail <= 0)
+ return TRUE;
+
+ /* only collect the data in a buffer when we need to send it with an event */
+ do_event = sink->send_messages;
+ if (do_event) {
+ omem = mem = g_malloc (navail);
+ maxmem = navail;
+ } else {
+ mem = dummy;
+ maxmem = sizeof (dummy);
+ }
+
/* just Read 'n' Drop, could also just drop the client as it's not supposed
* to write to us except for closing the socket, I guess it's because we
* like to listen to our customers. */
- do {
- gssize navail;
-
+ while (navail > 0) {
GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug);
- navail = g_socket_get_available_bytes (mhclient->handle.socket);
- if (navail <= 0)
- break;
-
nread =
- g_socket_receive (mhclient->handle.socket, dummy, MIN (navail,
- sizeof (dummy)), sink->cancellable, &err);
+ g_socket_receive (mhclient->handle.socket, mem, MIN (navail,
+ maxmem), sink->cancellable, &err);
+
if (first && nread == 0) {
/* client sent close, so remove it */
GST_DEBUG_OBJECT (sink, "%s client asked for close, removing",
mhclient->debug);
mhclient->status = GST_CLIENT_STATUS_CLOSED;
ret = FALSE;
+ break;
} else if (nread < 0) {
GST_WARNING_OBJECT (sink, "%s could not read: %s",
mhclient->debug, err->message);
ret = FALSE;
break;
}
+ navail -= nread;
+ if (do_event)
+ mem += nread;
first = FALSE;
- } while (nread > 0);
+ }
g_clear_error (&err);
+ if (do_event) {
+ if (ret) {
+ GstBuffer *buf;
+ GstEvent *ev;
+
+ buf = gst_buffer_new_wrapped (omem, maxmem);
+ ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
+ gst_structure_new ("GstNetworkMessage",
+ "object", G_TYPE_OBJECT, mhclient->handle.socket,
+ "buffer", GST_TYPE_BUFFER, buf, NULL));
+ gst_buffer_unref (buf);
+
+ gst_pad_push_event (GST_BASE_SINK_PAD (sink), ev);
+ } else
+ g_free (omem);
+ }
return ret;
}
case PROP_SEND_DISPATCHED:
sink->send_dispatched = g_value_get_boolean (value);
break;
+ case PROP_SEND_MESSAGES:
+ sink->send_messages = g_value_get_boolean (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_SEND_DISPATCHED:
g_value_set_boolean (value, sink->send_dispatched);
break;
+ case PROP_SEND_MESSAGES:
+ g_value_set_boolean (value, sink->send_messages);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;