+2005-07-05 Wim Taymans <wim@fluendo.com>
+
+ * configure.ac:
+ * gst/tcp/Makefile.am:
+ * gst/tcp/README:
+ * gst/tcp/gstmultifdsink.c: (gst_multifdsink_get_type),
+ (gst_multifdsink_base_init), (gst_multifdsink_class_init),
+ (gst_multifdsink_init), (gst_multifdsink_remove_client_link),
+ (is_sync_frame), (gst_multifdsink_handle_client_write),
+ (gst_multifdsink_render), (gst_multifdsink_start),
+ (gst_multifdsink_stop), (gst_multifdsink_change_state):
+ * gst/tcp/gstmultifdsink.h:
+ * gst/tcp/gsttcp.c: (gst_tcp_host_to_ip),
+ (gst_tcp_gdp_read_buffer), (gst_tcp_gdp_read_caps),
+ (gst_tcp_gdp_write_buffer), (gst_tcp_gdp_write_caps):
+ * gst/tcp/gsttcp.h:
+ * gst/tcp/gsttcpclientsink.c: (gst_tcpclientsink_class_init),
+ (gst_tcpclientsink_init), (gst_tcpclientsink_setcaps),
+ (gst_tcpclientsink_render), (gst_tcpclientsink_start),
+ (gst_tcpclientsink_stop), (gst_tcpclientsink_change_state):
+ * gst/tcp/gsttcpclientsink.h:
+ * gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get_type),
+ (gst_tcpclientsrc_base_init), (gst_tcpclientsrc_class_init),
+ (gst_tcpclientsrc_init), (gst_tcpclientsrc_getcaps),
+ (gst_tcpclientsrc_create), (gst_tcpclientsrc_start),
+ (gst_tcpclientsrc_stop), (gst_tcpclientsrc_unlock):
+ * gst/tcp/gsttcpclientsrc.h:
+ * gst/tcp/gsttcpplugin.c: (plugin_init):
+ * gst/tcp/gsttcpserversink.c: (gst_tcpserversink_class_init):
+ * gst/tcp/gsttcpserversink.h:
+ * gst/tcp/gsttcpserversrc.c: (gst_tcpserversrc_get_type),
+ (gst_tcpserversrc_base_init), (gst_tcpserversrc_class_init),
+ (gst_tcpserversrc_init), (gst_tcpserversrc_finalize),
+ (gst_tcpserversrc_create), (gst_tcpserversrc_start),
+ (gst_tcpserversrc_stop):
+ * gst/tcp/gsttcpserversrc.h:
+ * gst/tcp/gsttcpsink.c:
+ * gst/tcp/gsttcpsink.h:
+ * gst/tcp/gsttcpsrc.c:
+ * gst/tcp/gsttcpsrc.h:
+ Ported tcp plugins to 0.9.
+
+
2005-07-05 Andy Wingo <wingo@pobox.com>
* gst/playback/gstplaybasebin.c (fill_buffer):
AC_SUBST(GST_CONTROL_LIBS)
+dnl check for gstreamer-dataprotocol; uninstalled is selected preferentially
+PKG_CHECK_MODULES(GST_GDP, gstreamer-dataprotocol-$GST_MAJORMINOR >= $GST_REQ,
+ HAVE_GST_GDP="yes", HAVE_GST_GDP="no")
+
+if test "x$HAVE_GST_GDP" = "xno"; then
+ AC_MSG_ERROR(no GStreamer Dataprotocol Libs found)
+fi
+
+AC_SUBST(GST_GDP_LIBS)
+
PKG_CHECK_MODULES(GST_BASE, gstreamer-base-$GST_MAJORMINOR >= $GST_REQ,
HAVE_GST_BASE="yes", HAVE_GST_BASE="no")
playback \
sine \
subparse \
+ tcp \
typefind \
videotestsrc \
videorate \
gst/playback/Makefile
gst/sine/Makefile
gst/subparse/Makefile
+gst/tcp/Makefile
gst/typefind/Makefile
gst/videotestsrc/Makefile
gst/videorate/Makefile
libgsttcp_la_SOURCES = \
gsttcpplugin.c \
- gsttcpsrc.c gsttcpsink.c \
gsttcp.c \
gstfdset.c \
gstmultifdsink.c \
# remove ENABLE_NEW when dataprotocol is stable
libgsttcp_la_CFLAGS = $(GST_CFLAGS) -DGST_ENABLE_NEW
libgsttcp_la_LIBADD =
-libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
+libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_GDP_LIBS)
noinst_HEADERS = \
gsttcpplugin.h \
- gsttcpsrc.h gsttcpsink.h \
gsttcp.h \
gstfdset.h \
gstmultifdsink.h \
- tcpserversrc
- tcpserversink
-which are created to replace the old tcpsrc/tcpsink
-
TESTS
-----
Use these tests to test functionality of the various tcp plugins
----
- implement DNS resolution
---------
-
-This is the old documentation for the original tcpsrc/tcpsink elements.
-
-* What is TCP src/sink?
-
-solution, like icecast or realaudio or whatever.
-But the future RTP plugins shall not do the actual transmission/reception
-of packets on the network themselve but the Application developer would be
-encouraged to use either the TCP or the UDP plugins for that. UDP would be
-used mostly but there could be situations where TCP would be the only
-available choice. For example streaming accross firewalls that do not
-allow UDP.
-
-* Shortcomings
-
-Even given our modest ambitions, the current code doesn't handle
-caps negotiation robustly.
-
-* Todo
-
-The caps nego should do bi-directional negotiation.
-
-Perhaps this plugin can be the example of how to do caps negotiation
-via a point-to-point protocol.
-
-12 Sep 2001
-Wim Taymans <wim.taymans@chello.be>
-Joshua N Pritikin <vishnu@pobox.com>
-Zeeshan Ali <zak147@yahoo.com>
"Thomas Vander Stichele <thomas at apestaart dot org>, "
"Wim Taymans <wim@fluendo.com>");
+static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
+ GST_PAD_SINK,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
GST_DEBUG_CATEGORY (multifdsink_debug);
#define GST_CAT_DEFAULT (multifdsink_debug)
static void gst_multifdsink_remove_client_link (GstMultiFdSink * sink,
GList * link);
-static void gst_multifdsink_chain (GstPad * pad, GstData * _data);
+static GstFlowReturn gst_multifdsink_render (GstBaseSink * bsink,
+ GstBuffer * buf);
static GstElementStateReturn gst_multifdsink_change_state (GstElement *
element);
};
multifdsink_type =
- g_type_register_static (GST_TYPE_ELEMENT, "GstMultiFdSink",
+ g_type_register_static (GST_TYPE_BASESINK, "GstMultiFdSink",
&multifdsink_info, 0);
}
return multifdsink_type;
{
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&sinktemplate));
+
gst_element_class_set_details (element_class, &gst_multifdsink_details);
}
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
+ GstBaseSinkClass *gstbasesink_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
+ gstbasesink_class = (GstBaseSinkClass *) klass;
+
+ parent_class = g_type_class_ref (GST_TYPE_BASESINK);
- parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
+ gobject_class->set_property = gst_multifdsink_set_property;
+ gobject_class->get_property = gst_multifdsink_get_property;
g_object_class_install_property (gobject_class, ARG_PROTOCOL,
g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_BOXED,
G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
- gobject_class->set_property = gst_multifdsink_set_property;
- gobject_class->get_property = gst_multifdsink_get_property;
-
gstelement_class->change_state = gst_multifdsink_change_state;
+ gstbasesink_class->render = gst_multifdsink_render;
+
klass->add = gst_multifdsink_add;
klass->remove = gst_multifdsink_remove;
klass->clear = gst_multifdsink_clear;
static void
gst_multifdsink_init (GstMultiFdSink * this)
{
- /* create the sink pad */
- this->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
- gst_element_add_pad (GST_ELEMENT (this), this->sinkpad);
- gst_pad_set_chain_function (this->sinkpad, gst_multifdsink_chain);
-
GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN);
this->protocol = DEFAULT_PROTOCOL;
client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
/* free client buffers */
- g_slist_foreach (client->sending, (GFunc) gst_data_unref, NULL);
+ g_slist_foreach (client->sending, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (client->sending);
client->sending = NULL;
static gboolean
is_sync_frame (GstMultiFdSink * sink, GstBuffer * buffer)
{
- if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_DELTA_UNIT)) {
+ if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) {
return FALSE;
- } else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_IN_CAPS)) {
+ } else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) {
return TRUE;
}
return FALSE;
/* when using GDP, first check if we have queued caps yet */
if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
if (!client->caps_sent) {
- const GstCaps *caps = GST_PAD_CAPS (GST_PAD_PEER (sink->sinkpad));
+ const GstCaps *caps =
+ GST_PAD_CAPS (GST_PAD_PEER (GST_BASESINK_PAD (sink)));
/* queue caps for sending */
res = gst_multifdsink_client_queue_caps (sink, client, caps);
return NULL;
}
-static void
-gst_multifdsink_chain (GstPad * pad, GstData * _data)
+static GstFlowReturn
+gst_multifdsink_render (GstBaseSink * bsink, GstBuffer * buf)
{
- GstBuffer *buf = GST_BUFFER (_data);
GstMultiFdSink *sink;
- g_return_if_fail (pad != NULL);
- g_return_if_fail (GST_IS_PAD (pad));
- g_return_if_fail (buf != NULL);
- sink = GST_MULTIFDSINK (GST_OBJECT_PARENT (pad));
- g_return_if_fail (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN));
+ sink = GST_MULTIFDSINK (bsink);
- if (GST_IS_EVENT (buf)) {
- g_warning ("FIXME: handle events");
- return;
- }
+ /* since we keep this buffer out of the scope of this method */
+ gst_buffer_ref (buf);
+
+ g_return_val_if_fail (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN),
+ GST_FLOW_ERROR);
GST_LOG_OBJECT (sink, "received buffer %p", buf);
/* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS,
* it means we're getting new streamheader buffers, and we should clear
* the old ones */
- if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS) &&
+ if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS) &&
sink->previous_buffer_in_caps == FALSE) {
GST_DEBUG_OBJECT (sink,
"receiving new IN_CAPS buffers, clearing old streamheader");
- g_slist_foreach (sink->streamheader, (GFunc) gst_data_unref, NULL);
+ g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (sink->streamheader);
sink->streamheader = NULL;
}
* After that we return, since we only send these out when we get
* non IN_CAPS buffers so we properly keep track of clients that got
* streamheaders. */
- if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS)) {
+ if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS)) {
sink->previous_buffer_in_caps = TRUE;
GST_DEBUG_OBJECT (sink,
"appending IN_CAPS buffer with length %d to streamheader",
GST_BUFFER_SIZE (buf));
sink->streamheader = g_slist_append (sink->streamheader, buf);
- return;
+ return GST_FLOW_OK;
}
sink->previous_buffer_in_caps = FALSE;
gst_multifdsink_queue_buffer (sink, buf);
sink->bytes_to_serve += GST_BUFFER_SIZE (buf);
+
+ return GST_FLOW_OK;
}
static void
/* create a socket for sending to remote machine */
static gboolean
-gst_multifdsink_init_send (GstMultiFdSink * this)
+gst_multifdsink_start (GstBaseSink * bsink)
{
GstMultiFdSinkClass *fclass;
int control_socket[2];
+ GstMultiFdSink *this;
+
+ if (GST_FLAG_IS_SET (bsink, GST_MULTIFDSINK_OPEN))
+ return TRUE;
+ this = GST_MULTIFDSINK (bsink);
fclass = GST_MULTIFDSINK_GET_CLASS (this);
GST_INFO_OBJECT (this, "starting in mode %d", this->mode);
this->fdset = gst_fdset_new (this->mode);
- if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0) {
- GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL),
- GST_ERROR_SYSTEM);
- return FALSE;
- }
+ if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0)
+ goto socket_pair;
+
READ_SOCKET (this).fd = control_socket[0];
WRITE_SOCKET (this).fd = control_socket[1];
this->thread = g_thread_create ((GThreadFunc) gst_multifdsink_thread,
this, TRUE, NULL);
+ GST_FLAG_SET (this, GST_MULTIFDSINK_OPEN);
+
return TRUE;
+
+ /* ERRORS */
+socket_pair:
+ {
+ GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL),
+ GST_ERROR_SYSTEM);
+ return FALSE;
+ }
}
-static void
-gst_multifdsink_close (GstMultiFdSink * this)
+static gboolean
+gst_multifdsink_stop (GstBaseSink * bsink)
{
GstMultiFdSinkClass *fclass;
+ GstMultiFdSink *this;
+ this = GST_MULTIFDSINK (bsink);
fclass = GST_MULTIFDSINK_GET_CLASS (this);
+ if (!GST_FLAG_IS_SET (bsink, GST_MULTIFDSINK_OPEN))
+ return TRUE;
+
this->running = FALSE;
SEND_COMMAND (this, CONTROL_STOP);
close (WRITE_SOCKET (this).fd);
if (this->streamheader) {
- g_slist_foreach (this->streamheader, (GFunc) gst_data_unref, NULL);
+ g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL);
g_slist_free (this->streamheader);
this->streamheader = NULL;
}
gst_fdset_free (this->fdset);
this->fdset = NULL;
}
+ GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN);
+
+ return TRUE;
}
static GstElementStateReturn
gst_multifdsink_change_state (GstElement * element)
{
GstMultiFdSink *sink;
+ gint transition;
+ GstElementStateReturn ret;
- g_return_val_if_fail (GST_IS_MULTIFDSINK (element), GST_STATE_FAILURE);
sink = GST_MULTIFDSINK (element);
/* we disallow changing the state from the streaming thread */
if (g_thread_self () == sink->thread)
return GST_STATE_FAILURE;
- switch (GST_STATE_TRANSITION (element)) {
+ transition = GST_STATE_TRANSITION (element);
+
+ switch (transition) {
case GST_STATE_NULL_TO_READY:
- if (!GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN)) {
- if (!gst_multifdsink_init_send (sink))
- return GST_STATE_FAILURE;
- GST_FLAG_SET (sink, GST_MULTIFDSINK_OPEN);
- }
+ if (!gst_multifdsink_start (GST_BASESINK (sink)))
+ goto start_failed;
break;
case GST_STATE_READY_TO_PAUSED:
break;
case GST_STATE_PAUSED_TO_PLAYING:
break;
+ default:
+ break;
+ }
+
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
+
+ switch (transition) {
case GST_STATE_PLAYING_TO_PAUSED:
break;
case GST_STATE_PAUSED_TO_READY:
break;
case GST_STATE_READY_TO_NULL:
- if (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN)) {
- gst_multifdsink_close (GST_MULTIFDSINK (element));
- GST_FLAG_UNSET (sink, GST_MULTIFDSINK_OPEN);
- }
+ gst_multifdsink_stop (GST_BASESINK (sink));
break;
}
+ return ret;
- if (GST_ELEMENT_CLASS (parent_class)->change_state)
- return GST_ELEMENT_CLASS (parent_class)->change_state (element);
-
- return GST_STATE_SUCCESS;
+ /* ERRORS */
+start_failed:
+ {
+ return GST_STATE_FAILURE;
+ }
}
#ifndef __GST_MULTIFDSINK_H__
#define __GST_MULTIFDSINK_H__
-
#include <gst/gst.h>
+#include <gst/base/gstbasesink.h>
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
+G_BEGIN_DECLS
#include "gsttcp.h"
#include "gstfdset.h"
} GstTCPClient;
struct _GstMultiFdSink {
- GstElement element;
-
- /* pad */
- GstPad *sinkpad;
+ GstBaseSink element;
guint64 bytes_to_serve; /* how much bytes we must serve */
guint64 bytes_served; /* how much bytes have we served */
};
struct _GstMultiFdSinkClass {
- GstElementClass parent_class;
+ GstBaseSinkClass parent_class;
/* element methods */
void (*add) (GstMultiFdSink *sink, int fd);
void gst_multifdsink_clear (GstMultiFdSink *sink);
GValueArray* gst_multifdsink_get_stats (GstMultiFdSink *sink, int fd);
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
-
+G_END_DECLS
#endif /* __GST_MULTIFDSINK_H__ */
struct in_addr addr;
GST_DEBUG_OBJECT (element, "resolving host %s", host);
+
/* first check if it already is an IP address */
if (inet_aton (host, &addr)) {
ip = g_strdup (host);
goto beach;
}
-
/* FIXME: could do a localhost check here */
/* perform a name lookup */
- hostinfo = gethostbyname (host);
- if (!hostinfo) {
- GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
- ("Could not find IP address for host \"%s\".", host));
- return NULL;
- }
+ if (!(hostinfo = gethostbyname (host)))
+ goto resolve_error;
- if (hostinfo->h_addrtype != AF_INET) {
- GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
- ("host \"%s\" is not an IP host", host));
- return NULL;
- }
+ if (hostinfo->h_addrtype != AF_INET)
+ goto not_ip;
addrs = hostinfo->h_addr_list;
+
/* There could be more than one IP address, but we just return the first */
ip = g_strdup (inet_ntoa (*(struct in_addr *) *addrs));
beach:
GST_DEBUG_OBJECT (element, "resolved to IP %s", ip);
return ip;
+
+resolve_error:
+ {
+ GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
+ ("Could not find IP address for host \"%s\".", host));
+ return NULL;
+ }
+not_ip:
+ {
+ GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
+ ("host \"%s\" is not an IP host", host));
+ return NULL;
+ }
}
/* write buffer to given socket incrementally.
*socket = -1;
}
-/* read the gdp buffer header from the given socket
+/* read a buffer from the given socket
* returns:
- * - a GstData representing a GstBuffer in which data should be read
- * - a GstData representing a GstEvent
+ * - a GstBuffer in which data should be read
* - NULL, indicating a connection close or an error, to be handled with
* EOS
*/
-GstData *
-gst_tcp_gdp_read_header (GstElement * this, int socket)
+GstBuffer *
+gst_tcp_gdp_read_buffer (GstElement * this, int socket)
{
size_t header_length = GST_DP_HEADER_LENGTH;
size_t readsize;
readsize = header_length;
GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", readsize);
- ret = gst_tcp_socket_read (socket, header, readsize);
- /* if we read 0 bytes, and we're blocking, we hit eos */
- if (ret == 0) {
- GST_DEBUG ("blocking read returns 0, returning NULL");
- g_free (header);
- return NULL;
- }
- if (ret < 0) {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
- g_free (header);
- return NULL;
+ if ((ret = gst_tcp_socket_read (socket, header, readsize)) <= 0)
+ goto read_error;
+
+ if (ret != readsize)
+ goto short_read;
+
+ if (!gst_dp_validate_header (header_length, header))
+ goto validate_error;
+
+ GST_LOG_OBJECT (this, "validated buffer packet header");
+
+ buffer = gst_dp_buffer_from_header (header_length, header);
+ g_free (header);
+
+ GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer);
+
+ return buffer;
+
+ /* ERRORS */
+read_error:
+ {
+ if (ret == 0) {
+ /* if we read 0 bytes, and we're blocking, we hit eos */
+ GST_DEBUG ("blocking read returns 0, returning NULL");
+ g_free (header);
+ return NULL;
+ } else {
+ GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+ g_free (header);
+ return NULL;
+ }
}
- if (ret != readsize) {
+short_read:
+ {
+ GST_WARNING ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret);
g_warning ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret);
+ return NULL;
}
- g_assert (ret == readsize);
-
- if (!gst_dp_validate_header (header_length, header)) {
+validate_error:
+ {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP buffer packet header does not validate"));
g_free (header);
return NULL;
}
- GST_LOG_OBJECT (this, "validated buffer packet header");
-
- buffer = gst_dp_buffer_from_header (header_length, header);
- g_free (header);
-
- GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer);
- return GST_DATA (buffer);
}
/* read the GDP caps packet from the given socket
readsize = header_length;
GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", readsize);
- ret = gst_tcp_socket_read (socket, header, readsize);
- if (ret < 0) {
- g_free (header);
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
- return NULL;
- }
- if (ret == 0) {
- GST_WARNING_OBJECT (this, "read returned EOF");
- return NULL;
+ if ((ret = gst_tcp_socket_read (socket, header, readsize)) <= 0)
+ goto read_error;
+
+ if (ret != readsize)
+ goto short_read;
+
+ if (!gst_dp_validate_header (header_length, header))
+ goto validate_error;
+
+ readsize = gst_dp_header_payload_length (header);
+ payload = g_malloc (readsize);
+
+ GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize);
+ if ((ret = gst_tcp_socket_read (socket, payload, readsize)) < 0)
+ goto socket_read_error;
+
+ if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS)
+ goto is_not_caps;
+
+ g_assert (ret == readsize);
+
+ if (!gst_dp_validate_payload (readsize, header, payload))
+ goto packet_validate_error;
+
+ caps = gst_dp_caps_from_packet (header_length, header, payload);
+ string = gst_caps_to_string (caps);
+ GST_LOG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string);
+ g_free (string);
+
+ g_free (header);
+ g_free (payload);
+
+ return caps;
+
+ /* ERRORS */
+read_error:
+ {
+ if (ret < 0) {
+ g_free (header);
+ GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+ return NULL;
+ }
+ if (ret == 0) {
+ GST_WARNING_OBJECT (this, "read returned EOF");
+ return NULL;
+ }
}
- if (ret != readsize) {
+short_read:
+ {
GST_WARNING_OBJECT (this, "Tried to read %d bytes but only read %d bytes",
readsize, ret);
return NULL;
}
-
- if (!gst_dp_validate_header (header_length, header)) {
+validate_error:
+ {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP caps packet header does not validate"));
g_free (header);
return NULL;
}
-
- readsize = gst_dp_header_payload_length (header);
- payload = g_malloc (readsize);
- GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize);
- ret = gst_tcp_socket_read (socket, payload, readsize);
-
- if (ret < 0) {
+socket_read_error:
+ {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
g_free (header);
g_free (payload);
return NULL;
}
- if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS) {
+is_not_caps:
+ {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("Header read doesn't describe CAPS payload"));
g_free (header);
g_free (payload);
return NULL;
}
- g_assert (ret == readsize);
-
- if (!gst_dp_validate_payload (readsize, header, payload)) {
+packet_validate_error:
+ {
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("GDP caps packet payload does not validate"));
g_free (header);
g_free (payload);
return NULL;
}
-
- caps = gst_dp_caps_from_packet (header_length, header, payload);
- string = gst_caps_to_string (caps);
- GST_LOG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string);
- g_free (string);
-
- g_free (header);
- g_free (payload);
-
- return caps;
}
/* write a GDP header to the socket. Return false if fails. */
gboolean
-gst_tcp_gdp_write_header (GstElement * this, int socket, GstBuffer * buffer,
+gst_tcp_gdp_write_buffer (GstElement * this, int socket, GstBuffer * buffer,
gboolean fatal, const gchar * host, int port)
{
guint length;
guint8 *header;
size_t wrote;
- if (!gst_dp_header_from_buffer (buffer, 0, &length, &header)) {
+ if (!gst_dp_header_from_buffer (buffer, 0, &length, &header))
+ goto create_error;
+
+ GST_LOG_OBJECT (this, "writing %d bytes for GDP buffer header", length);
+ wrote = gst_tcp_socket_write (socket, header, length);
+ g_free (header);
+
+ if (wrote != length)
+ goto write_error;
+
+ return TRUE;
+
+ /* ERRORS */
+create_error:
+ {
if (fatal)
GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL),
("Could not create GDP header from buffer"));
return FALSE;
}
-
- GST_LOG_OBJECT (this, "writing %d bytes for GDP buffer header", length);
- wrote = gst_tcp_socket_write (socket, header, length);
- g_free (header);
- if (wrote != length) {
+write_error:
+ {
if (fatal)
GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
(_("Error while sending data to \"%s:%d\"."), host, port),
wrote, GST_BUFFER_SIZE (buffer), g_strerror (errno)));
return FALSE;
}
-
- return TRUE;
}
/* write GDP header and payload to the given socket for the given caps.
guint8 *payload;
size_t wrote;
- if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) {
+ if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload))
+ goto create_error;
+
+ GST_LOG_OBJECT (this, "writing %d bytes for GDP caps header", length);
+ wrote = gst_tcp_socket_write (socket, header, length);
+ if (wrote != length)
+ goto write_header_error;
+
+ length = gst_dp_header_payload_length (header);
+ g_free (header);
+
+ GST_LOG_OBJECT (this, "writing %d bytes for GDP caps payload", length);
+ wrote = gst_tcp_socket_write (socket, payload, length);
+ g_free (payload);
+
+ if (wrote != length)
+ goto write_payload_error;
+
+ return TRUE;
+
+ /* ERRORS */
+create_error:
+ {
if (fatal)
GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL),
("Could not create GDP packet from caps"));
return FALSE;
}
- GST_LOG_OBJECT (this, "writing %d bytes for GDP caps header", length);
- wrote = gst_tcp_socket_write (socket, header, length);
- if (wrote != length) {
+write_header_error:
+ {
g_free (header);
g_free (payload);
if (fatal)
wrote, length, g_strerror (errno)));
return FALSE;
}
-
- length = gst_dp_header_payload_length (header);
- g_free (header);
- GST_LOG_OBJECT (this, "writing %d bytes for GDP caps payload", length);
- wrote = gst_tcp_socket_write (socket, payload, length);
- g_free (payload);
- if (wrote != length) {
+write_payload_error:
+ {
if (fatal)
GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
(_("Error while sending gdp payload data to \"%s:%d\"."), host, port),
wrote, length, g_strerror (errno)));
return FALSE;
}
- return TRUE;
}
void gst_tcp_socket_close (int *socket);
-GstData * gst_tcp_gdp_read_header (GstElement *this, int socket);
-GstCaps * gst_tcp_gdp_read_caps (GstElement *this, int socket);
+GstBuffer * gst_tcp_gdp_read_buffer (GstElement *elem, int socket);
+GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket);
+GstCaps * gst_tcp_gdp_read_caps (GstElement *elem, int socket);
-gboolean gst_tcp_gdp_write_header (GstElement *this, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port);
-gboolean gst_tcp_gdp_write_caps (GstElement *this, int socket, const GstCaps *caps, gboolean fatal, const gchar *host, int port);
+gboolean gst_tcp_gdp_write_buffer (GstElement *elem, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port);
+gboolean gst_tcp_gdp_write_event (GstElement *elem, int socket, GstEvent *event, gboolean fatal, const gchar *host, int port);
+gboolean gst_tcp_gdp_write_caps (GstElement *elem, int socket, const GstCaps *caps, gboolean fatal, const gchar *host, int port);
G_END_DECLS
static void gst_tcpclientsink_init (GstTCPClientSink * tcpclientsink);
static void gst_tcpclientsink_finalize (GObject * gobject);
-static void gst_tcpclientsink_set_clock (GstElement * element,
- GstClock * clock);
-
-static void gst_tcpclientsink_chain (GstPad * pad, GstData * _data);
+static gboolean gst_tcpclientsink_setcaps (GstBaseSink * bsink, GstCaps * caps);
+static GstFlowReturn gst_tcpclientsink_render (GstBaseSink * bsink,
+ GstBuffer * buf);
static GstElementStateReturn gst_tcpclientsink_change_state (GstElement *
element);
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
+ GstBaseSinkClass *gstbasesink_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
+ gstbasesink_class = (GstBaseSinkClass *) klass;
+
+ parent_class = g_type_class_ref (GST_TYPE_BASESINK);
- parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
+ gobject_class->set_property = gst_tcpclientsink_set_property;
+ gobject_class->get_property = gst_tcpclientsink_get_property;
+ gobject_class->finalize = gst_tcpclientsink_finalize;
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
g_param_spec_string ("host", "Host", "The host/IP to send the packets to",
g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE,
G_PARAM_READWRITE));
- gobject_class->set_property = gst_tcpclientsink_set_property;
- gobject_class->get_property = gst_tcpclientsink_get_property;
- gobject_class->finalize = gst_tcpclientsink_finalize;
gstelement_class->change_state = gst_tcpclientsink_change_state;
- gstelement_class->set_clock = gst_tcpclientsink_set_clock;
-
- GST_DEBUG_CATEGORY_INIT (tcpclientsink_debug, "tcpclientsink", 0, "TCP sink");
-}
-
-static void
-gst_tcpclientsink_set_clock (GstElement * element, GstClock * clock)
-{
- GstTCPClientSink *tcpclientsink;
- tcpclientsink = GST_TCPCLIENTSINK (element);
+ gstbasesink_class->set_caps = gst_tcpclientsink_setcaps;
+ gstbasesink_class->render = gst_tcpclientsink_render;
- tcpclientsink->clock = clock;
+ GST_DEBUG_CATEGORY_INIT (tcpclientsink_debug, "tcpclientsink", 0, "TCP sink");
}
static void
gst_tcpclientsink_init (GstTCPClientSink * this)
{
- /* create the sink pad */
- this->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
- gst_element_add_pad (GST_ELEMENT (this), this->sinkpad);
- gst_pad_set_chain_function (this->sinkpad, gst_tcpclientsink_chain);
-
this->host = g_strdup (TCP_DEFAULT_HOST);
this->port = TCP_DEFAULT_PORT;
- /* should support as minimum 576 for IPV4 and 1500 for IPV6 */
- /* this->mtu = 1500; */
this->sock_fd = -1;
this->protocol = GST_TCP_PROTOCOL_TYPE_NONE;
GST_FLAG_UNSET (this, GST_TCPCLIENTSINK_OPEN);
-
- this->clock = NULL;
}
static void
g_free (this->host);
}
-static void
-gst_tcpclientsink_chain (GstPad * pad, GstData * _data)
+static gboolean
+gst_tcpclientsink_setcaps (GstBaseSink * bsink, GstCaps * caps)
{
- size_t wrote = 0;
-
- GstBuffer *buf = GST_BUFFER (_data);
GstTCPClientSink *sink;
- g_return_if_fail (pad != NULL);
- g_return_if_fail (GST_IS_PAD (pad));
- g_return_if_fail (buf != NULL);
- sink = GST_TCPCLIENTSINK (GST_OBJECT_PARENT (pad));
- g_return_if_fail (GST_FLAG_IS_SET (sink, GST_TCPCLIENTSINK_OPEN));
-
- if (GST_IS_EVENT (buf)) {
- g_warning ("FIXME: handl events");
- return;
- }
+ sink = GST_TCPCLIENTSINK (bsink);
/* write the buffer header if we have one */
switch (sink->protocol) {
const GstCaps *caps;
gchar *string;
- caps = GST_PAD_CAPS (GST_PAD_PEER (pad));
+ caps = GST_PAD_CAPS (GST_PAD_PEER (GST_BASESINK_PAD (bsink)));
string = gst_caps_to_string (caps);
GST_DEBUG_OBJECT (sink, "Sending caps %s through GDP", string);
- if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd, caps,
- TRUE, sink->host, sink->port)) {
- g_free (string);
- return;
- }
g_free (string);
+
+ if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd, caps,
+ TRUE, sink->host, sink->port))
+ goto gdp_write_error;
+
sink->caps_sent = TRUE;
}
+ break;
+ default:
+ g_warning ("Unhandled protocol type");
+ break;
+ }
+
+ return TRUE;
+
+ /* ERRORS */
+gdp_write_error:
+ {
+ return FALSE;
+ }
+}
+
+static GstFlowReturn
+gst_tcpclientsink_render (GstBaseSink * bsink, GstBuffer * buf)
+{
+ size_t wrote = 0;
+ GstTCPClientSink *sink;
+ gint size;
+
+ sink = GST_TCPCLIENTSINK (bsink);
+
+ g_return_val_if_fail (GST_FLAG_IS_SET (sink, GST_TCPCLIENTSINK_OPEN),
+ GST_FLOW_WRONG_STATE);
+
+ size = GST_BUFFER_SIZE (buf);
+
+ GST_LOG_OBJECT (sink, "writing %d bytes for buffer data", size);
+
+ /* write the buffer header if we have one */
+ switch (sink->protocol) {
+ case GST_TCP_PROTOCOL_TYPE_NONE:
+ break;
+ case GST_TCP_PROTOCOL_TYPE_GDP:
GST_LOG_OBJECT (sink, "Sending buffer header through GDP");
- if (!gst_tcp_gdp_write_header (GST_ELEMENT (sink), sink->sock_fd, buf,
+ if (!gst_tcp_gdp_write_buffer (GST_ELEMENT (sink), sink->sock_fd, buf,
TRUE, sink->host, sink->port))
- return;
+ goto gdp_write_error;
break;
default:
- g_warning ("Unhandled protocol type");
break;
}
- GST_LOG_OBJECT (sink, "writing %d bytes for buffer data",
- GST_BUFFER_SIZE (buf));
- wrote =
- gst_tcp_socket_write (sink->sock_fd, GST_BUFFER_DATA (buf),
- GST_BUFFER_SIZE (buf));
+ /* write buffer data */
+ wrote = gst_tcp_socket_write (sink->sock_fd, GST_BUFFER_DATA (buf), size);
+
+ if (wrote < size)
+ goto write_error;
- if (wrote < GST_BUFFER_SIZE (buf)) {
+ sink->data_written += wrote;
+
+ return GST_FLOW_OK;
+
+ /* ERRORS */
+gdp_write_error:
+ {
+ return FALSE;
+ }
+write_error:
+ {
GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
(_("Error while sending data to \"%s:%d\"."), sink->host, sink->port),
("Only %d of %d bytes written: %s",
wrote, GST_BUFFER_SIZE (buf), g_strerror (errno)));
+ return GST_FLOW_ERROR;
}
- sink->data_written += wrote;
-
- gst_buffer_unref (buf);
-
- /* FIXME: emit signal ? */
}
static void
/* create a socket for sending to remote machine */
static gboolean
-gst_tcpclientsink_init_send (GstTCPClientSink * this)
+gst_tcpclientsink_start (GstTCPClientSink * this)
{
int ret;
gchar *ip;
+ if (GST_FLAG_IS_SET (this, GST_TCPCLIENTSINK_OPEN))
+ return TRUE;
+
/* reset caps_sent flag */
this->caps_sent = FALSE;
return TRUE;
}
-static void
-gst_tcpclientsink_close (GstTCPClientSink * this)
+static gboolean
+gst_tcpclientsink_stop (GstTCPClientSink * this)
{
+ if (!GST_FLAG_IS_SET (this, GST_TCPCLIENTSINK_OPEN))
+ return TRUE;
+
if (this->sock_fd != -1) {
close (this->sock_fd);
this->sock_fd = -1;
}
GST_FLAG_UNSET (this, GST_TCPCLIENTSINK_OPEN);
+
+ return TRUE;
}
static GstElementStateReturn
gst_tcpclientsink_change_state (GstElement * element)
{
- g_return_val_if_fail (GST_IS_TCPCLIENTSINK (element), GST_STATE_FAILURE);
-
- if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
- if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSINK_OPEN))
- gst_tcpclientsink_close (GST_TCPCLIENTSINK (element));
- } else {
- if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSINK_OPEN)) {
- if (!gst_tcpclientsink_init_send (GST_TCPCLIENTSINK (element)))
- return GST_STATE_FAILURE;
- }
+ GstTCPClientSink *sink;
+ gint transition;
+ GstElementStateReturn res;
+
+ sink = GST_TCPCLIENTSINK (element);
+ transition = GST_STATE_TRANSITION (element);
+
+ switch (transition) {
+ case GST_STATE_NULL_TO_READY:
+ case GST_STATE_READY_TO_PAUSED:
+ if (!gst_tcpclientsink_start (GST_TCPCLIENTSINK (element)))
+ goto start_failure;
+ break;
+ default:
+ break;
}
+ res = GST_ELEMENT_CLASS (parent_class)->change_state (element);
- if (GST_ELEMENT_CLASS (parent_class)->change_state)
- return GST_ELEMENT_CLASS (parent_class)->change_state (element);
+ switch (transition) {
+ case GST_STATE_READY_TO_NULL:
+ gst_tcpclientsink_stop (GST_TCPCLIENTSINK (element));
+ default:
+ break;
+ }
+ return res;
- return GST_STATE_SUCCESS;
+start_failure:
+ {
+ return GST_STATE_FAILURE;
+ }
}
#include <gst/gst.h>
+#include <gst/base/gstbasesink.h>
+
#include "gsttcp.h"
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
+G_BEGIN_DECLS
#include <stdio.h>
#include <stdlib.h>
} GstTCPClientSinkFlags;
struct _GstTCPClientSink {
- GstElement element;
-
- /* pad */
- GstPad *sinkpad;
+ GstBaseSink element;
/* server information */
int port;
size_t data_written; /* how much bytes have we written ? */
GstTCPProtocolType protocol; /* used with the protocol enum */
gboolean caps_sent; /* whether or not we sent caps already */
-
- guint mtu;
- GstClock *clock;
};
struct _GstTCPClientSinkClass {
- GstElementClass parent_class;
+ GstBaseSinkClass parent_class;
};
GType gst_tcpclientsink_get_type(void);
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
-
+G_END_DECLS
#endif /* __GST_TCPCLIENTSINK_H__ */
"Receive data as a client over the network via TCP",
"Thomas Vander Stichele <thomas at apestaart dot org>");
+static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
/* TCPClientSrc signals and args */
enum
{
static void gst_tcpclientsrc_init (GstTCPClientSrc * tcpclientsrc);
static void gst_tcpclientsrc_finalize (GObject * gobject);
-static GstCaps *gst_tcpclientsrc_getcaps (GstPad * pad);
+static GstCaps *gst_tcpclientsrc_getcaps (GstBaseSrc * psrc);
-static GstData *gst_tcpclientsrc_get (GstPad * pad);
-static GstElementStateReturn gst_tcpclientsrc_change_state (GstElement *
- element);
+static GstFlowReturn gst_tcpclientsrc_create (GstPushSrc * psrc,
+ GstBuffer ** outbuf);
+static gboolean gst_tcpclientsrc_stop (GstBaseSrc * bsrc);
+static gboolean gst_tcpclientsrc_start (GstBaseSrc * bsrc);
+static gboolean gst_tcpclientsrc_unlock (GstBaseSrc * bsrc);
static void gst_tcpclientsrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_tcpclientsrc_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
-static void gst_tcpclientsrc_set_clock (GstElement * element, GstClock * clock);
static GstElementClass *parent_class = NULL;
{
static GType tcpclientsrc_type = 0;
-
if (!tcpclientsrc_type) {
static const GTypeInfo tcpclientsrc_info = {
sizeof (GstTCPClientSrcClass),
};
tcpclientsrc_type =
- g_type_register_static (GST_TYPE_ELEMENT, "GstTCPClientSrc",
+ g_type_register_static (GST_TYPE_PUSHSRC, "GstTCPClientSrc",
&tcpclientsrc_info, 0);
}
return tcpclientsrc_type;
{
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&srctemplate));
+
gst_element_class_set_details (element_class, &gst_tcpclientsrc_details);
}
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
+ GstBaseSrcClass *gstbasesrc_class;
+ GstPushSrcClass *gstpushsrc_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
+ gstbasesrc_class = (GstBaseSrcClass *) klass;
+ gstpushsrc_class = (GstPushSrcClass *) klass;
+
+ parent_class = g_type_class_ref (GST_TYPE_PUSHSRC);
- parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
+ gobject_class->set_property = gst_tcpclientsrc_set_property;
+ gobject_class->get_property = gst_tcpclientsrc_get_property;
+ gobject_class->finalize = gst_tcpclientsrc_finalize;
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
g_param_spec_string ("host", "Host",
GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE,
G_PARAM_READWRITE));
- gobject_class->set_property = gst_tcpclientsrc_set_property;
- gobject_class->get_property = gst_tcpclientsrc_get_property;
- gobject_class->finalize = gst_tcpclientsrc_finalize;
+ gstbasesrc_class->get_caps = gst_tcpclientsrc_getcaps;
+ gstbasesrc_class->start = gst_tcpclientsrc_start;
+ gstbasesrc_class->stop = gst_tcpclientsrc_stop;
+ gstbasesrc_class->unlock = gst_tcpclientsrc_unlock;
- gstelement_class->change_state = gst_tcpclientsrc_change_state;
- gstelement_class->set_clock = gst_tcpclientsrc_set_clock;
+ gstpushsrc_class->create = gst_tcpclientsrc_create;
GST_DEBUG_CATEGORY_INIT (tcpclientsrc_debug, "tcpclientsrc", 0,
"TCP Client Source");
}
static void
-gst_tcpclientsrc_set_clock (GstElement * element, GstClock * clock)
-{
- GstTCPClientSrc *tcpclientsrc;
-
- tcpclientsrc = GST_TCPCLIENTSRC (element);
-
- tcpclientsrc->clock = clock;
-}
-
-static void
gst_tcpclientsrc_init (GstTCPClientSrc * this)
{
- /* create the src pad */
- this->srcpad = gst_pad_new ("src", GST_PAD_SRC);
- gst_element_add_pad (GST_ELEMENT (this), this->srcpad);
- gst_pad_set_get_function (this->srcpad, gst_tcpclientsrc_get);
- gst_pad_set_getcaps_function (this->srcpad, gst_tcpclientsrc_getcaps);
-
this->port = TCP_DEFAULT_PORT;
this->host = g_strdup (TCP_DEFAULT_HOST);
- this->clock = NULL;
this->sock_fd = -1;
this->protocol = GST_TCP_PROTOCOL_TYPE_NONE;
- this->curoffset = 0;
this->caps = NULL;
+ this->curoffset = 0;
+
+ gst_base_src_set_live (GST_BASESRC (this), TRUE);
GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN);
}
}
static GstCaps *
-gst_tcpclientsrc_getcaps (GstPad * pad)
+gst_tcpclientsrc_getcaps (GstBaseSrc * bsrc)
{
GstTCPClientSrc *src;
GstCaps *caps = NULL;
- src = GST_TCPCLIENTSRC (GST_OBJECT_PARENT (pad));
+ src = GST_TCPCLIENTSRC (bsrc);
if (!GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN))
caps = gst_caps_new_any ();
return caps;
}
-/* close the socket and associated resources
- * unset OPEN flag
- * used both to recover from errors and go to NULL state */
-static void
-gst_tcpclientsrc_close (GstTCPClientSrc * this)
-{
- GST_DEBUG_OBJECT (this, "closing socket");
- if (this->sock_fd != -1) {
- close (this->sock_fd);
- this->sock_fd = -1;
- }
- this->caps_received = FALSE;
- if (this->caps) {
- gst_caps_free (this->caps);
- this->caps = NULL;
- }
- GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN);
-}
-
-/* close socket and related items and return an EOS GstData
- * called from _get */
-static GstData *
-gst_tcpclientsrc_eos (GstTCPClientSrc * src)
-{
- GST_DEBUG_OBJECT (src, "going to EOS");
- gst_element_set_eos (GST_ELEMENT (src));
- gst_tcpclientsrc_close (src);
- return GST_DATA (gst_event_new (GST_EVENT_EOS));
-}
-
-static GstData *
-gst_tcpclientsrc_get (GstPad * pad)
+static GstFlowReturn
+gst_tcpclientsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{
GstTCPClientSrc *src;
size_t readsize;
int ret;
-
- GstData *data = NULL;
GstBuffer *buf = NULL;
- g_return_val_if_fail (pad != NULL, NULL);
- g_return_val_if_fail (GST_IS_PAD (pad), NULL);
- src = GST_TCPCLIENTSRC (GST_OBJECT_PARENT (pad));
- if (!GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN)) {
- GST_DEBUG_OBJECT (src, "connection to server closed, cannot give data");
- return NULL;
- }
- GST_LOG_OBJECT (src, "asked for a buffer");
+ src = GST_TCPCLIENTSRC (psrc);
- /* try to negotiate here */
- if (!gst_pad_is_negotiated (pad)) {
- if (GST_PAD_LINK_FAILED (gst_pad_renegotiate (pad))) {
- GST_ELEMENT_ERROR (src, CORE, NEGOTIATION, (NULL), GST_ERROR_SYSTEM);
- gst_buffer_unref (buf);
- return gst_tcpclientsrc_eos (src);
- }
- }
+ if (!GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN))
+ goto wrong_state;
- /* if we have a left over buffer after a discont, return that */
- if (src->buffer_after_discont) {
- buf = src->buffer_after_discont;
- GST_LOG_OBJECT (src,
- "Returning buffer after discont of size %d, ts %"
- GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
- ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
- GST_BUFFER_SIZE (buf), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
- GST_TIME_ARGS (GST_BUFFER_DURATION (buf)),
- GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf));
- src->buffer_after_discont = NULL;
- return GST_DATA (buf);
- }
+ GST_LOG_OBJECT (src, "asked for a buffer");
/* read the buffer header if we're using a protocol */
switch (src->protocol) {
/* do a blocking select on the socket */
FD_ZERO (&testfds);
FD_SET (src->sock_fd, &testfds);
- ret = select (src->sock_fd + 1, &testfds, (fd_set *) 0, (fd_set *) 0, 0);
+
/* no action (0) is an error too in our case */
- if (ret <= 0) {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("select failed: %s", g_strerror (errno)));
- return gst_tcpclientsrc_eos (src);
- }
+ if ((ret = select (src->sock_fd + 1, &testfds, NULL, NULL, 0)) <= 0)
+ goto select_error;
/* ask how much is available for reading on the socket */
- ret = ioctl (src->sock_fd, FIONREAD, &readsize);
- if (ret < 0) {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("ioctl failed: %s", g_strerror (errno)));
- return gst_tcpclientsrc_eos (src);
- }
+ if ((ret = ioctl (src->sock_fd, FIONREAD, &readsize)) < 0)
+ goto ioctl_error;
+
GST_LOG_OBJECT (src, "ioctl says %d bytes available", readsize);
+
buf = gst_buffer_new_and_alloc (readsize);
break;
- case GST_TCP_PROTOCOL_TYPE_GDP:
- if (!(data = gst_tcp_gdp_read_header (GST_ELEMENT (src), src->sock_fd))) {
- return gst_tcpclientsrc_eos (src);
- }
- if (GST_IS_EVENT (data)) {
- /* if we got back an EOS event, then we should go into eos ourselves */
- if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) {
- gst_event_unref (data);
- return gst_tcpclientsrc_eos (src);
- }
- return data;
- }
- buf = GST_BUFFER (data);
+ case GST_TCP_PROTOCOL_TYPE_GDP:
+ if (!(buf = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd)))
+ goto hit_eos;
GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
buf);
+
/* use this new buffer to read data into */
readsize = GST_BUFFER_SIZE (buf);
break;
default:
- g_warning ("Unhandled protocol type");
+ /* need to assert as buf == NULL */
+ g_assert ("Unhandled protocol type");
break;
}
GST_LOG_OBJECT (src, "Reading %d bytes into buffer", readsize);
- ret = gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf), readsize);
- if (ret < 0) {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
- gst_buffer_unref (buf);
- return gst_tcpclientsrc_eos (src);
- }
+ if ((ret =
+ gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf),
+ readsize)) < 0)
+ goto read_error;
/* if we read 0 bytes, and we're blocking, we hit eos */
- if (ret == 0) {
- GST_DEBUG_OBJECT (src, "blocking read returns 0, EOS");
- gst_buffer_unref (buf);
- return gst_tcpclientsrc_eos (src);
- }
+ if (ret == 0)
+ goto zero_read;
readsize = ret;
GST_BUFFER_SIZE (buf) = readsize;
- GST_BUFFER_MAXSIZE (buf) = readsize;
-
- /* FIXME: we could decide to set OFFSET and OFFSET_END for non-protocol
- * streams to mean the bytes processed */
-
- /* if this is our first buffer, we need to send a discont with the
- * given timestamp or the current offset, and store the buffer for
- * the next iteration through the get loop */
- if (src->send_discont) {
- GstClockTime timestamp;
- GstEvent *event;
-
- src->send_discont = FALSE;
- src->buffer_after_discont = buf;
- /* if the timestamp is valid, send a timed discont
- * taking into account the incoming buffer's timestamps */
- timestamp = GST_BUFFER_TIMESTAMP (buf);
- if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
- GST_DEBUG_OBJECT (src,
- "sending discontinuous with timestamp %" GST_TIME_FORMAT,
- GST_TIME_ARGS (timestamp));
- event =
- gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME, timestamp, NULL);
- return GST_DATA (event);
- }
- /* otherwise, send an offset discont */
- GST_DEBUG_OBJECT (src, "sending discontinuous with offset %d",
- src->curoffset);
- event =
- gst_event_new_discontinuous (FALSE, GST_FORMAT_BYTES, src->curoffset,
- NULL);
- return GST_DATA (event);
- }
src->curoffset += readsize;
+
GST_LOG_OBJECT (src,
"Returning buffer from _get of size %d, ts %"
GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
GST_BUFFER_SIZE (buf), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (buf)),
GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf));
- return GST_DATA (buf);
+
+ gst_buffer_set_caps (buf, src->caps);
+
+ *outbuf = buf;
+
+ return GST_FLOW_OK;
+
+ /* ERRORS */
+wrong_state:
+ {
+ GST_DEBUG_OBJECT (src, "connection to server closed, cannot give data");
+ return GST_FLOW_WRONG_STATE;
+ }
+select_error:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("select failed: %s", g_strerror (errno)));
+ return GST_FLOW_ERROR;
+ }
+ioctl_error:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("ioctl failed: %s", g_strerror (errno)));
+ return GST_FLOW_ERROR;
+ }
+hit_eos:
+ {
+ return GST_FLOW_WRONG_STATE;
+ }
+read_error:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+ gst_buffer_unref (buf);
+ return GST_FLOW_ERROR;
+ }
+zero_read:
+ {
+ GST_DEBUG_OBJECT (src, "blocking read returns 0, EOS");
+ gst_buffer_unref (buf);
+ return GST_FLOW_WRONG_STATE;
+ }
}
static void
/* create a socket for connecting to remote server */
static gboolean
-gst_tcpclientsrc_init_receive (GstTCPClientSrc * this)
+gst_tcpclientsrc_start (GstBaseSrc * bsrc)
{
int ret;
gchar *ip;
+ GstTCPClientSrc *src = GST_TCPCLIENTSRC (bsrc);
/* create receiving client socket */
- GST_DEBUG_OBJECT (this, "opening receiving client socket to %s:%d",
- this->host, this->port);
- if ((this->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
- GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
- return FALSE;
- }
- GST_DEBUG_OBJECT (this, "opened receiving client socket with fd %d",
- this->sock_fd);
- GST_FLAG_SET (this, GST_TCPCLIENTSRC_OPEN);
+ GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d",
+ src->host, src->port);
+
+ if ((src->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
+ goto no_socket;
+
+ GST_DEBUG_OBJECT (src, "opened receiving client socket with fd %d",
+ src->sock_fd);
+ GST_FLAG_SET (src, GST_TCPCLIENTSRC_OPEN);
/* look up name if we need to */
- ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
- if (!ip) {
- gst_tcpclientsrc_close (this);
- return FALSE;
- }
- GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip);
+ if (!(ip = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host)))
+ goto name_resolv;
+
+ GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
/* connect to server */
- memset (&this->server_sin, 0, sizeof (this->server_sin));
- this->server_sin.sin_family = AF_INET; /* network socket */
- this->server_sin.sin_port = htons (this->port); /* on port */
- this->server_sin.sin_addr.s_addr = inet_addr (ip); /* on host ip */
+ memset (&src->server_sin, 0, sizeof (src->server_sin));
+ src->server_sin.sin_family = AF_INET; /* network socket */
+ src->server_sin.sin_port = htons (src->port); /* on port */
+ src->server_sin.sin_addr.s_addr = inet_addr (ip); /* on host ip */
g_free (ip);
- GST_DEBUG_OBJECT (this, "connecting to server");
- ret = connect (this->sock_fd, (struct sockaddr *) &this->server_sin,
- sizeof (this->server_sin));
+ GST_DEBUG_OBJECT (src, "connecting to server");
+ ret = connect (src->sock_fd, (struct sockaddr *) &src->server_sin,
+ sizeof (src->server_sin));
if (ret) {
- gst_tcpclientsrc_close (this);
+ gst_tcpclientsrc_stop (GST_BASESRC (src));
switch (errno) {
case ECONNREFUSED:
- GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ,
- (_("Connection to %s:%d refused."), this->host, this->port),
- (NULL));
+ GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ,
+ (_("Connection to %s:%d refused."), src->host, src->port), (NULL));
return FALSE;
break;
default:
- GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
- ("connect to %s:%d failed: %s", this->host, this->port,
+ GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
+ ("connect to %s:%d failed: %s", src->host, src->port,
g_strerror (errno)));
return FALSE;
break;
}
}
- this->send_discont = TRUE;
- this->buffer_after_discont = NULL;
-
/* get the caps if we're using GDP */
- if (this->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
+ if (src->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
/* if we haven't received caps yet, we should get them first */
- if (!this->caps_received) {
+ if (!src->caps_received) {
GstCaps *caps;
- GST_DEBUG_OBJECT (this, "getting caps through GDP");
- if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (this), this->sock_fd))) {
- gst_tcpclientsrc_close (this);
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
- ("Could not read caps through GDP"));
- return FALSE;
- }
- if (!GST_IS_CAPS (caps)) {
- gst_tcpclientsrc_close (this);
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
- ("Could not read caps through GDP"));
- return FALSE;
- }
- GST_DEBUG_OBJECT (this, "Received caps through GDP: %" GST_PTR_FORMAT,
+ GST_DEBUG_OBJECT (src, "getting caps through GDP");
+ if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd)))
+ goto no_caps;
+
+ if (!GST_IS_CAPS (caps))
+ goto no_caps;
+
+ GST_DEBUG_OBJECT (src, "Received caps through GDP: %" GST_PTR_FORMAT,
caps);
- this->caps_received = TRUE;
- this->caps = caps;
+
+ src->caps_received = TRUE;
+ src->caps = caps;
}
}
return TRUE;
+
+no_socket:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
+ return FALSE;
+ }
+name_resolv:
+ {
+ gst_tcpclientsrc_stop (GST_BASESRC (src));
+ return FALSE;
+ }
+no_caps:
+ {
+ gst_tcpclientsrc_stop (GST_BASESRC (src));
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Could not read caps through GDP"));
+ return FALSE;
+ }
}
-static GstElementStateReturn
-gst_tcpclientsrc_change_state (GstElement * element)
+/* close the socket and associated resources
+ * unset OPEN flag
+ * used both to recover from errors and go to NULL state */
+static gboolean
+gst_tcpclientsrc_stop (GstBaseSrc * bsrc)
{
- g_return_val_if_fail (GST_IS_TCPCLIENTSRC (element), GST_STATE_FAILURE);
+ GstTCPClientSrc *src;
- /* if open and going to NULL, close it */
- if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN) &&
- GST_STATE_PENDING (element) == GST_STATE_NULL) {
- gst_tcpclientsrc_close (GST_TCPCLIENTSRC (element));
+ src = GST_TCPCLIENTSRC (bsrc);
+
+ GST_DEBUG_OBJECT (src, "closing socket");
+ if (src->sock_fd != -1) {
+ close (src->sock_fd);
+ src->sock_fd = -1;
}
- /* if closed and going to a state higher than NULL, open it */
- if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN) &&
- GST_STATE_PENDING (element) > GST_STATE_NULL) {
- if (!gst_tcpclientsrc_init_receive (GST_TCPCLIENTSRC (element)))
- return GST_STATE_FAILURE;
+ src->caps_received = FALSE;
+ if (src->caps) {
+ gst_caps_unref (src->caps);
+ src->caps = NULL;
}
+ GST_FLAG_UNSET (src, GST_TCPCLIENTSRC_OPEN);
- if (GST_ELEMENT_CLASS (parent_class)->change_state)
- return GST_ELEMENT_CLASS (parent_class)->change_state (element);
+ return TRUE;
+}
- return GST_STATE_SUCCESS;
+static gboolean
+gst_tcpclientsrc_unlock (GstBaseSrc * bsrc)
+{
+ return TRUE;
}
#define __GST_TCPCLIENTSRC_H__
#include <gst/gst.h>
+#include <gst/base/gstpushsrc.h>
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
+G_BEGIN_DECLS
#include <netdb.h> /* sockaddr_in */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h> /* sockaddr_in */
#include <unistd.h>
+
#include "gsttcp.h"
#define GST_TYPE_TCPCLIENTSRC \
} GstTCPClientSrcFlags;
struct _GstTCPClientSrc {
- GstElement element;
-
- /* pad */
- GstPad *srcpad;
+ GstPushSrc element;
/* server information */
int port;
GstTCPProtocolType protocol; /* protocol used for reading data */
gboolean caps_received; /* if we have received caps yet */
GstCaps *caps;
- GstClock *clock;
-
- gboolean send_discont; /* TRUE when we need to send a discont */
- GstBuffer *buffer_after_discont; /* temporary storage for buffer */
};
struct _GstTCPClientSrcClass {
- GstElementClass parent_class;
+ GstPushSrcClass parent_class;
};
GType gst_tcpclientsrc_get_type (void);
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
+G_END_DECLS
#endif /* __GST_TCPCLIENTSRC_H__ */
#include "config.h"
#endif
-#include "gsttcpsrc.h"
-#include "gsttcpsink.h"
+#include <gst/dataprotocol/dataprotocol.h>
#include "gsttcpclientsrc.h"
#include "gsttcpclientsink.h"
#include "gsttcpserversrc.h"
static gboolean
plugin_init (GstPlugin * plugin)
{
- if (!gst_element_register (plugin, "tcpsink", GST_RANK_NONE,
- GST_TYPE_TCPSINK))
- return FALSE;
-
- if (!gst_element_register (plugin, "tcpsrc", GST_RANK_NONE, GST_TYPE_TCPSRC))
- return FALSE;
+ gst_dp_init ();
if (!gst_element_register (plugin, "tcpclientsink", GST_RANK_NONE,
GST_TYPE_TCPCLIENTSINK))
parent_class = g_type_class_ref (GST_TYPE_MULTIFDSINK);
+ gobject_class->set_property = gst_tcpserversink_set_property;
+ gobject_class->get_property = gst_tcpserversink_get_property;
+ gobject_class->finalize = gst_tcpserversink_finalize;
+
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
g_param_spec_string ("host", "host", "The host/IP to send the packets to",
TCP_DEFAULT_HOST, G_PARAM_READWRITE));
g_param_spec_int ("port", "port", "The port to send the packets to",
0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
- gobject_class->set_property = gst_tcpserversink_set_property;
- gobject_class->get_property = gst_tcpserversink_get_property;
- gobject_class->finalize = gst_tcpserversink_finalize;
-
gstmultifdsink_class->init = gst_tcpserversink_init_send;
gstmultifdsink_class->wait = gst_tcpserversink_handle_wait;
gstmultifdsink_class->close = gst_tcpserversink_close;
#include <gst/gst.h>
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
+G_BEGIN_DECLS
#include <stdio.h>
#include <stdlib.h>
GType gst_tcpserversink_get_type (void);
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
-
+G_END_DECLS
#endif /* __GST_TCPSERVERSINK_H__ */
"Receive data as a server over the network via TCP",
"Thomas Vander Stichele <thomas at apestaart dot org>");
+static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
+ GST_PAD_SRC,
+ GST_PAD_ALWAYS,
+ GST_STATIC_CAPS_ANY);
+
+
/* TCPServerSrc signals and args */
enum
{
static void gst_tcpserversrc_init (GstTCPServerSrc * tcpserversrc);
static void gst_tcpserversrc_finalize (GObject * gobject);
-static GstData *gst_tcpserversrc_get (GstPad * pad);
-static GstElementStateReturn gst_tcpserversrc_change_state (GstElement *
- element);
+static gboolean gst_tcpserversrc_start (GstBaseSrc * bsrc);
+static gboolean gst_tcpserversrc_stop (GstBaseSrc * bsrc);
+static GstFlowReturn gst_tcpserversrc_create (GstPushSrc * psrc,
+ GstBuffer ** buf);
static void gst_tcpserversrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_tcpserversrc_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
-static void gst_tcpserversrc_set_clock (GstElement * element, GstClock * clock);
static GstElementClass *parent_class = NULL;
};
tcpserversrc_type =
- g_type_register_static (GST_TYPE_ELEMENT, "GstTCPServerSrc",
+ g_type_register_static (GST_TYPE_PUSHSRC, "GstTCPServerSrc",
&tcpserversrc_info, 0);
}
return tcpserversrc_type;
{
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
+ gst_element_class_add_pad_template (element_class,
+ gst_static_pad_template_get (&srctemplate));
+
gst_element_class_set_details (element_class, &gst_tcpserversrc_details);
}
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
+ GstBaseSrcClass *gstbasesrc_class;
+ GstPushSrcClass *gstpushsrc_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
+ gstbasesrc_class = (GstBaseSrcClass *) klass;
+ gstpushsrc_class = (GstPushSrcClass *) klass;
+
+ parent_class = g_type_class_ref (GST_TYPE_PUSHSRC);
- parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
+ gobject_class->set_property = gst_tcpserversrc_set_property;
+ gobject_class->get_property = gst_tcpserversrc_get_property;
+ gobject_class->finalize = gst_tcpserversrc_finalize;
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
g_param_spec_string ("host", "Host", "The hostname to listen as",
GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE,
G_PARAM_READWRITE));
- gobject_class->set_property = gst_tcpserversrc_set_property;
- gobject_class->get_property = gst_tcpserversrc_get_property;
- gobject_class->finalize = gst_tcpserversrc_finalize;
+ gstbasesrc_class->start = gst_tcpserversrc_start;
+ gstbasesrc_class->stop = gst_tcpserversrc_stop;
- gstelement_class->change_state = gst_tcpserversrc_change_state;
- gstelement_class->set_clock = gst_tcpserversrc_set_clock;
+ gstpushsrc_class->create = gst_tcpserversrc_create;
GST_DEBUG_CATEGORY_INIT (tcpserversrc_debug, "tcpserversrc", 0,
"TCP Server Source");
}
static void
-gst_tcpserversrc_set_clock (GstElement * element, GstClock * clock)
+gst_tcpserversrc_init (GstTCPServerSrc * src)
{
- GstTCPServerSrc *tcpserversrc;
-
- tcpserversrc = GST_TCPSERVERSRC (element);
-
- tcpserversrc->clock = clock;
-}
-
-static void
-gst_tcpserversrc_init (GstTCPServerSrc * this)
-{
- /* create the src pad */
- this->srcpad = gst_pad_new ("src", GST_PAD_SRC);
- gst_element_add_pad (GST_ELEMENT (this), this->srcpad);
- gst_pad_set_get_function (this->srcpad, gst_tcpserversrc_get);
-
- this->server_port = TCP_DEFAULT_PORT;
- this->host = g_strdup (TCP_DEFAULT_HOST);
- this->clock = NULL;
- this->server_sock_fd = -1;
- this->client_sock_fd = -1;
- this->curoffset = 0;
- this->protocol = GST_TCP_PROTOCOL_TYPE_NONE;
-
- GST_FLAG_UNSET (this, GST_TCPSERVERSRC_OPEN);
+ src->server_port = TCP_DEFAULT_PORT;
+ src->host = g_strdup (TCP_DEFAULT_HOST);
+ src->server_sock_fd = -1;
+ src->client_sock_fd = -1;
+ src->curoffset = 0;
+ src->protocol = GST_TCP_PROTOCOL_TYPE_NONE;
+
+ GST_FLAG_UNSET (src, GST_TCPSERVERSRC_OPEN);
}
static void
gst_tcpserversrc_finalize (GObject * gobject)
{
- GstTCPServerSrc *this = GST_TCPSERVERSRC (gobject);
+ GstTCPServerSrc *src = GST_TCPSERVERSRC (gobject);
- g_free (this->host);
+ g_free (src->host);
}
-/* read the gdp caps packet from the socket */
-static GstCaps *
-gst_tcpserversrc_gdp_read_caps (GstTCPServerSrc * this)
-{
- size_t header_length = GST_DP_HEADER_LENGTH;
- size_t readsize;
- guint8 *header = NULL;
- guint8 *payload = NULL;
- ssize_t ret;
- GstCaps *caps;
- gchar *string;
-
- header = g_malloc (header_length);
-
- readsize = header_length;
- GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", readsize);
- ret = read (this->client_sock_fd, header, readsize);
- if (ret < 0) {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
- g_free (header);
- return NULL;
- }
- g_assert (ret == readsize);
-
- if (!gst_dp_validate_header (header_length, header)) {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
- ("GDP caps packet header does not validate"));
- g_free (header);
- return NULL;
- }
-
- readsize = gst_dp_header_payload_length (header);
- payload = g_malloc (readsize);
- GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize);
- ret = read (this->client_sock_fd, payload, readsize);
- if (ret < 0) {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
- g_free (header);
- g_free (payload);
- return NULL;
- }
- g_assert (ret == readsize);
-
- if (!gst_dp_validate_payload (readsize, header, payload)) {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
- ("GDP caps packet payload does not validate"));
- g_free (header);
- g_free (payload);
- return NULL;
- }
-
- caps = gst_dp_caps_from_packet (header_length, header, payload);
- string = gst_caps_to_string (caps);
- GST_DEBUG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string);
- g_free (string);
-
- g_free (header);
- g_free (payload);
-
- return caps;
-}
-
-/* read the gdp buffer header from the socket
- * returns a GstData,
- * representing the new GstBuffer to read data into, or an EOS event
- */
-static GstData *
-gst_tcpserversrc_gdp_read_header (GstTCPServerSrc * this)
-{
- size_t header_length = GST_DP_HEADER_LENGTH;
- size_t readsize;
- guint8 *header = NULL;
- ssize_t ret;
- GstBuffer *buffer;
-
- header = g_malloc (header_length);
- readsize = header_length;
-
- GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", readsize);
- ret = read (this->client_sock_fd, header, readsize);
- /* if we read 0 bytes, and we're blocking, we hit eos */
- if (ret == 0) {
- GST_DEBUG ("blocking read returns 0, EOS");
- gst_element_set_eos (GST_ELEMENT (this));
- g_free (header);
- return GST_DATA (gst_event_new (GST_EVENT_EOS));
- }
- if (ret < 0) {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
- g_free (header);
- return NULL;
- }
- if (ret != readsize) {
- g_warning ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret);
- }
- g_assert (ret == readsize);
-
- if (!gst_dp_validate_header (header_length, header)) {
- GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
- ("GDP buffer packet header does not validate"));
- g_free (header);
- return NULL;
- }
- GST_LOG_OBJECT (this, "validated buffer packet header");
-
- buffer = gst_dp_buffer_from_header (header_length, header);
-
- GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer);
- return GST_DATA (buffer);
-}
-
-static GstData *
-gst_tcpserversrc_get (GstPad * pad)
+static GstFlowReturn
+gst_tcpserversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
{
GstTCPServerSrc *src;
size_t readsize;
int ret;
-
- GstData *data = NULL;
GstBuffer *buf = NULL;
GstCaps *caps;
- g_return_val_if_fail (pad != NULL, NULL);
- g_return_val_if_fail (GST_IS_PAD (pad), NULL);
- src = GST_TCPSERVERSRC (GST_OBJECT_PARENT (pad));
- g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN), NULL);
+ src = GST_TCPSERVERSRC (psrc);
+
+ g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN),
+ GST_FLOW_ERROR);
/* read the buffer header if we're using a protocol */
switch (src->protocol) {
+ case GST_TCP_PROTOCOL_TYPE_NONE:
+ {
fd_set testfds;
- case GST_TCP_PROTOCOL_TYPE_NONE:
/* do a blocking select on the socket */
FD_ZERO (&testfds);
FD_SET (src->client_sock_fd, &testfds);
- ret =
- select (src->client_sock_fd + 1, &testfds, (fd_set *) 0, (fd_set *) 0,
- 0);
+
/* no action (0) is an error too in our case */
- if (ret <= 0) {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("select failed: %s", g_strerror (errno)));
- return GST_DATA (gst_event_new (GST_EVENT_EOS));
- }
+ if ((ret =
+ select (src->client_sock_fd + 1, &testfds, (fd_set *) 0,
+ (fd_set *) 0, 0)) <= 0)
+ goto select_error;
+
/* ask how much is available for reading on the socket */
- ret = ioctl (src->client_sock_fd, FIONREAD, &readsize);
- if (ret < 0) {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("ioctl failed: %s", g_strerror (errno)));
- return GST_DATA (gst_event_new (GST_EVENT_EOS));
- }
+ if ((ret = ioctl (src->client_sock_fd, FIONREAD, &readsize)) < 0)
+ goto ioctl_error;
buf = gst_buffer_new_and_alloc (readsize);
break;
+ }
case GST_TCP_PROTOCOL_TYPE_GDP:
/* if we haven't received caps yet, we should get them first */
if (!src->caps_received) {
gchar *string;
- if (!(caps = gst_tcpserversrc_gdp_read_caps (src))) {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("Could not read caps through GDP"));
- return GST_DATA (gst_event_new (GST_EVENT_EOS));
- }
+ if (!(caps =
+ gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd)))
+ goto gdp_caps_read_error;
+
src->caps_received = TRUE;
string = gst_caps_to_string (caps);
GST_DEBUG_OBJECT (src, "Received caps through GDP: %s", string);
g_free (string);
- if (!gst_pad_try_set_caps (pad, caps)) {
- g_warning ("Could not set caps");
- return GST_DATA (gst_event_new (GST_EVENT_EOS));
- }
+ gst_pad_set_caps (GST_BASESRC_PAD (psrc), caps);
}
/* now receive the buffer header */
- if (!(data = gst_tcpserversrc_gdp_read_header (src))) {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
- ("Could not read data header through GDP"));
- return GST_DATA (gst_event_new (GST_EVENT_EOS));
- }
- if (GST_IS_EVENT (data))
- return data;
- buf = GST_BUFFER (data);
+ if (!(buf =
+ gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd)))
+ goto gdp_buffer_read_error;
GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
buf);
+
/* use this new buffer to read data into */
readsize = GST_BUFFER_SIZE (buf);
break;
}
GST_LOG_OBJECT (src, "Reading %d bytes", readsize);
- ret =
- gst_tcp_socket_read (src->client_sock_fd, GST_BUFFER_DATA (buf),
- readsize);
- if (ret < 0) {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
- gst_buffer_unref (buf);
- return GST_DATA (gst_event_new (GST_EVENT_EOS));
- }
+ if ((ret =
+ gst_tcp_socket_read (src->client_sock_fd, GST_BUFFER_DATA (buf),
+ readsize)) < 0)
+ goto read_error;
/* if we read 0 bytes, and we're blocking, we hit eos */
- if (ret == 0) {
- GST_DEBUG ("blocking read returns 0, EOS");
- gst_buffer_unref (buf);
- gst_element_set_eos (GST_ELEMENT (src));
- return GST_DATA (gst_event_new (GST_EVENT_EOS));
- }
+ if (ret == 0)
+ goto hit_eos;
readsize = ret;
GST_LOG_OBJECT (src, "Read %d bytes", readsize);
GST_BUFFER_SIZE (buf) = readsize;
- GST_BUFFER_MAXSIZE (buf) = readsize;
GST_BUFFER_OFFSET (buf) = src->curoffset;
GST_BUFFER_OFFSET_END (buf) = src->curoffset + readsize;
src->curoffset += readsize;
- return GST_DATA (buf);
+
+ *outbuf = buf;
+
+ return GST_FLOW_OK;
+
+ /* ERROR */
+select_error:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("select failed: %s", g_strerror (errno)));
+ return GST_FLOW_ERROR;
+ }
+ioctl_error:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("ioctl failed: %s", g_strerror (errno)));
+ return GST_FLOW_ERROR;
+ }
+gdp_caps_read_error:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Could not read caps through GDP"));
+ return GST_FLOW_ERROR;
+ }
+gdp_buffer_read_error:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("Could not read buffer header through GDP"));
+ return GST_FLOW_ERROR;
+ }
+read_error:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+ gst_buffer_unref (buf);
+ return GST_FLOW_ERROR;
+ }
+hit_eos:
+ {
+ GST_DEBUG ("blocking read returns 0, EOS");
+ gst_buffer_unref (buf);
+ return GST_FLOW_WRONG_STATE;
+ }
}
/* set up server */
static gboolean
-gst_tcpserversrc_init_receive (GstTCPServerSrc * this)
+gst_tcpserversrc_start (GstBaseSrc * bsrc)
{
int ret;
+ GstTCPServerSrc *src = GST_TCPSERVERSRC (bsrc);
/* reset caps_received flag */
- this->caps_received = FALSE;
+ src->caps_received = FALSE;
/* create the server listener socket */
- if ((this->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
- GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
- return FALSE;
- }
- GST_DEBUG_OBJECT (this, "opened receiving server socket with fd %d",
- this->server_sock_fd);
+ if ((src->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
+ goto socket_error;
+
+ GST_DEBUG_OBJECT (src, "opened receiving server socket with fd %d",
+ src->server_sock_fd);
/* make address reusable */
ret = 1;
- if (setsockopt (this->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret,
- sizeof (int)) < 0) {
- GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
- ("Could not setsockopt: %s", g_strerror (errno)));
- return FALSE;
- }
+ if (setsockopt (src->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret,
+ sizeof (int)) < 0)
+ goto sock_opt;
/* name the socket */
- memset (&this->server_sin, 0, sizeof (this->server_sin));
- this->server_sin.sin_family = AF_INET; /* network socket */
- this->server_sin.sin_port = htons (this->server_port); /* on port */
- if (this->host) {
- gchar *host = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
-
- if (!host) {
- gst_tcp_socket_close (&this->server_sock_fd);
- return FALSE;
- }
-
- this->server_sin.sin_addr.s_addr = inet_addr (host);
+ memset (&src->server_sin, 0, sizeof (src->server_sin));
+ src->server_sin.sin_family = AF_INET; /* network socket */
+ src->server_sin.sin_port = htons (src->server_port); /* on port */
+ if (src->host) {
+ gchar *host;
+
+ if (!(host = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host)))
+ goto host_error;
+ src->server_sin.sin_addr.s_addr = inet_addr (host);
g_free (host);
} else
- this->server_sin.sin_addr.s_addr = htonl (INADDR_ANY);
+ src->server_sin.sin_addr.s_addr = htonl (INADDR_ANY);
/* bind it */
- GST_DEBUG_OBJECT (this, "binding server socket to address");
- ret = bind (this->server_sock_fd, (struct sockaddr *) &this->server_sin,
- sizeof (this->server_sin));
+ GST_DEBUG_OBJECT (src, "binding server socket to address");
+ if ((ret = bind (src->server_sock_fd, (struct sockaddr *) &src->server_sin,
+ sizeof (src->server_sin))) < 0)
+ goto bind_error;
+
+ GST_DEBUG_OBJECT (src, "listening on server socket %d with queue of %d",
+ src->server_sock_fd, TCP_BACKLOG);
+
+ if (listen (src->server_sock_fd, TCP_BACKLOG) == -1)
+ goto listen_error;
+
+ /* FIXME: maybe we should think about moving actual client accepting
+ somewhere else */
+ GST_DEBUG_OBJECT (src, "waiting for client");
+ if ((src->client_sock_fd =
+ accept (src->server_sock_fd, (struct sockaddr *) &src->client_sin,
+ &src->client_sin_len)) == -1)
+ goto accept_error;
+
+ GST_DEBUG_OBJECT (src, "received client");
+
+ GST_FLAG_SET (src, GST_TCPSERVERSRC_OPEN);
- if (ret) {
- gst_tcp_socket_close (&this->server_sock_fd);
+ return TRUE;
+
+ /* ERRORS */
+socket_error:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
+ return FALSE;
+ }
+sock_opt:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
+ ("Could not setsockopt: %s", g_strerror (errno)));
+ return FALSE;
+ }
+host_error:
+ {
+ gst_tcp_socket_close (&src->server_sock_fd);
+ return FALSE;
+ }
+bind_error:
+ {
+ gst_tcp_socket_close (&src->server_sock_fd);
switch (errno) {
default:
- GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
+ GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("bind failed: %s", g_strerror (errno)));
- return FALSE;
break;
}
+ return FALSE;
}
-
- GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d",
- this->server_sock_fd, TCP_BACKLOG);
- if (listen (this->server_sock_fd, TCP_BACKLOG) == -1) {
- gst_tcp_socket_close (&this->server_sock_fd);
- GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
+listen_error:
+ {
+ gst_tcp_socket_close (&src->server_sock_fd);
+ GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("Could not listen on server socket: %s", g_strerror (errno)));
return FALSE;
}
-
- /* FIXME: maybe we should think about moving actual client accepting
- somewhere else */
- GST_DEBUG_OBJECT (this, "waiting for client");
- this->client_sock_fd =
- accept (this->server_sock_fd, (struct sockaddr *) &this->client_sin,
- &this->client_sin_len);
- if (this->client_sock_fd == -1) {
- gst_tcp_socket_close (&this->server_sock_fd);
- GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
+accept_error:
+ {
+ gst_tcp_socket_close (&src->server_sock_fd);
+ GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
("Could not accept client on server socket: %s", g_strerror (errno)));
return FALSE;
}
- GST_DEBUG_OBJECT (this, "received client");
-
- GST_FLAG_SET (this, GST_TCPSERVERSRC_OPEN);
- return TRUE;
}
-static void
-gst_tcpserversrc_close (GstTCPServerSrc * this)
+static gboolean
+gst_tcpserversrc_stop (GstBaseSrc * bsrc)
{
- if (this->server_sock_fd != -1) {
- close (this->server_sock_fd);
- this->server_sock_fd = -1;
- }
- if (this->client_sock_fd != -1) {
- close (this->client_sock_fd);
- this->client_sock_fd = -1;
- }
- GST_FLAG_UNSET (this, GST_TCPSERVERSRC_OPEN);
-}
+ GstTCPServerSrc *src = GST_TCPSERVERSRC (bsrc);
-static GstElementStateReturn
-gst_tcpserversrc_change_state (GstElement * element)
-{
- g_return_val_if_fail (GST_IS_TCPSERVERSRC (element), GST_STATE_FAILURE);
-
- if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
- if (GST_FLAG_IS_SET (element, GST_TCPSERVERSRC_OPEN))
- gst_tcpserversrc_close (GST_TCPSERVERSRC (element));
- } else {
- if (!GST_FLAG_IS_SET (element, GST_TCPSERVERSRC_OPEN)) {
- if (!gst_tcpserversrc_init_receive (GST_TCPSERVERSRC (element)))
- return GST_STATE_FAILURE;
- }
+ if (src->server_sock_fd != -1) {
+ close (src->server_sock_fd);
+ src->server_sock_fd = -1;
}
+ if (src->client_sock_fd != -1) {
+ close (src->client_sock_fd);
+ src->client_sock_fd = -1;
+ }
+ GST_FLAG_UNSET (src, GST_TCPSERVERSRC_OPEN);
- if (GST_ELEMENT_CLASS (parent_class)->change_state)
- return GST_ELEMENT_CLASS (parent_class)->change_state (element);
-
- return GST_STATE_SUCCESS;
+ return TRUE;
}
#define __GST_TCPSERVERSRC_H__
#include <gst/gst.h>
+#include <gst/base/gstpushsrc.h>
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
+G_END_DECLS
#include <errno.h>
#include <string.h>
} GstTCPServerSrcFlags;
struct _GstTCPServerSrc {
- GstElement element;
-
- /* pad */
- GstPad *srcpad;
+ GstPushSrc element;
/* server information */
int server_port;
GstTCPProtocolType protocol; /* protocol used for reading data */
gboolean caps_received; /* if we have received caps yet */
- GstClock *clock;
};
struct _GstTCPServerSrcClass {
- GstElementClass parent_class;
+ GstPushSrcClass parent_class;
};
GType gst_tcpserversrc_get_type (void);
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
+G_BEGIN_DECLS
#endif /* __GST_TCPSERVERSRC_H__ */
+++ /dev/null
-/* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-#include "gsttcpsink.h"
-
-#define TCP_DEFAULT_HOST "localhost"
-#define TCP_DEFAULT_PORT 4953
-
-/* elementfactory information */
-static GstElementDetails gst_tcpsink_details =
-GST_ELEMENT_DETAILS ("TCP packet sender",
- "Sink/Network",
- "Send data over the network via TCP",
- "Zeeshan Ali <zak147@yahoo.com>");
-
-/* TCPSink signals and args */
-enum
-{
- FRAME_ENCODED,
- /* FILL ME */
- LAST_SIGNAL
-};
-
-enum
-{
- ARG_0,
- ARG_HOST,
- ARG_PORT,
- ARG_CONTROL,
- ARG_MTU
- /* FILL ME */
-};
-
-#define GST_TYPE_TCPSINK_CONTROL (gst_tcpsink_control_get_type())
-static GType
-gst_tcpsink_control_get_type (void)
-{
- static GType tcpsink_control_type = 0;
- static GEnumValue tcpsink_control[] = {
- {CONTROL_NONE, "1", "none"},
- {CONTROL_TCP, "2", "tcp"},
- {CONTROL_ZERO, NULL, NULL}
- };
-
- if (!tcpsink_control_type) {
- tcpsink_control_type =
- g_enum_register_static ("GstTCPSinkControl", tcpsink_control);
- }
- return tcpsink_control_type;
-}
-
-static void gst_tcpsink_base_init (gpointer g_class);
-static void gst_tcpsink_class_init (GstTCPSink * klass);
-static void gst_tcpsink_init (GstTCPSink * tcpsink);
-
-static void gst_tcpsink_set_clock (GstElement * element, GstClock * clock);
-
-static void gst_tcpsink_chain (GstPad * pad, GstData * _data);
-static GstElementStateReturn gst_tcpsink_change_state (GstElement * element);
-
-static void gst_tcpsink_set_property (GObject * object, guint prop_id,
- const GValue * value, GParamSpec * pspec);
-static void gst_tcpsink_get_property (GObject * object, guint prop_id,
- GValue * value, GParamSpec * pspec);
-
-
-static GstElementClass *parent_class = NULL;
-
-/*static guint gst_tcpsink_signals[LAST_SIGNAL] = { 0 }; */
-
-GType
-gst_tcpsink_get_type (void)
-{
- static GType tcpsink_type = 0;
-
-
- if (!tcpsink_type) {
- static const GTypeInfo tcpsink_info = {
- sizeof (GstTCPSinkClass),
- gst_tcpsink_base_init,
- NULL,
- (GClassInitFunc) gst_tcpsink_class_init,
- NULL,
- NULL,
- sizeof (GstTCPSink),
- 0,
- (GInstanceInitFunc) gst_tcpsink_init,
- NULL
- };
-
- tcpsink_type =
- g_type_register_static (GST_TYPE_ELEMENT, "GstTCPSink", &tcpsink_info,
- 0);
- }
- return tcpsink_type;
-}
-
-static void
-gst_tcpsink_base_init (gpointer g_class)
-{
- GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
-
- gst_element_class_set_details (element_class, &gst_tcpsink_details);
-}
-
-static void
-gst_tcpsink_class_init (GstTCPSink * klass)
-{
- GObjectClass *gobject_class;
- GstElementClass *gstelement_class;
-
- gobject_class = (GObjectClass *) klass;
- gstelement_class = (GstElementClass *) klass;
-
- parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
-
- g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
- g_param_spec_string ("host", "host", "The host/IP to send the packets to",
- TCP_DEFAULT_HOST, G_PARAM_READWRITE));
- g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
- g_param_spec_int ("port", "port", "The port to send the packets to",
- 0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
- g_object_class_install_property (gobject_class, ARG_CONTROL,
- g_param_spec_enum ("control", "control", "The type of control",
- GST_TYPE_TCPSINK_CONTROL, CONTROL_TCP, G_PARAM_READWRITE));
- g_object_class_install_property (gobject_class, ARG_MTU, g_param_spec_int ("mtu", "mtu", "mtu", G_MININT, G_MAXINT, 0, G_PARAM_READWRITE)); /* CHECKME */
- gobject_class->set_property = gst_tcpsink_set_property;
- gobject_class->get_property = gst_tcpsink_get_property;
-
- gstelement_class->change_state = gst_tcpsink_change_state;
- gstelement_class->set_clock = gst_tcpsink_set_clock;
-}
-
-
-static GstPadLinkReturn
-gst_tcpsink_sink_link (GstPad * pad, const GstCaps * caps)
-{
- GstTCPSink *tcpsink;
-
-#ifndef GST_DISABLE_LOADSAVE
- struct sockaddr_in serv_addr;
- struct in_addr addr;
- struct hostent *he;
- int fd;
- FILE *f;
- xmlDocPtr doc;
-#endif
-
- tcpsink = GST_TCPSINK (gst_pad_get_parent (pad));
-
- switch (tcpsink->control) {
-#ifndef GST_DISABLE_LOADSAVE
- case CONTROL_TCP:
- memset (&serv_addr, 0, sizeof (serv_addr));
-
- /* if its an IP address */
- if (inet_aton (tcpsink->host, &addr)) {
- memmove (&(serv_addr.sin_addr), &addr, sizeof (struct in_addr));
- }
-
- /* we dont need to lookup for localhost */
- else if (strcmp (tcpsink->host, TCP_DEFAULT_HOST) == 0) {
- if (inet_aton ("127.0.0.1", &addr)) {
- memmove (&(serv_addr.sin_addr), &addr, sizeof (struct in_addr));
- }
- }
-
- /* if its a hostname */
- else if ((he = gethostbyname (tcpsink->host))) {
- memmove (&(serv_addr.sin_addr), he->h_addr, he->h_length);
- }
-
- else {
- perror ("hostname lookup error?");
- return GST_PAD_LINK_REFUSED;
- }
-
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_port = htons (tcpsink->port + 1);
-
- doc = xmlNewDoc ("1.0");
- doc->xmlRootNode = xmlNewDocNode (doc, NULL, "NewCaps", NULL);
-
- gst_caps_save_thyself (caps, doc->xmlRootNode);
-
- if ((fd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
- perror ("socket");
- return GST_PAD_LINK_REFUSED;
- }
-
- if (connect (fd, (struct sockaddr *) &serv_addr, sizeof (serv_addr)) != 0) {
- g_printerr ("tcpsink: connect to %s port %d failed: %s\n",
- tcpsink->host, tcpsink->port + 1, g_strerror (errno));
- return GST_PAD_LINK_REFUSED;
- }
-
- f = fdopen (dup (fd), "wb");
-
- xmlDocDump (f, doc);
- fclose (f);
- close (fd);
- break;
-
-#endif
- case CONTROL_NONE:
- return GST_PAD_LINK_OK;
- break;
- default:
- return GST_PAD_LINK_REFUSED;
- break;
- }
-
- return GST_PAD_LINK_OK;
-}
-
-static void
-gst_tcpsink_set_clock (GstElement * element, GstClock * clock)
-{
- GstTCPSink *tcpsink;
-
- tcpsink = GST_TCPSINK (element);
-
- tcpsink->clock = clock;
-}
-
-static void
-gst_tcpsink_init (GstTCPSink * tcpsink)
-{
- /* create the sink and src pads */
- tcpsink->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
- gst_element_add_pad (GST_ELEMENT (tcpsink), tcpsink->sinkpad);
- gst_pad_set_chain_function (tcpsink->sinkpad, gst_tcpsink_chain);
- gst_pad_set_link_function (tcpsink->sinkpad, gst_tcpsink_sink_link);
-
- tcpsink->host = g_strdup (TCP_DEFAULT_HOST);
- tcpsink->port = TCP_DEFAULT_PORT;
- tcpsink->control = CONTROL_TCP;
- /* should support as minimum 576 for IPV4 and 1500 for IPV6 */
- tcpsink->mtu = 1500;
-
- tcpsink->clock = NULL;
-}
-
-static void
-gst_tcpsink_chain (GstPad * pad, GstData * _data)
-{
- GstBuffer *buf = GST_BUFFER (_data);
- GstTCPSink *tcpsink;
-
- g_return_if_fail (pad != NULL);
- g_return_if_fail (GST_IS_PAD (pad));
- g_return_if_fail (buf != NULL);
-
- tcpsink = GST_TCPSINK (GST_OBJECT_PARENT (pad));
-
- if (tcpsink->clock && GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
- gst_element_wait (GST_ELEMENT (tcpsink), GST_BUFFER_TIMESTAMP (buf));
- }
-
- if (write (tcpsink->sock, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf)) <= 0) {
- perror ("write");
- }
-
- gst_buffer_unref (buf);
-}
-
-static void
-gst_tcpsink_set_property (GObject * object, guint prop_id, const GValue * value,
- GParamSpec * pspec)
-{
- GstTCPSink *tcpsink;
-
- /* it's not null if we got it, but it might not be ours */
- g_return_if_fail (GST_IS_TCPSINK (object));
- tcpsink = GST_TCPSINK (object);
-
- switch (prop_id) {
- case ARG_HOST:
- if (tcpsink->host != NULL)
- g_free (tcpsink->host);
- if (g_value_get_string (value) == NULL)
- tcpsink->host = NULL;
- else
- tcpsink->host = g_strdup (g_value_get_string (value));
- break;
- case ARG_PORT:
- tcpsink->port = g_value_get_int (value);
- break;
- case ARG_CONTROL:
- tcpsink->control = g_value_get_enum (value);
- break;
- case ARG_MTU:
- tcpsink->mtu = g_value_get_int (value);
- break;
- default:
- break;
- }
-}
-
-static void
-gst_tcpsink_get_property (GObject * object, guint prop_id, GValue * value,
- GParamSpec * pspec)
-{
- GstTCPSink *tcpsink;
-
- /* it's not null if we got it, but it might not be ours */
- g_return_if_fail (GST_IS_TCPSINK (object));
- tcpsink = GST_TCPSINK (object);
-
- switch (prop_id) {
- case ARG_HOST:
- g_value_set_string (value, tcpsink->host);
- break;
- case ARG_PORT:
- g_value_set_int (value, tcpsink->port);
- break;
- case ARG_CONTROL:
- g_value_set_enum (value, tcpsink->control);
- break;
- case ARG_MTU:
- g_value_set_int (value, tcpsink->mtu);
- break;
- default:
- G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
- break;
- }
-}
-
-
-/* create a socket for sending to remote machine */
-static gboolean
-gst_tcpsink_init_send (GstTCPSink * sink)
-{
- struct hostent *he;
- struct in_addr addr;
-
- memset (&sink->theiraddr, 0, sizeof (sink->theiraddr));
- sink->theiraddr.sin_family = AF_INET; /* host byte order */
- sink->theiraddr.sin_port = htons (sink->port); /* short, network byte order */
-
- /* if its an IP address */
- if (inet_aton (sink->host, &addr)) {
- memmove (&(sink->theiraddr.sin_addr), &addr, sizeof (struct in_addr));
- }
-
- /* we dont need to lookup for localhost */
- else if (strcmp (sink->host, TCP_DEFAULT_HOST) == 0) {
- if (inet_aton ("127.0.0.1", &addr)) {
- memmove (&(sink->theiraddr.sin_addr), &addr, sizeof (struct in_addr));
- }
- }
-
- /* if its a hostname */
- else if ((he = gethostbyname (sink->host))) {
- memmove (&(sink->theiraddr.sin_addr), he->h_addr, he->h_length);
- }
-
- else {
- perror ("hostname lookup error?");
- return FALSE;
- }
-
- if ((sink->sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
- perror ("socket");
- return FALSE;
- }
-
- if (connect (sink->sock, (struct sockaddr *) &(sink->theiraddr),
- sizeof (sink->theiraddr)) != 0) {
- perror ("stream connect");
- return FALSE;
- }
-
- GST_FLAG_SET (sink, GST_TCPSINK_OPEN);
-
- return TRUE;
-}
-
-static void
-gst_tcpsink_close (GstTCPSink * sink)
-{
- close (sink->sock);
-
- GST_FLAG_UNSET (sink, GST_TCPSINK_OPEN);
-}
-
-static GstElementStateReturn
-gst_tcpsink_change_state (GstElement * element)
-{
- g_return_val_if_fail (GST_IS_TCPSINK (element), GST_STATE_FAILURE);
-
- if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
- if (GST_FLAG_IS_SET (element, GST_TCPSINK_OPEN))
- gst_tcpsink_close (GST_TCPSINK (element));
- } else {
- if (!GST_FLAG_IS_SET (element, GST_TCPSINK_OPEN)) {
- if (!gst_tcpsink_init_send (GST_TCPSINK (element)))
- return GST_STATE_FAILURE;
- }
- }
-
- if (GST_ELEMENT_CLASS (parent_class)->change_state)
- return GST_ELEMENT_CLASS (parent_class)->change_state (element);
-
- return GST_STATE_SUCCESS;
-}
+++ /dev/null
-/* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-
-#ifndef __GST_TCPSINK_H__
-#define __GST_TCPSINK_H__
-
-
-#include <gst/gst.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <errno.h>
-#include <string.h>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <netinet/in.h>
-#include <netdb.h>
-#include <sys/socket.h>
-#include <sys/wait.h>
-#include <fcntl.h>
-#include <arpa/inet.h>
-#include "gsttcpplugin.h"
-
-#define GST_TYPE_TCPSINK \
- (gst_tcpsink_get_type())
-#define GST_TCPSINK(obj) \
- (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPSINK,GstTCPSink))
-#define GST_TCPSINK_CLASS(klass) \
- (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPSINK,GstTCPSink))
-#define GST_IS_TCPSINK(obj) \
- (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPSINK))
-#define GST_IS_TCPSINK_CLASS(obj) \
- (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPSINK))
-
-typedef struct _GstTCPSink GstTCPSink;
-typedef struct _GstTCPSinkClass GstTCPSinkClass;
-
-typedef enum {
- GST_TCPSINK_OPEN = GST_ELEMENT_FLAG_LAST,
-
- GST_TCPSINK_FLAG_LAST = GST_ELEMENT_FLAG_LAST + 2,
-} GstTCPSinkFlags;
-
-struct _GstTCPSink {
- GstElement element;
-
- /* pads */
- GstPad *sinkpad,*srcpad;
-
- int sock;
- struct sockaddr_in theiraddr;
- Gst_TCP_Control control;
-
- gint port;
- gchar *host;
-
- guint mtu;
-
- GstClock *clock;
-};
-
-struct _GstTCPSinkClass {
- GstElementClass parent_class;
-};
-
-GType gst_tcpsink_get_type(void);
-
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
-
-
-#endif /* __GST_TCPSINK_H__ */
+++ /dev/null
-/* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include "gsttcpsrc.h"
-#include <unistd.h>
-
-#define TCP_DEFAULT_PORT 4953
-
-/* elementfactory information */
-static GstElementDetails gst_tcpsrc_details =
-GST_ELEMENT_DETAILS ("TCP packet receiver",
- "Source/Network",
- "Receive data over the network via TCP",
- "Zeeshan Ali <zak147@yahoo.com>");
-
-/* TCPSrc signals and args */
-enum
-{
- /* FILL ME */
- LAST_SIGNAL
-};
-
-enum
-{
- ARG_0,
- ARG_PORT,
- ARG_CONTROL
-/* ARG_SOCKET_OPTIONS,*/
- /* FILL ME */
-};
-
-#define GST_TYPE_TCPSRC_CONTROL (gst_tcpsrc_control_get_type())
-static GType
-gst_tcpsrc_control_get_type (void)
-{
- static GType tcpsrc_control_type = 0;
- static GEnumValue tcpsrc_control[] = {
- {CONTROL_NONE, "1", "none"},
- {CONTROL_TCP, "2", "tcp"},
- {CONTROL_ZERO, NULL, NULL}
- };
-
- if (!tcpsrc_control_type) {
- tcpsrc_control_type =
- g_enum_register_static ("GstTCPSrcControl", tcpsrc_control);
- }
- return tcpsrc_control_type;
-}
-
-static void gst_tcpsrc_base_init (gpointer g_class);
-static void gst_tcpsrc_class_init (GstTCPSrc * klass);
-static void gst_tcpsrc_init (GstTCPSrc * tcpsrc);
-
-static GstData *gst_tcpsrc_get (GstPad * pad);
-static GstElementStateReturn gst_tcpsrc_change_state (GstElement * element);
-
-static void gst_tcpsrc_set_property (GObject * object, guint prop_id,
- const GValue * value, GParamSpec * pspec);
-static void gst_tcpsrc_get_property (GObject * object, guint prop_id,
- GValue * value, GParamSpec * pspec);
-static void gst_tcpsrc_set_clock (GstElement * element, GstClock * clock);
-
-static GstElementClass *parent_class = NULL;
-
-/*static guint gst_tcpsrc_signals[LAST_SIGNAL] = { 0 }; */
-
-GType
-gst_tcpsrc_get_type (void)
-{
- static GType tcpsrc_type = 0;
-
-
- if (!tcpsrc_type) {
- static const GTypeInfo tcpsrc_info = {
- sizeof (GstTCPSrcClass),
- gst_tcpsrc_base_init,
- NULL,
- (GClassInitFunc) gst_tcpsrc_class_init,
- NULL,
- NULL,
- sizeof (GstTCPSrc),
- 0,
- (GInstanceInitFunc) gst_tcpsrc_init,
- NULL
- };
-
- tcpsrc_type =
- g_type_register_static (GST_TYPE_ELEMENT, "GstTCPSrc", &tcpsrc_info, 0);
- }
- return tcpsrc_type;
-}
-
-static void
-gst_tcpsrc_base_init (gpointer g_class)
-{
- GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
-
- gst_element_class_set_details (element_class, &gst_tcpsrc_details);
-}
-
-static void
-gst_tcpsrc_class_init (GstTCPSrc * klass)
-{
- GObjectClass *gobject_class;
- GstElementClass *gstelement_class;
-
- gobject_class = (GObjectClass *) klass;
- gstelement_class = (GstElementClass *) klass;
-
- parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
-
- g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
- g_param_spec_int ("port", "port", "The port to receive the packets from",
- 0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
- g_object_class_install_property (gobject_class, ARG_CONTROL,
- g_param_spec_enum ("control", "control", "The type of control",
- GST_TYPE_TCPSRC_CONTROL, CONTROL_TCP, G_PARAM_READWRITE));
-/*
- g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SOCKET_OPTIONS,
- g_param_spec_boolean ("socketop", "socketop", "Enable or disable socket options REUSEADDR and KEEPALIVE",
- FALSE, G_PARAM_READWRITE));
-*/
- gobject_class->set_property = gst_tcpsrc_set_property;
- gobject_class->get_property = gst_tcpsrc_get_property;
-
- gstelement_class->change_state = gst_tcpsrc_change_state;
- gstelement_class->set_clock = gst_tcpsrc_set_clock;
-}
-
-static void
-gst_tcpsrc_set_clock (GstElement * element, GstClock * clock)
-{
- GstTCPSrc *tcpsrc;
-
- tcpsrc = GST_TCPSRC (element);
-
- tcpsrc->clock = clock;
-}
-
-static void
-gst_tcpsrc_init (GstTCPSrc * tcpsrc)
-{
- /* create the src and src pads */
- tcpsrc->srcpad = gst_pad_new ("src", GST_PAD_SRC);
- gst_element_add_pad (GST_ELEMENT (tcpsrc), tcpsrc->srcpad);
- gst_pad_set_get_function (tcpsrc->srcpad, gst_tcpsrc_get);
-
- tcpsrc->port = TCP_DEFAULT_PORT;
- tcpsrc->control = CONTROL_TCP;
- tcpsrc->clock = NULL;
- tcpsrc->sock = -1;
- tcpsrc->control_sock = -1;
- tcpsrc->client_sock = -1;
- /*tcpsrc->socket_options = FALSE; */
-
- GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_OPEN);
- GST_FLAG_SET (tcpsrc, GST_TCPSRC_1ST_BUF);
- GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_CONNECTED);
-}
-
-static GstData *
-gst_tcpsrc_get (GstPad * pad)
-{
- GstTCPSrc *tcpsrc;
- GstBuffer *outbuf;
- socklen_t len;
- gint numbytes;
- fd_set read_fds;
- guint max_sock;
-
-#ifndef GST_DISABLE_LOADSAVE
- int ret, client_sock;
-#endif
- struct sockaddr client_addr;
-
- g_return_val_if_fail (pad != NULL, NULL);
- g_return_val_if_fail (GST_IS_PAD (pad), NULL);
-
- tcpsrc = GST_TCPSRC (GST_OBJECT_PARENT (pad));
-
- FD_ZERO (&read_fds);
- FD_SET (tcpsrc->sock, &read_fds);
-
- max_sock = tcpsrc->sock;
-
- if (tcpsrc->control_sock >= 0) {
- FD_SET (tcpsrc->control_sock, &read_fds);
- max_sock = MAX (tcpsrc->sock, tcpsrc->control_sock);
- }
-
- /* Add to FD_SET client socket, when connection has been established */
- if (tcpsrc->client_sock >= 0) {
- FD_SET (tcpsrc->client_sock, &read_fds);
- max_sock = MAX (tcpsrc->client_sock, max_sock);
- }
-
-
- if (select (max_sock + 1, &read_fds, NULL, NULL, NULL) > 0) {
- if ((tcpsrc->control_sock != -1)
- && FD_ISSET (tcpsrc->control_sock, &read_fds)) {
- guchar *buf = NULL;
-
-#ifndef GST_DISABLE_LOADSAVE
- xmlDocPtr doc;
- GstCaps *caps;
-#endif
-
-
- switch (tcpsrc->control) {
- case CONTROL_TCP:
-
-#ifndef GST_DISABLE_LOADSAVE
- buf = g_malloc (1024 * 10);
-
- len = sizeof (struct sockaddr);
- client_sock = accept (tcpsrc->control_sock, &client_addr, &len);
-
- if (client_sock <= 0) {
- perror ("control_sock accept");
- }
-
- else if ((ret = read (client_sock, buf, 1024 * 10)) <= 0) {
- perror ("control_sock read");
- }
-
- else {
- buf[ret] = '\0';
- doc = xmlParseMemory (buf, ret);
- caps = gst_caps_load_thyself (doc->xmlRootNode);
-
- /* foward the connect, we don't signal back the result here... */
- gst_pad_try_set_caps (tcpsrc->srcpad, caps);
- }
-
- g_free (buf);
-#endif
- break;
- case CONTROL_NONE:
- default:
- g_free (buf);
- return NULL;
- break;
- }
-
- outbuf = NULL;
- } else {
- outbuf = gst_buffer_new ();
- GST_BUFFER_DATA (outbuf) = g_malloc (24000);
- GST_BUFFER_SIZE (outbuf) = 24000;
-
- if (GST_FLAG_IS_SET (tcpsrc, GST_TCPSRC_1ST_BUF)) {
- if (tcpsrc->clock) {
- GstClockTime current_time;
- GstEvent *discont;
-
- current_time = gst_clock_get_time (tcpsrc->clock);
-
- GST_BUFFER_TIMESTAMP (outbuf) = current_time;
-
- discont = gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME,
- current_time, NULL);
-
- gst_pad_push (tcpsrc->srcpad, GST_DATA (discont));
- }
-
- GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_1ST_BUF);
- }
-
- else {
- GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE;
- }
-
- if (!GST_FLAG_IS_SET (tcpsrc, GST_TCPSRC_CONNECTED)) {
- tcpsrc->client_sock = accept (tcpsrc->sock, &client_addr, &len);
-
- if (tcpsrc->client_sock <= 0) {
- perror ("accept");
- }
-
- else {
- GST_FLAG_SET (tcpsrc, GST_TCPSRC_CONNECTED);
- }
- }
-
- numbytes =
- read (tcpsrc->client_sock, GST_BUFFER_DATA (outbuf),
- GST_BUFFER_SIZE (outbuf));
-
- if (numbytes > 0) {
- GST_BUFFER_SIZE (outbuf) = numbytes;
- }
-
- else {
- if (numbytes == -1) {
- perror ("read");
- } else
- g_print ("End of Stream reached\n");
- gst_buffer_unref (outbuf);
- outbuf = NULL;
- close (tcpsrc->client_sock);
- tcpsrc->client_sock = -1;
- GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_CONNECTED);
- }
- }
- }
-
- else {
- perror ("select");
- outbuf = NULL;
- }
-
- return GST_DATA (outbuf);
-}
-
-
-static void
-gst_tcpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
- GParamSpec * pspec)
-{
- GstTCPSrc *tcpsrc;
-
- /* it's not null if we got it, but it might not be ours */
- g_return_if_fail (GST_IS_TCPSRC (object));
- tcpsrc = GST_TCPSRC (object);
-
- switch (prop_id) {
- case ARG_PORT:
- tcpsrc->port = g_value_get_int (value);
- break;
- case ARG_CONTROL:
- tcpsrc->control = g_value_get_enum (value);
- break;
-/* case ARG_SOCKET_OPTIONS:
- tcpsrc->socket_options = g_value_get_boolean(value);
- break; */
- default:
- break;
- }
-}
-
-static void
-gst_tcpsrc_get_property (GObject * object, guint prop_id, GValue * value,
- GParamSpec * pspec)
-{
- GstTCPSrc *tcpsrc;
-
- /* it's not null if we got it, but it might not be ours */
- g_return_if_fail (GST_IS_TCPSRC (object));
- tcpsrc = GST_TCPSRC (object);
-
- switch (prop_id) {
- case ARG_PORT:
- g_value_set_int (value, tcpsrc->port);
- break;
- case ARG_CONTROL:
- g_value_set_enum (value, tcpsrc->control);
- break;
-/* case ARG_SOCKET_OPTIONS:
- g_value_set_boolean(value,tcpsrc->socket_options);
- break;*/
- default:
- G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
- break;
- }
-}
-
-/* create a socket for sending to remote machine */
-static gboolean
-gst_tcpsrc_init_receive (GstTCPSrc * src)
-{
- guint val = 0;
-
- memset (&src->myaddr, 0, sizeof (src->myaddr));
- src->myaddr.sin_family = AF_INET; /* host byte order */
- src->myaddr.sin_port = htons (src->port); /* short, network byte order */
- src->myaddr.sin_addr.s_addr = INADDR_ANY;
-
- if ((src->sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
- perror ("stream_socket");
- return FALSE;
- }
-
-/* if (src->socket_options)
- {*/
- g_print ("Socket Options SO_REUSEADDR, SO_KEEPALIVE\n");
- /* Sock Options */
- val = 1;
- /* allow local address reuse */
- if (setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof (int)) < 0)
- perror ("setsockopt()");
- val = 1;
- /* periodically test if connection still alive */
- if (setsockopt (src->sock, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof (int)) < 0)
- perror ("setsockopt()");
- /* Sock Options */
-/* } */
-
- if (bind (src->sock, (struct sockaddr *) &src->myaddr,
- sizeof (src->myaddr)) == -1) {
- perror ("stream_sock bind");
- return FALSE;
- }
-
- if (listen (src->sock, 5) == -1) {
- perror ("stream_sock listen");
- return FALSE;
- }
-
- fcntl (src->sock, F_SETFL, O_NONBLOCK);
-
- switch (src->control) {
- case CONTROL_TCP:
- if ((src->control_sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
- perror ("control_socket");
- return FALSE;
- }
-
- src->myaddr.sin_port = htons (src->port + 1);
- if (bind (src->control_sock, (struct sockaddr *) &src->myaddr,
- sizeof (src->myaddr)) == -1) {
- perror ("control bind");
- return FALSE;
- }
-
- if (listen (src->control_sock, 5) == -1) {
- perror ("control listen");
- return FALSE;
- }
-
- fcntl (src->control_sock, F_SETFL, O_NONBLOCK);
- case CONTROL_NONE:
- GST_FLAG_SET (src, GST_TCPSRC_OPEN);
- return TRUE;
- break;
- default:
- return FALSE;
- break;
- }
-
- GST_FLAG_SET (src, GST_TCPSRC_OPEN);
-
- return TRUE;
-}
-
-static void
-gst_tcpsrc_close (GstTCPSrc * src)
-{
- if (src->sock != -1) {
- close (src->sock);
- src->sock = -1;
- }
- if (src->control_sock != -1) {
- close (src->control_sock);
- src->control_sock = -1;
- }
- if (src->client_sock != -1) {
- close (src->client_sock);
- src->client_sock = -1;
- }
-
- GST_FLAG_UNSET (src, GST_TCPSRC_OPEN);
-}
-
-static GstElementStateReturn
-gst_tcpsrc_change_state (GstElement * element)
-{
- g_return_val_if_fail (GST_IS_TCPSRC (element), GST_STATE_FAILURE);
-
- if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
- if (GST_FLAG_IS_SET (element, GST_TCPSRC_OPEN))
- gst_tcpsrc_close (GST_TCPSRC (element));
- } else {
- if (!GST_FLAG_IS_SET (element, GST_TCPSRC_OPEN)) {
- if (!gst_tcpsrc_init_receive (GST_TCPSRC (element)))
- return GST_STATE_FAILURE;
- }
- }
-
- if (GST_ELEMENT_CLASS (parent_class)->change_state)
- return GST_ELEMENT_CLASS (parent_class)->change_state (element);
-
- return GST_STATE_SUCCESS;
-}
+++ /dev/null
-/* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-
-#ifndef __GST_TCPSRC_H__
-#define __GST_TCPSRC_H__
-
-#include <gst/gst.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
-
-#include <errno.h>
-#include <string.h>
-#include <sys/types.h>
-#include <netdb.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include "gsttcpplugin.h"
-
-#include <fcntl.h>
-
-#define GST_TYPE_TCPSRC \
- (gst_tcpsrc_get_type())
-#define GST_TCPSRC(obj) \
- (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPSRC,GstTCPSrc))
-#define GST_TCPSRC_CLASS(klass) \
- (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPSRC,GstTCPSrc))
-#define GST_IS_TCPSRC(obj) \
- (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPSRC))
-#define GST_IS_TCPSRC_CLASS(obj) \
- (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPSRC))
-
-typedef struct _GstTCPSrc GstTCPSrc;
-typedef struct _GstTCPSrcClass GstTCPSrcClass;
-
-typedef enum {
- GST_TCPSRC_OPEN = GST_ELEMENT_FLAG_LAST,
- GST_TCPSRC_1ST_BUF,
- GST_TCPSRC_CONNECTED,
-
- GST_TCPSRC_FLAG_LAST,
-} GstTCPSrcFlags;
-
-struct _GstTCPSrc {
- GstElement element;
-
- /* pads */
- GstPad *sinkpad,*srcpad;
-
- int port;
- int sock;
- int client_sock;
- int control_sock;
-/* gboolean socket_options;*/
- Gst_TCP_Control control;
-
- struct sockaddr_in myaddr;
- GstClock *clock;
-};
-
-struct _GstTCPSrcClass {
- GstElementClass parent_class;
-};
-
-GType gst_tcpsrc_get_type(void);
-
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
-
-
-#endif /* __GST_TCPSRC_H__ */