/* this is really arbitrarily chosen */
-#define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_NONE
#define DEFAULT_MODE 1
#define DEFAULT_BUFFERS_MAX -1
#define DEFAULT_BUFFERS_SOFT_MAX -1
enum
{
PROP_0,
- PROP_PROTOCOL,
PROP_MODE,
PROP_BUFFERS_QUEUED,
PROP_BYTES_QUEUED,
gobject_class->get_property = gst_multi_fd_sink_get_property;
gobject_class->finalize = gst_multi_fd_sink_finalize;
- g_object_class_install_property (gobject_class, PROP_PROTOCOL,
- g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in"
- ". GDP protocol here is deprecated. Please use gdppay element.",
- GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
/**
* GstMultiFdSink::mode
*
{
GST_OBJECT_FLAG_UNSET (this, GST_MULTI_FD_SINK_OPEN);
- this->protocol = DEFAULT_PROTOCOL;
this->mode = DEFAULT_MODE;
CLIENTS_LOCK_INIT (this);
}
}
-/* Queue raw data for this client, creating a new buffer.
- * This takes ownership of the data by
- * setting it as GST_BUFFER_MALLOCDATA() on the created buffer so
- * be sure to pass g_free()-able @data.
- */
-static gboolean
-gst_multi_fd_sink_client_queue_data (GstMultiFdSink * sink,
- GstTCPClient * client, gchar * data, gint len)
-{
- GstBuffer *buf;
-
- buf = gst_buffer_new ();
- GST_BUFFER_DATA (buf) = (guint8 *) data;
- GST_BUFFER_MALLOCDATA (buf) = (guint8 *) data;
- GST_BUFFER_SIZE (buf) = len;
-
- GST_LOG_OBJECT (sink, "[fd %5d] queueing data of length %d",
- client->fd.fd, len);
-
- client->sending = g_slist_append (client->sending, buf);
-
- return TRUE;
-}
-
-/* GDP-encode given caps and queue them for sending */
-static gboolean
-gst_multi_fd_sink_client_queue_caps (GstMultiFdSink * sink,
- GstTCPClient * client, const GstCaps * caps)
-{
- guint8 *header;
- guint8 *payload;
- guint length;
- gchar *string;
-
- g_return_val_if_fail (caps != NULL, FALSE);
-
- string = gst_caps_to_string (caps);
- GST_DEBUG_OBJECT (sink, "[fd %5d] Queueing caps %s through GDP",
- client->fd.fd, string);
- g_free (string);
-
- if (!gst_dp_packet_from_caps (caps, sink->header_flags, &length, &header,
- &payload)) {
- GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps");
- return FALSE;
- }
- gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) header, length);
-
- length = gst_dp_header_payload_length (header);
- gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) payload, length);
-
- return TRUE;
-}
-
static gboolean
is_sync_frame (GstMultiFdSink * sink, GstBuffer * buffer)
{
client->fd.fd, GST_BUFFER_SIZE (buffer));
gst_buffer_ref (buffer);
- if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
- guint8 *header;
- guint len;
-
- if (!gst_dp_header_from_buffer (buffer, sink->header_flags, &len,
- &header)) {
- GST_DEBUG_OBJECT (sink,
- "[fd %5d] could not create header, removing client",
- client->fd.fd);
- return FALSE;
- }
- gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) header,
- len);
- }
-
client->sending = g_slist_append (client->sending, buffer);
}
}
gst_caps_unref (caps);
caps = NULL;
- /* now we can send the buffer, possibly sending a GDP header first */
- if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
- guint8 *header;
- guint len;
-
- if (!gst_dp_header_from_buffer (buffer, sink->header_flags, &len, &header)) {
- GST_DEBUG_OBJECT (sink,
- "[fd %5d] could not create header, removing client", client->fd.fd);
- return FALSE;
- }
- gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) header, len);
- }
GST_LOG_OBJECT (sink, "[fd %5d] queueing buffer of length %d",
client->fd.fd, GST_BUFFER_SIZE (buffer));
{
int fd = client->fd.fd;
gboolean more;
- gboolean res;
gboolean flushing;
GstClockTime now;
GTimeVal nowtv;
flushing = client->status == GST_CLIENT_STATUS_FLUSHING;
- /* when using GDP, first check if we have queued caps yet */
- if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
- /* don't need to do anything when the client is flushing */
- if (!client->caps_sent && !flushing) {
- GstPad *peer;
- GstCaps *caps;
-
- peer = gst_pad_get_peer (GST_BASE_SINK_PAD (sink));
- if (!peer) {
- GST_WARNING_OBJECT (sink, "pad has no peer");
- return FALSE;
- }
- gst_object_unref (peer);
-
- caps = gst_pad_get_negotiated_caps (GST_BASE_SINK_PAD (sink));
- if (!caps) {
- GST_WARNING_OBJECT (sink, "pad caps not yet negotiated");
- return FALSE;
- }
-
- /* queue caps for sending */
- res = gst_multi_fd_sink_client_queue_caps (sink, client, caps);
-
- gst_caps_unref (caps);
-
- if (!res) {
- GST_DEBUG_OBJECT (sink, "Failed queueing caps, removing client");
- return FALSE;
- }
- client->caps_sent = TRUE;
- }
- }
-
more = TRUE;
do {
gint maxsize;
multifdsink = GST_MULTI_FD_SINK (object);
switch (prop_id) {
- case PROP_PROTOCOL:
- multifdsink->protocol = g_value_get_enum (value);
- break;
case PROP_MODE:
multifdsink->mode = g_value_get_enum (value);
break;
multifdsink = GST_MULTI_FD_SINK (object);
switch (prop_id) {
- case PROP_PROTOCOL:
- g_value_set_enum (value, multifdsink->protocol);
- break;
case PROP_MODE:
g_value_set_enum (value, multifdsink->mode);
break;
gboolean discont;
- GstTCPProtocol protocol;
-
gboolean caps_sent;
gboolean new_connection;
GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
gboolean previous_buffer_in_caps;
- GstTCPProtocol protocol;
guint mtu;
gint qos_dscp;
gboolean handle_read;
return GST_FLOW_ERROR;
}
}
-
-/* write a GDP header to the socket. Return false if fails. */
-gboolean
-gst_tcp_gdp_write_buffer (GstElement * this, int socket, GstBuffer * buffer,
- gboolean fatal, const gchar * host, int port)
-{
- guint length;
- guint8 *header;
- size_t wrote;
-
- if (!gst_dp_header_from_buffer (buffer, 0, &length, &header))
- goto create_error;
-
- GST_LOG_OBJECT (this, "writing %d bytes for GDP buffer header", length);
- wrote = gst_tcp_socket_write (socket, header, length);
- g_free (header);
-
- if (wrote != length)
- goto write_error;
-
- return TRUE;
-
- /* ERRORS */
-create_error:
- {
- if (fatal)
- GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL),
- ("Could not create GDP header from buffer"));
- return FALSE;
- }
-write_error:
- {
- if (fatal)
- GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
- (_("Error while sending data to \"%s:%d\"."), host, port),
- ("Only %" G_GSIZE_FORMAT " of %u bytes written: %s",
- wrote, GST_BUFFER_SIZE (buffer), g_strerror (errno)));
- return FALSE;
- }
-}
-
-/* write GDP header and payload to the given socket for the given caps.
- * Return false if fails. */
-gboolean
-gst_tcp_gdp_write_caps (GstElement * this, int socket, const GstCaps * caps,
- gboolean fatal, const char *host, int port)
-{
- guint length;
- guint8 *header;
- guint8 *payload;
- size_t wrote;
-
- if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload))
- goto create_error;
-
- GST_LOG_OBJECT (this, "writing %d bytes for GDP caps header", length);
- wrote = gst_tcp_socket_write (socket, header, length);
- if (wrote != length)
- goto write_header_error;
-
- length = gst_dp_header_payload_length (header);
- g_free (header);
-
- GST_LOG_OBJECT (this, "writing %d bytes for GDP caps payload", length);
- wrote = gst_tcp_socket_write (socket, payload, length);
- g_free (payload);
-
- if (wrote != length)
- goto write_payload_error;
-
- return TRUE;
-
- /* ERRORS */
-create_error:
- {
- if (fatal)
- GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL),
- ("Could not create GDP packet from caps"));
- return FALSE;
- }
-write_header_error:
- {
- g_free (header);
- g_free (payload);
- if (fatal)
- GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
- (_("Error while sending gdp header data to \"%s:%d\"."), host, port),
- ("Only %" G_GSIZE_FORMAT " of %u bytes written: %s",
- wrote, length, g_strerror (errno)));
- return FALSE;
- }
-write_payload_error:
- {
- if (fatal)
- GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
- (_("Error while sending gdp payload data to \"%s:%d\"."), host, port),
- ("Only %" G_GSIZE_FORMAT " of %u bytes written: %s",
- wrote, length, g_strerror (errno)));
- return FALSE;
- }
-}
G_BEGIN_DECLS
-/**
- * GstTCPProtocol:
- * @GST_TCP_PROTOCOL_NONE: Raw data transmission
- * @GST_TCP_PROTOCOL_GDP: #GstBuffers are wrapped and sent/received using the
- * GDP protocol.
- *
- * This enum is provided by the tcp/multifd elements to configure the format of
- * data transmission/reception.
- *
- * The GDP protocol wraps data buffers in a header that also carries format
- * information and timestamps. The None value indicates the data is
- * sent/received as-is. In that case, format information and timestamping
- * must be transmitted separately, or implicit in the bytestream itself.
- */
-typedef enum
-{
- GST_TCP_PROTOCOL_NONE,
- GST_TCP_PROTOCOL_GDP
-} GstTCPProtocol;
-
gchar * gst_tcp_host_to_ip (GstElement *element, const gchar *host);
gint gst_tcp_socket_write (int socket, const void *buf, size_t count);
GstFlowReturn gst_tcp_gdp_read_buffer (GstElement * this, int socket, GstPoll * fdset, GstBuffer **buf);
GstFlowReturn gst_tcp_gdp_read_caps (GstElement * this, int socket, GstPoll * fdset, GstCaps **caps);
-GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket, GstPoll * fdset);
-
-gboolean gst_tcp_gdp_write_buffer (GstElement *elem, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port);
-gboolean gst_tcp_gdp_write_event (GstElement *elem, int socket, GstEvent *event, gboolean fatal, const gchar *host, int port);
-gboolean gst_tcp_gdp_write_caps (GstElement *elem, int socket, const GstCaps *caps, gboolean fatal, const gchar *host, int port);
-
G_END_DECLS
#endif /* __GST_TCP_HELP_H__ */
{
ARG_0,
ARG_HOST,
- ARG_PORT,
- ARG_PROTOCOL
- /* FILL ME */
+ ARG_PORT
};
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
g_param_spec_int ("port", "Port", "The port to send the packets to",
0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, ARG_PROTOCOL,
- g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
- GST_TYPE_TCP_PROTOCOL, GST_TCP_PROTOCOL_NONE,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gstelement_class->change_state = gst_tcp_client_sink_change_state;
this->port = TCP_DEFAULT_PORT;
this->sock_fd.fd = -1;
- this->protocol = GST_TCP_PROTOCOL_NONE;
GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN);
}
sink = GST_TCP_CLIENT_SINK (bsink);
- /* write the buffer header if we have one */
- switch (sink->protocol) {
- case GST_TCP_PROTOCOL_NONE:
- break;
-
- case GST_TCP_PROTOCOL_GDP:
- /* if we haven't send caps yet, send them first */
- if (!sink->caps_sent) {
- const GstCaps *caps;
- gchar *string;
-
- caps = GST_PAD_CAPS (GST_PAD_PEER (GST_BASE_SINK_PAD (bsink)));
- string = gst_caps_to_string (caps);
- GST_DEBUG_OBJECT (sink, "Sending caps %s through GDP", string);
- g_free (string);
-
- if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd.fd,
- caps, TRUE, sink->host, sink->port))
- goto gdp_write_error;
-
- sink->caps_sent = TRUE;
- }
- break;
- default:
- g_warning ("Unhandled protocol type");
- break;
- }
-
return TRUE;
-
- /* ERRORS */
-gdp_write_error:
- {
- return FALSE;
- }
}
static GstFlowReturn
GST_LOG_OBJECT (sink, "writing %d bytes for buffer data", size);
- /* write the buffer header if we have one */
- switch (sink->protocol) {
- case GST_TCP_PROTOCOL_NONE:
- break;
- case GST_TCP_PROTOCOL_GDP:
- GST_LOG_OBJECT (sink, "Sending buffer header through GDP");
- if (!gst_tcp_gdp_write_buffer (GST_ELEMENT (sink), sink->sock_fd.fd, buf,
- TRUE, sink->host, sink->port))
- goto gdp_write_error;
- break;
- default:
- break;
- }
-
/* write buffer data */
wrote = gst_tcp_socket_write (sink->sock_fd.fd, GST_BUFFER_DATA (buf), size);
return GST_FLOW_OK;
/* ERRORS */
-gdp_write_error:
- {
- return FALSE;
- }
write_error:
{
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
case ARG_PORT:
tcpclientsink->port = g_value_get_int (value);
break;
- case ARG_PROTOCOL:
- tcpclientsink->protocol = g_value_get_enum (value);
- break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
case ARG_PORT:
g_value_set_int (value, tcpclientsink->port);
break;
- case ARG_PROTOCOL:
- g_value_set_enum (value, tcpclientsink->protocol);
- break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
GstPollFD sock_fd;
size_t data_written; /* how much bytes have we written ? */
- GstTCPProtocol protocol; /* used with the protocol enum */
gboolean caps_sent; /* whether or not we sent caps already */
};
{
PROP_0,
PROP_HOST,
- PROP_PORT,
- PROP_PROTOCOL
+ PROP_PORT
};
g_param_spec_int ("port", "Port", "The port to receive packets from", 0,
TCP_HIGHEST_PORT, TCP_DEFAULT_PORT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_PROTOCOL,
- g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
- GST_TYPE_TCP_PROTOCOL, GST_TCP_PROTOCOL_NONE,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gstbasesrc_class->get_caps = gst_tcp_client_src_getcaps;
gstbasesrc_class->start = gst_tcp_client_src_start;
this->port = TCP_DEFAULT_PORT;
this->host = g_strdup (TCP_DEFAULT_HOST);
this->sock_fd.fd = -1;
- this->protocol = GST_TCP_PROTOCOL_NONE;
this->caps = NULL;
GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SRC_OPEN);
GST_LOG_OBJECT (src, "asked for a buffer");
- /* read the buffer header if we're using a protocol */
- switch (src->protocol) {
- case GST_TCP_PROTOCOL_NONE:
- ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd,
- src->fdset, outbuf);
- break;
-
- case GST_TCP_PROTOCOL_GDP:
- /* get the caps if we're using GDP */
- if (!src->caps_received) {
- GstCaps *caps;
-
- GST_DEBUG_OBJECT (src, "getting caps through GDP");
- ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd.fd,
- src->fdset, &caps);
-
- if (ret != GST_FLOW_OK)
- goto no_caps;
-
- src->caps_received = TRUE;
- src->caps = caps;
- }
-
- ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd,
- src->fdset, outbuf);
- break;
- default:
- /* need to assert as buf == NULL */
- g_assert ("Unhandled protocol type");
- break;
- }
+ /* read the buffer header */
+ ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd,
+ src->fdset, outbuf);
if (ret == GST_FLOW_OK) {
GST_LOG_OBJECT (src,
GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
return GST_FLOW_WRONG_STATE;
}
-no_caps:
- {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("Could not read caps through GDP"));
- return ret;
- }
}
static void
case PROP_PORT:
tcpclientsrc->port = g_value_get_int (value);
break;
- case PROP_PROTOCOL:
- tcpclientsrc->protocol = g_value_get_enum (value);
- break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
case PROP_PORT:
g_value_set_int (value, tcpclientsrc->port);
break;
- case PROP_PROTOCOL:
- g_value_set_enum (value, tcpclientsrc->protocol);
- break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
GstPollFD sock_fd;
GstPoll *fdset;
- GstTCPProtocol protocol; /* protocol used for reading data */
gboolean caps_received; /* if we have received caps yet */
GstCaps *caps;
};
{
PROP_0,
PROP_HOST,
- PROP_PORT,
- PROP_PROTOCOL
+ PROP_PORT
};
g_param_spec_int ("port", "Port", "The port to listen to",
0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- g_object_class_install_property (gobject_class, PROP_PROTOCOL,
- g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
- GST_TYPE_TCP_PROTOCOL, GST_TCP_PROTOCOL_NONE,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gstbasesrc_class->start = gst_tcp_server_src_start;
gstbasesrc_class->stop = gst_tcp_server_src_stop;
src->host = g_strdup (TCP_DEFAULT_HOST);
src->server_sock_fd.fd = -1;
src->client_sock_fd.fd = -1;
- src->protocol = GST_TCP_PROTOCOL_NONE;
GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
}
GST_LOG_OBJECT (src, "asked for a buffer");
- switch (src->protocol) {
- case GST_TCP_PROTOCOL_NONE:
- ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd.fd,
- src->fdset, outbuf);
- break;
-
- case GST_TCP_PROTOCOL_GDP:
- if (!src->caps_received) {
- GstCaps *caps;
- gchar *string;
-
- ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd.fd,
- src->fdset, &caps);
-
- if (ret == GST_FLOW_WRONG_STATE)
- goto gdp_cancelled;
-
- if (ret != GST_FLOW_OK)
- goto gdp_caps_read_error;
-
- src->caps_received = TRUE;
- string = gst_caps_to_string (caps);
- GST_DEBUG_OBJECT (src, "Received caps through GDP: %s", string);
- g_free (string);
-
- gst_pad_set_caps (GST_BASE_SRC_PAD (psrc), caps);
- }
-
- ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd.fd,
- src->fdset, outbuf);
-
- if (ret == GST_FLOW_OK)
- gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src)));
-
- break;
-
- default:
- /* need to assert as buf == NULL */
- g_assert ("Unhandled protocol type");
- break;
- }
+ ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd.fd,
+ src->fdset, outbuf);
if (ret == GST_FLOW_OK) {
GST_LOG_OBJECT (src,
("Could not accept client on server socket: %s", g_strerror (errno)));
return GST_FLOW_ERROR;
}
-gdp_cancelled:
- {
- GST_DEBUG_OBJECT (src, "reading gdp canceled");
- return GST_FLOW_WRONG_STATE;
- }
-gdp_caps_read_error:
- {
- /* if we did not get canceled, report an error */
- if (ret != GST_FLOW_WRONG_STATE) {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("Could not read caps through GDP"));
- }
- return ret;
- }
}
static void
case PROP_PORT:
tcpserversrc->server_port = g_value_get_int (value);
break;
- case PROP_PROTOCOL:
- tcpserversrc->protocol = g_value_get_enum (value);
- break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
case PROP_PORT:
g_value_set_int (value, tcpserversrc->server_port);
break;
- case PROP_PROTOCOL:
- g_value_set_enum (value, tcpserversrc->protocol);
- break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
GstPoll *fdset;
- GstTCPProtocol protocol; /* protocol used for reading data */
gboolean caps_received; /* if we have received caps yet */
};