From e9de36e38ca686d3ba7f6915fba2c7855997284d Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 5 Jul 2005 10:21:40 +0000 Subject: [PATCH] Ported tcp plugins to 0.9. Original commit message from CVS: * configure.ac: * gst/tcp/Makefile.am: * gst/tcp/README: * gst/tcp/gstmultifdsink.c: (gst_multifdsink_get_type), (gst_multifdsink_base_init), (gst_multifdsink_class_init), (gst_multifdsink_init), (gst_multifdsink_remove_client_link), (is_sync_frame), (gst_multifdsink_handle_client_write), (gst_multifdsink_render), (gst_multifdsink_start), (gst_multifdsink_stop), (gst_multifdsink_change_state): * gst/tcp/gstmultifdsink.h: * gst/tcp/gsttcp.c: (gst_tcp_host_to_ip), (gst_tcp_gdp_read_buffer), (gst_tcp_gdp_read_caps), (gst_tcp_gdp_write_buffer), (gst_tcp_gdp_write_caps): * gst/tcp/gsttcp.h: * gst/tcp/gsttcpclientsink.c: (gst_tcpclientsink_class_init), (gst_tcpclientsink_init), (gst_tcpclientsink_setcaps), (gst_tcpclientsink_render), (gst_tcpclientsink_start), (gst_tcpclientsink_stop), (gst_tcpclientsink_change_state): * gst/tcp/gsttcpclientsink.h: * gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get_type), (gst_tcpclientsrc_base_init), (gst_tcpclientsrc_class_init), (gst_tcpclientsrc_init), (gst_tcpclientsrc_getcaps), (gst_tcpclientsrc_create), (gst_tcpclientsrc_start), (gst_tcpclientsrc_stop), (gst_tcpclientsrc_unlock): * gst/tcp/gsttcpclientsrc.h: * gst/tcp/gsttcpplugin.c: (plugin_init): * gst/tcp/gsttcpserversink.c: (gst_tcpserversink_class_init): * gst/tcp/gsttcpserversink.h: * gst/tcp/gsttcpserversrc.c: (gst_tcpserversrc_get_type), (gst_tcpserversrc_base_init), (gst_tcpserversrc_class_init), (gst_tcpserversrc_init), (gst_tcpserversrc_finalize), (gst_tcpserversrc_create), (gst_tcpserversrc_start), (gst_tcpserversrc_stop): * gst/tcp/gsttcpserversrc.h: * gst/tcp/gsttcpsink.c: * gst/tcp/gsttcpsink.h: * gst/tcp/gsttcpsrc.c: * gst/tcp/gsttcpsrc.h: Ported tcp plugins to 0.9. --- ChangeLog | 43 ++++ configure.ac | 12 ++ gst/tcp/Makefile.am | 4 +- gst/tcp/README | 32 --- gst/tcp/gstmultifdsink.c | 144 ++++++++----- gst/tcp/gstmultifdsink.h | 19 +- gst/tcp/gsttcp.c | 254 ++++++++++++++--------- gst/tcp/gsttcp.h | 10 +- gst/tcp/gsttcpclientsink.c | 194 ++++++++++------- gst/tcp/gsttcpclientsink.h | 22 +- gst/tcp/gsttcpclientsrc.c | 414 ++++++++++++++++--------------------- gst/tcp/gsttcpclientsrc.h | 22 +- gst/tcp/gsttcpplugin.c | 10 +- gst/tcp/gsttcpserversink.c | 8 +- gst/tcp/gsttcpserversink.h | 10 +- gst/tcp/gsttcpserversrc.c | 486 ++++++++++++++++++------------------------- gst/tcp/gsttcpserversrc.h | 18 +- gst/tcp/gsttcpsink.c | 425 -------------------------------------- gst/tcp/gsttcpsink.h | 96 --------- gst/tcp/gsttcpsrc.c | 504 --------------------------------------------- gst/tcp/gsttcpsrc.h | 92 --------- 21 files changed, 837 insertions(+), 1982 deletions(-) delete mode 100644 gst/tcp/gsttcpsink.c delete mode 100644 gst/tcp/gsttcpsink.h delete mode 100644 gst/tcp/gsttcpsrc.c delete mode 100644 gst/tcp/gsttcpsrc.h diff --git a/ChangeLog b/ChangeLog index 5d1fece..b2925e3 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,46 @@ +2005-07-05 Wim Taymans + + * configure.ac: + * gst/tcp/Makefile.am: + * gst/tcp/README: + * gst/tcp/gstmultifdsink.c: (gst_multifdsink_get_type), + (gst_multifdsink_base_init), (gst_multifdsink_class_init), + (gst_multifdsink_init), (gst_multifdsink_remove_client_link), + (is_sync_frame), (gst_multifdsink_handle_client_write), + (gst_multifdsink_render), (gst_multifdsink_start), + (gst_multifdsink_stop), (gst_multifdsink_change_state): + * gst/tcp/gstmultifdsink.h: + * gst/tcp/gsttcp.c: (gst_tcp_host_to_ip), + (gst_tcp_gdp_read_buffer), (gst_tcp_gdp_read_caps), + (gst_tcp_gdp_write_buffer), (gst_tcp_gdp_write_caps): + * gst/tcp/gsttcp.h: + * gst/tcp/gsttcpclientsink.c: (gst_tcpclientsink_class_init), + (gst_tcpclientsink_init), (gst_tcpclientsink_setcaps), + (gst_tcpclientsink_render), (gst_tcpclientsink_start), + (gst_tcpclientsink_stop), (gst_tcpclientsink_change_state): + * gst/tcp/gsttcpclientsink.h: + * gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get_type), + (gst_tcpclientsrc_base_init), (gst_tcpclientsrc_class_init), + (gst_tcpclientsrc_init), (gst_tcpclientsrc_getcaps), + (gst_tcpclientsrc_create), (gst_tcpclientsrc_start), + (gst_tcpclientsrc_stop), (gst_tcpclientsrc_unlock): + * gst/tcp/gsttcpclientsrc.h: + * gst/tcp/gsttcpplugin.c: (plugin_init): + * gst/tcp/gsttcpserversink.c: (gst_tcpserversink_class_init): + * gst/tcp/gsttcpserversink.h: + * gst/tcp/gsttcpserversrc.c: (gst_tcpserversrc_get_type), + (gst_tcpserversrc_base_init), (gst_tcpserversrc_class_init), + (gst_tcpserversrc_init), (gst_tcpserversrc_finalize), + (gst_tcpserversrc_create), (gst_tcpserversrc_start), + (gst_tcpserversrc_stop): + * gst/tcp/gsttcpserversrc.h: + * gst/tcp/gsttcpsink.c: + * gst/tcp/gsttcpsink.h: + * gst/tcp/gsttcpsrc.c: + * gst/tcp/gsttcpsrc.h: + Ported tcp plugins to 0.9. + + 2005-07-05 Andy Wingo * gst/playback/gstplaybasebin.c (fill_buffer): diff --git a/configure.ac b/configure.ac index b1970fe..198ba49 100644 --- a/configure.ac +++ b/configure.ac @@ -237,6 +237,16 @@ fi AC_SUBST(GST_CONTROL_LIBS) +dnl check for gstreamer-dataprotocol; uninstalled is selected preferentially +PKG_CHECK_MODULES(GST_GDP, gstreamer-dataprotocol-$GST_MAJORMINOR >= $GST_REQ, + HAVE_GST_GDP="yes", HAVE_GST_GDP="no") + +if test "x$HAVE_GST_GDP" = "xno"; then + AC_MSG_ERROR(no GStreamer Dataprotocol Libs found) +fi + +AC_SUBST(GST_GDP_LIBS) + PKG_CHECK_MODULES(GST_BASE, gstreamer-base-$GST_MAJORMINOR >= $GST_REQ, HAVE_GST_BASE="yes", HAVE_GST_BASE="no") @@ -375,6 +385,7 @@ GST_PLUGINS_ALL="\ playback \ sine \ subparse \ + tcp \ typefind \ videotestsrc \ videorate \ @@ -883,6 +894,7 @@ gst/ffmpegcolorspace/Makefile gst/playback/Makefile gst/sine/Makefile gst/subparse/Makefile +gst/tcp/Makefile gst/typefind/Makefile gst/videotestsrc/Makefile gst/videorate/Makefile diff --git a/gst/tcp/Makefile.am b/gst/tcp/Makefile.am index 7690d2b..bf910db 100644 --- a/gst/tcp/Makefile.am +++ b/gst/tcp/Makefile.am @@ -14,7 +14,6 @@ BUILT_SOURCES = $(built_sources) $(built_headers) libgsttcp_la_SOURCES = \ gsttcpplugin.c \ - gsttcpsrc.c gsttcpsink.c \ gsttcp.c \ gstfdset.c \ gstmultifdsink.c \ @@ -27,11 +26,10 @@ nodist_libgsttcp_la_SOURCES = \ # remove ENABLE_NEW when dataprotocol is stable libgsttcp_la_CFLAGS = $(GST_CFLAGS) -DGST_ENABLE_NEW libgsttcp_la_LIBADD = -libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) +libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_GDP_LIBS) noinst_HEADERS = \ gsttcpplugin.h \ - gsttcpsrc.h gsttcpsink.h \ gsttcp.h \ gstfdset.h \ gstmultifdsink.h \ diff --git a/gst/tcp/README b/gst/tcp/README index 29e26da..589d534 100644 --- a/gst/tcp/README +++ b/gst/tcp/README @@ -4,8 +4,6 @@ This part of the documentation is for the new tcp elements: - tcpserversrc - tcpserversink -which are created to replace the old tcpsrc/tcpsink - TESTS ----- Use these tests to test functionality of the various tcp plugins @@ -31,33 +29,3 @@ TODO ---- - implement DNS resolution --------- - -This is the old documentation for the original tcpsrc/tcpsink elements. - -* What is TCP src/sink? - -solution, like icecast or realaudio or whatever. -But the future RTP plugins shall not do the actual transmission/reception -of packets on the network themselve but the Application developer would be -encouraged to use either the TCP or the UDP plugins for that. UDP would be -used mostly but there could be situations where TCP would be the only -available choice. For example streaming accross firewalls that do not -allow UDP. - -* Shortcomings - -Even given our modest ambitions, the current code doesn't handle -caps negotiation robustly. - -* Todo - -The caps nego should do bi-directional negotiation. - -Perhaps this plugin can be the example of how to do caps negotiation -via a point-to-point protocol. - -12 Sep 2001 -Wim Taymans -Joshua N Pritikin -Zeeshan Ali diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index 5c70e3c..590cd4f 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -66,6 +66,11 @@ GST_ELEMENT_DETAILS ("MultiFd sink", "Thomas Vander Stichele , " "Wim Taymans "); +static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + GST_DEBUG_CATEGORY (multifdsink_debug); #define GST_CAT_DEFAULT (multifdsink_debug) @@ -215,7 +220,8 @@ static void gst_multifdsink_init (GstMultiFdSink * multifdsink); static void gst_multifdsink_remove_client_link (GstMultiFdSink * sink, GList * link); -static void gst_multifdsink_chain (GstPad * pad, GstData * _data); +static GstFlowReturn gst_multifdsink_render (GstBaseSink * bsink, + GstBuffer * buf); static GstElementStateReturn gst_multifdsink_change_state (GstElement * element); @@ -250,7 +256,7 @@ gst_multifdsink_get_type (void) }; multifdsink_type = - g_type_register_static (GST_TYPE_ELEMENT, "GstMultiFdSink", + g_type_register_static (GST_TYPE_BASESINK, "GstMultiFdSink", &multifdsink_info, 0); } return multifdsink_type; @@ -261,6 +267,9 @@ gst_multifdsink_base_init (gpointer g_class) { GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&sinktemplate)); + gst_element_class_set_details (element_class, &gst_multifdsink_details); } @@ -269,11 +278,16 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass) { GObjectClass *gobject_class; GstElementClass *gstelement_class; + GstBaseSinkClass *gstbasesink_class; gobject_class = (GObjectClass *) klass; gstelement_class = (GstElementClass *) klass; + gstbasesink_class = (GstBaseSinkClass *) klass; + + parent_class = g_type_class_ref (GST_TYPE_BASESINK); - parent_class = g_type_class_ref (GST_TYPE_ELEMENT); + gobject_class->set_property = gst_multifdsink_set_property; + gobject_class->get_property = gst_multifdsink_get_property; g_object_class_install_property (gobject_class, ARG_PROTOCOL, g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in", @@ -375,11 +389,10 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass) client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_BOXED, G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS); - gobject_class->set_property = gst_multifdsink_set_property; - gobject_class->get_property = gst_multifdsink_get_property; - gstelement_class->change_state = gst_multifdsink_change_state; + gstbasesink_class->render = gst_multifdsink_render; + klass->add = gst_multifdsink_add; klass->remove = gst_multifdsink_remove; klass->clear = gst_multifdsink_clear; @@ -391,11 +404,6 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass) static void gst_multifdsink_init (GstMultiFdSink * this) { - /* create the sink pad */ - this->sinkpad = gst_pad_new ("sink", GST_PAD_SINK); - gst_element_add_pad (GST_ELEMENT (this), this->sinkpad); - gst_pad_set_chain_function (this->sinkpad, gst_multifdsink_chain); - GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN); this->protocol = DEFAULT_PROTOCOL; @@ -636,7 +644,7 @@ gst_multifdsink_remove_client_link (GstMultiFdSink * sink, GList * link) client->disconnect_time = GST_TIMEVAL_TO_TIME (now); /* free client buffers */ - g_slist_foreach (client->sending, (GFunc) gst_data_unref, NULL); + g_slist_foreach (client->sending, (GFunc) gst_mini_object_unref, NULL); g_slist_free (client->sending); client->sending = NULL; @@ -777,9 +785,9 @@ gst_multifdsink_client_queue_caps (GstMultiFdSink * sink, GstTCPClient * client, static gboolean is_sync_frame (GstMultiFdSink * sink, GstBuffer * buffer) { - if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_DELTA_UNIT)) { + if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) { return FALSE; - } else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_IN_CAPS)) { + } else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) { return TRUE; } return FALSE; @@ -934,7 +942,8 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink, /* when using GDP, first check if we have queued caps yet */ if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) { if (!client->caps_sent) { - const GstCaps *caps = GST_PAD_CAPS (GST_PAD_PEER (sink->sinkpad)); + const GstCaps *caps = + GST_PAD_CAPS (GST_PAD_PEER (GST_BASESINK_PAD (sink))); /* queue caps for sending */ res = gst_multifdsink_client_queue_caps (sink, client, caps); @@ -1443,32 +1452,28 @@ gst_multifdsink_thread (GstMultiFdSink * sink) return NULL; } -static void -gst_multifdsink_chain (GstPad * pad, GstData * _data) +static GstFlowReturn +gst_multifdsink_render (GstBaseSink * bsink, GstBuffer * buf) { - GstBuffer *buf = GST_BUFFER (_data); GstMultiFdSink *sink; - g_return_if_fail (pad != NULL); - g_return_if_fail (GST_IS_PAD (pad)); - g_return_if_fail (buf != NULL); - sink = GST_MULTIFDSINK (GST_OBJECT_PARENT (pad)); - g_return_if_fail (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN)); + sink = GST_MULTIFDSINK (bsink); - if (GST_IS_EVENT (buf)) { - g_warning ("FIXME: handle events"); - return; - } + /* since we keep this buffer out of the scope of this method */ + gst_buffer_ref (buf); + + g_return_val_if_fail (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN), + GST_FLOW_ERROR); GST_LOG_OBJECT (sink, "received buffer %p", buf); /* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS, * it means we're getting new streamheader buffers, and we should clear * the old ones */ - if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS) && + if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS) && sink->previous_buffer_in_caps == FALSE) { GST_DEBUG_OBJECT (sink, "receiving new IN_CAPS buffers, clearing old streamheader"); - g_slist_foreach (sink->streamheader, (GFunc) gst_data_unref, NULL); + g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL); g_slist_free (sink->streamheader); sink->streamheader = NULL; } @@ -1478,13 +1483,13 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data) * After that we return, since we only send these out when we get * non IN_CAPS buffers so we properly keep track of clients that got * streamheaders. */ - if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS)) { + if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS)) { sink->previous_buffer_in_caps = TRUE; GST_DEBUG_OBJECT (sink, "appending IN_CAPS buffer with length %d to streamheader", GST_BUFFER_SIZE (buf)); sink->streamheader = g_slist_append (sink->streamheader, buf); - return; + return GST_FLOW_OK; } sink->previous_buffer_in_caps = FALSE; @@ -1492,6 +1497,8 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data) gst_multifdsink_queue_buffer (sink, buf); sink->bytes_to_serve += GST_BUFFER_SIZE (buf); + + return GST_FLOW_OK; } static void @@ -1617,21 +1624,24 @@ gst_multifdsink_get_property (GObject * object, guint prop_id, GValue * value, /* create a socket for sending to remote machine */ static gboolean -gst_multifdsink_init_send (GstMultiFdSink * this) +gst_multifdsink_start (GstBaseSink * bsink) { GstMultiFdSinkClass *fclass; int control_socket[2]; + GstMultiFdSink *this; + + if (GST_FLAG_IS_SET (bsink, GST_MULTIFDSINK_OPEN)) + return TRUE; + this = GST_MULTIFDSINK (bsink); fclass = GST_MULTIFDSINK_GET_CLASS (this); GST_INFO_OBJECT (this, "starting in mode %d", this->mode); this->fdset = gst_fdset_new (this->mode); - if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0) { - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL), - GST_ERROR_SYSTEM); - return FALSE; - } + if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0) + goto socket_pair; + READ_SOCKET (this).fd = control_socket[0]; WRITE_SOCKET (this).fd = control_socket[1]; @@ -1653,16 +1663,31 @@ gst_multifdsink_init_send (GstMultiFdSink * this) this->thread = g_thread_create ((GThreadFunc) gst_multifdsink_thread, this, TRUE, NULL); + GST_FLAG_SET (this, GST_MULTIFDSINK_OPEN); + return TRUE; + + /* ERRORS */ +socket_pair: + { + GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL), + GST_ERROR_SYSTEM); + return FALSE; + } } -static void -gst_multifdsink_close (GstMultiFdSink * this) +static gboolean +gst_multifdsink_stop (GstBaseSink * bsink) { GstMultiFdSinkClass *fclass; + GstMultiFdSink *this; + this = GST_MULTIFDSINK (bsink); fclass = GST_MULTIFDSINK_GET_CLASS (this); + if (!GST_FLAG_IS_SET (bsink, GST_MULTIFDSINK_OPEN)) + return TRUE; + this->running = FALSE; SEND_COMMAND (this, CONTROL_STOP); @@ -1678,7 +1703,7 @@ gst_multifdsink_close (GstMultiFdSink * this) close (WRITE_SOCKET (this).fd); if (this->streamheader) { - g_slist_foreach (this->streamheader, (GFunc) gst_data_unref, NULL); + g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL); g_slist_free (this->streamheader); this->streamheader = NULL; } @@ -1691,46 +1716,55 @@ gst_multifdsink_close (GstMultiFdSink * this) gst_fdset_free (this->fdset); this->fdset = NULL; } + GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN); + + return TRUE; } static GstElementStateReturn gst_multifdsink_change_state (GstElement * element) { GstMultiFdSink *sink; + gint transition; + GstElementStateReturn ret; - g_return_val_if_fail (GST_IS_MULTIFDSINK (element), GST_STATE_FAILURE); sink = GST_MULTIFDSINK (element); /* we disallow changing the state from the streaming thread */ if (g_thread_self () == sink->thread) return GST_STATE_FAILURE; - switch (GST_STATE_TRANSITION (element)) { + transition = GST_STATE_TRANSITION (element); + + switch (transition) { case GST_STATE_NULL_TO_READY: - if (!GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN)) { - if (!gst_multifdsink_init_send (sink)) - return GST_STATE_FAILURE; - GST_FLAG_SET (sink, GST_MULTIFDSINK_OPEN); - } + if (!gst_multifdsink_start (GST_BASESINK (sink))) + goto start_failed; break; case GST_STATE_READY_TO_PAUSED: break; case GST_STATE_PAUSED_TO_PLAYING: break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); + + switch (transition) { case GST_STATE_PLAYING_TO_PAUSED: break; case GST_STATE_PAUSED_TO_READY: break; case GST_STATE_READY_TO_NULL: - if (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN)) { - gst_multifdsink_close (GST_MULTIFDSINK (element)); - GST_FLAG_UNSET (sink, GST_MULTIFDSINK_OPEN); - } + gst_multifdsink_stop (GST_BASESINK (sink)); break; } + return ret; - if (GST_ELEMENT_CLASS (parent_class)->change_state) - return GST_ELEMENT_CLASS (parent_class)->change_state (element); - - return GST_STATE_SUCCESS; + /* ERRORS */ +start_failed: + { + return GST_STATE_FAILURE; + } } diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index 6b8fc0b..00828df 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -22,12 +22,10 @@ #ifndef __GST_MULTIFDSINK_H__ #define __GST_MULTIFDSINK_H__ - #include +#include -#ifdef __cplusplus -extern "C" { -#endif /* __cplusplus */ +G_BEGIN_DECLS #include "gsttcp.h" #include "gstfdset.h" @@ -119,10 +117,7 @@ typedef struct { } GstTCPClient; struct _GstMultiFdSink { - GstElement element; - - /* pad */ - GstPad *sinkpad; + GstBaseSink element; guint64 bytes_to_serve; /* how much bytes we must serve */ guint64 bytes_served; /* how much bytes have we served */ @@ -161,7 +156,7 @@ struct _GstMultiFdSink { }; struct _GstMultiFdSinkClass { - GstElementClass parent_class; + GstBaseSinkClass parent_class; /* element methods */ void (*add) (GstMultiFdSink *sink, int fd); @@ -187,10 +182,6 @@ void gst_multifdsink_remove (GstMultiFdSink *sink, int fd); void gst_multifdsink_clear (GstMultiFdSink *sink); GValueArray* gst_multifdsink_get_stats (GstMultiFdSink *sink, int fd); - -#ifdef __cplusplus -} -#endif /* __cplusplus */ - +G_END_DECLS #endif /* __GST_MULTIFDSINK_H__ */ diff --git a/gst/tcp/gsttcp.c b/gst/tcp/gsttcp.c index 4e13956..99eee84 100644 --- a/gst/tcp/gsttcp.c +++ b/gst/tcp/gsttcp.c @@ -56,35 +56,42 @@ gst_tcp_host_to_ip (GstElement * element, const gchar * host) struct in_addr addr; GST_DEBUG_OBJECT (element, "resolving host %s", host); + /* first check if it already is an IP address */ if (inet_aton (host, &addr)) { ip = g_strdup (host); goto beach; } - /* FIXME: could do a localhost check here */ /* perform a name lookup */ - hostinfo = gethostbyname (host); - if (!hostinfo) { - GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL), - ("Could not find IP address for host \"%s\".", host)); - return NULL; - } + if (!(hostinfo = gethostbyname (host))) + goto resolve_error; - if (hostinfo->h_addrtype != AF_INET) { - GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL), - ("host \"%s\" is not an IP host", host)); - return NULL; - } + if (hostinfo->h_addrtype != AF_INET) + goto not_ip; addrs = hostinfo->h_addr_list; + /* There could be more than one IP address, but we just return the first */ ip = g_strdup (inet_ntoa (*(struct in_addr *) *addrs)); beach: GST_DEBUG_OBJECT (element, "resolved to IP %s", ip); return ip; + +resolve_error: + { + GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL), + ("Could not find IP address for host \"%s\".", host)); + return NULL; + } +not_ip: + { + GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL), + ("host \"%s\" is not an IP host", host)); + return NULL; + } } /* write buffer to given socket incrementally. @@ -149,15 +156,14 @@ gst_tcp_socket_close (int *socket) *socket = -1; } -/* read the gdp buffer header from the given socket +/* read a buffer from the given socket * returns: - * - a GstData representing a GstBuffer in which data should be read - * - a GstData representing a GstEvent + * - a GstBuffer in which data should be read * - NULL, indicating a connection close or an error, to be handled with * EOS */ -GstData * -gst_tcp_gdp_read_header (GstElement * this, int socket) +GstBuffer * +gst_tcp_gdp_read_buffer (GstElement * this, int socket) { size_t header_length = GST_DP_HEADER_LENGTH; size_t readsize; @@ -169,36 +175,51 @@ gst_tcp_gdp_read_header (GstElement * this, int socket) readsize = header_length; GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", readsize); - ret = gst_tcp_socket_read (socket, header, readsize); - /* if we read 0 bytes, and we're blocking, we hit eos */ - if (ret == 0) { - GST_DEBUG ("blocking read returns 0, returning NULL"); - g_free (header); - return NULL; - } - if (ret < 0) { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); - g_free (header); - return NULL; + if ((ret = gst_tcp_socket_read (socket, header, readsize)) <= 0) + goto read_error; + + if (ret != readsize) + goto short_read; + + if (!gst_dp_validate_header (header_length, header)) + goto validate_error; + + GST_LOG_OBJECT (this, "validated buffer packet header"); + + buffer = gst_dp_buffer_from_header (header_length, header); + g_free (header); + + GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer); + + return buffer; + + /* ERRORS */ +read_error: + { + if (ret == 0) { + /* if we read 0 bytes, and we're blocking, we hit eos */ + GST_DEBUG ("blocking read returns 0, returning NULL"); + g_free (header); + return NULL; + } else { + GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); + g_free (header); + return NULL; + } } - if (ret != readsize) { +short_read: + { + GST_WARNING ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret); g_warning ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret); + return NULL; } - g_assert (ret == readsize); - - if (!gst_dp_validate_header (header_length, header)) { +validate_error: + { GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), ("GDP buffer packet header does not validate")); g_free (header); return NULL; } - GST_LOG_OBJECT (this, "validated buffer packet header"); - - buffer = gst_dp_buffer_from_header (header_length, header); - g_free (header); - - GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer); - return GST_DATA (buffer); } /* read the GDP caps packet from the given socket @@ -218,88 +239,122 @@ gst_tcp_gdp_read_caps (GstElement * this, int socket) readsize = header_length; GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", readsize); - ret = gst_tcp_socket_read (socket, header, readsize); - if (ret < 0) { - g_free (header); - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); - return NULL; - } - if (ret == 0) { - GST_WARNING_OBJECT (this, "read returned EOF"); - return NULL; + if ((ret = gst_tcp_socket_read (socket, header, readsize)) <= 0) + goto read_error; + + if (ret != readsize) + goto short_read; + + if (!gst_dp_validate_header (header_length, header)) + goto validate_error; + + readsize = gst_dp_header_payload_length (header); + payload = g_malloc (readsize); + + GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize); + if ((ret = gst_tcp_socket_read (socket, payload, readsize)) < 0) + goto socket_read_error; + + if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS) + goto is_not_caps; + + g_assert (ret == readsize); + + if (!gst_dp_validate_payload (readsize, header, payload)) + goto packet_validate_error; + + caps = gst_dp_caps_from_packet (header_length, header, payload); + string = gst_caps_to_string (caps); + GST_LOG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string); + g_free (string); + + g_free (header); + g_free (payload); + + return caps; + + /* ERRORS */ +read_error: + { + if (ret < 0) { + g_free (header); + GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); + return NULL; + } + if (ret == 0) { + GST_WARNING_OBJECT (this, "read returned EOF"); + return NULL; + } } - if (ret != readsize) { +short_read: + { GST_WARNING_OBJECT (this, "Tried to read %d bytes but only read %d bytes", readsize, ret); return NULL; } - - if (!gst_dp_validate_header (header_length, header)) { +validate_error: + { GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), ("GDP caps packet header does not validate")); g_free (header); return NULL; } - - readsize = gst_dp_header_payload_length (header); - payload = g_malloc (readsize); - GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize); - ret = gst_tcp_socket_read (socket, payload, readsize); - - if (ret < 0) { +socket_read_error: + { GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); g_free (header); g_free (payload); return NULL; } - if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS) { +is_not_caps: + { GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), ("Header read doesn't describe CAPS payload")); g_free (header); g_free (payload); return NULL; } - g_assert (ret == readsize); - - if (!gst_dp_validate_payload (readsize, header, payload)) { +packet_validate_error: + { GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), ("GDP caps packet payload does not validate")); g_free (header); g_free (payload); return NULL; } - - caps = gst_dp_caps_from_packet (header_length, header, payload); - string = gst_caps_to_string (caps); - GST_LOG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string); - g_free (string); - - g_free (header); - g_free (payload); - - return caps; } /* write a GDP header to the socket. Return false if fails. */ gboolean -gst_tcp_gdp_write_header (GstElement * this, int socket, GstBuffer * buffer, +gst_tcp_gdp_write_buffer (GstElement * this, int socket, GstBuffer * buffer, gboolean fatal, const gchar * host, int port) { guint length; guint8 *header; size_t wrote; - if (!gst_dp_header_from_buffer (buffer, 0, &length, &header)) { + if (!gst_dp_header_from_buffer (buffer, 0, &length, &header)) + goto create_error; + + GST_LOG_OBJECT (this, "writing %d bytes for GDP buffer header", length); + wrote = gst_tcp_socket_write (socket, header, length); + g_free (header); + + if (wrote != length) + goto write_error; + + return TRUE; + + /* ERRORS */ +create_error: + { if (fatal) GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL), ("Could not create GDP header from buffer")); return FALSE; } - - GST_LOG_OBJECT (this, "writing %d bytes for GDP buffer header", length); - wrote = gst_tcp_socket_write (socket, header, length); - g_free (header); - if (wrote != length) { +write_error: + { if (fatal) GST_ELEMENT_ERROR (this, RESOURCE, WRITE, (_("Error while sending data to \"%s:%d\"."), host, port), @@ -307,8 +362,6 @@ gst_tcp_gdp_write_header (GstElement * this, int socket, GstBuffer * buffer, wrote, GST_BUFFER_SIZE (buffer), g_strerror (errno))); return FALSE; } - - return TRUE; } /* write GDP header and payload to the given socket for the given caps. @@ -322,15 +375,36 @@ gst_tcp_gdp_write_caps (GstElement * this, int socket, const GstCaps * caps, guint8 *payload; size_t wrote; - if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) { + if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) + goto create_error; + + GST_LOG_OBJECT (this, "writing %d bytes for GDP caps header", length); + wrote = gst_tcp_socket_write (socket, header, length); + if (wrote != length) + goto write_header_error; + + length = gst_dp_header_payload_length (header); + g_free (header); + + GST_LOG_OBJECT (this, "writing %d bytes for GDP caps payload", length); + wrote = gst_tcp_socket_write (socket, payload, length); + g_free (payload); + + if (wrote != length) + goto write_payload_error; + + return TRUE; + + /* ERRORS */ +create_error: + { if (fatal) GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL), ("Could not create GDP packet from caps")); return FALSE; } - GST_LOG_OBJECT (this, "writing %d bytes for GDP caps header", length); - wrote = gst_tcp_socket_write (socket, header, length); - if (wrote != length) { +write_header_error: + { g_free (header); g_free (payload); if (fatal) @@ -340,13 +414,8 @@ gst_tcp_gdp_write_caps (GstElement * this, int socket, const GstCaps * caps, wrote, length, g_strerror (errno))); return FALSE; } - - length = gst_dp_header_payload_length (header); - g_free (header); - GST_LOG_OBJECT (this, "writing %d bytes for GDP caps payload", length); - wrote = gst_tcp_socket_write (socket, payload, length); - g_free (payload); - if (wrote != length) { +write_payload_error: + { if (fatal) GST_ELEMENT_ERROR (this, RESOURCE, WRITE, (_("Error while sending gdp payload data to \"%s:%d\"."), host, port), @@ -354,5 +423,4 @@ gst_tcp_gdp_write_caps (GstElement * this, int socket, const GstCaps * caps, wrote, length, g_strerror (errno))); return FALSE; } - return TRUE; } diff --git a/gst/tcp/gsttcp.h b/gst/tcp/gsttcp.h index cce6ad2..63ed526 100644 --- a/gst/tcp/gsttcp.h +++ b/gst/tcp/gsttcp.h @@ -46,11 +46,13 @@ gint gst_tcp_socket_read (int socket, void *buf, size_t count); void gst_tcp_socket_close (int *socket); -GstData * gst_tcp_gdp_read_header (GstElement *this, int socket); -GstCaps * gst_tcp_gdp_read_caps (GstElement *this, int socket); +GstBuffer * gst_tcp_gdp_read_buffer (GstElement *elem, int socket); +GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket); +GstCaps * gst_tcp_gdp_read_caps (GstElement *elem, int socket); -gboolean gst_tcp_gdp_write_header (GstElement *this, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port); -gboolean gst_tcp_gdp_write_caps (GstElement *this, int socket, const GstCaps *caps, gboolean fatal, const gchar *host, int port); +gboolean gst_tcp_gdp_write_buffer (GstElement *elem, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port); +gboolean gst_tcp_gdp_write_event (GstElement *elem, int socket, GstEvent *event, gboolean fatal, const gchar *host, int port); +gboolean gst_tcp_gdp_write_caps (GstElement *elem, int socket, const GstCaps *caps, gboolean fatal, const gchar *host, int port); G_END_DECLS diff --git a/gst/tcp/gsttcpclientsink.c b/gst/tcp/gsttcpclientsink.c index 424b098..4e3f5a8 100644 --- a/gst/tcp/gsttcpclientsink.c +++ b/gst/tcp/gsttcpclientsink.c @@ -58,10 +58,9 @@ static void gst_tcpclientsink_class_init (GstTCPClientSink * klass); static void gst_tcpclientsink_init (GstTCPClientSink * tcpclientsink); static void gst_tcpclientsink_finalize (GObject * gobject); -static void gst_tcpclientsink_set_clock (GstElement * element, - GstClock * clock); - -static void gst_tcpclientsink_chain (GstPad * pad, GstData * _data); +static gboolean gst_tcpclientsink_setcaps (GstBaseSink * bsink, GstCaps * caps); +static GstFlowReturn gst_tcpclientsink_render (GstBaseSink * bsink, + GstBuffer * buf); static GstElementStateReturn gst_tcpclientsink_change_state (GstElement * element); @@ -115,11 +114,17 @@ gst_tcpclientsink_class_init (GstTCPClientSink * klass) { GObjectClass *gobject_class; GstElementClass *gstelement_class; + GstBaseSinkClass *gstbasesink_class; gobject_class = (GObjectClass *) klass; gstelement_class = (GstElementClass *) klass; + gstbasesink_class = (GstBaseSinkClass *) klass; + + parent_class = g_type_class_ref (GST_TYPE_BASESINK); - parent_class = g_type_class_ref (GST_TYPE_ELEMENT); + gobject_class->set_property = gst_tcpclientsink_set_property; + gobject_class->get_property = gst_tcpclientsink_get_property; + gobject_class->finalize = gst_tcpclientsink_finalize; g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST, g_param_spec_string ("host", "Host", "The host/IP to send the packets to", @@ -131,44 +136,24 @@ gst_tcpclientsink_class_init (GstTCPClientSink * klass) g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in", GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE, G_PARAM_READWRITE)); - gobject_class->set_property = gst_tcpclientsink_set_property; - gobject_class->get_property = gst_tcpclientsink_get_property; - gobject_class->finalize = gst_tcpclientsink_finalize; gstelement_class->change_state = gst_tcpclientsink_change_state; - gstelement_class->set_clock = gst_tcpclientsink_set_clock; - - GST_DEBUG_CATEGORY_INIT (tcpclientsink_debug, "tcpclientsink", 0, "TCP sink"); -} - -static void -gst_tcpclientsink_set_clock (GstElement * element, GstClock * clock) -{ - GstTCPClientSink *tcpclientsink; - tcpclientsink = GST_TCPCLIENTSINK (element); + gstbasesink_class->set_caps = gst_tcpclientsink_setcaps; + gstbasesink_class->render = gst_tcpclientsink_render; - tcpclientsink->clock = clock; + GST_DEBUG_CATEGORY_INIT (tcpclientsink_debug, "tcpclientsink", 0, "TCP sink"); } static void gst_tcpclientsink_init (GstTCPClientSink * this) { - /* create the sink pad */ - this->sinkpad = gst_pad_new ("sink", GST_PAD_SINK); - gst_element_add_pad (GST_ELEMENT (this), this->sinkpad); - gst_pad_set_chain_function (this->sinkpad, gst_tcpclientsink_chain); - this->host = g_strdup (TCP_DEFAULT_HOST); this->port = TCP_DEFAULT_PORT; - /* should support as minimum 576 for IPV4 and 1500 for IPV6 */ - /* this->mtu = 1500; */ this->sock_fd = -1; this->protocol = GST_TCP_PROTOCOL_TYPE_NONE; GST_FLAG_UNSET (this, GST_TCPCLIENTSINK_OPEN); - - this->clock = NULL; } static void @@ -179,24 +164,12 @@ gst_tcpclientsink_finalize (GObject * gobject) g_free (this->host); } -static void -gst_tcpclientsink_chain (GstPad * pad, GstData * _data) +static gboolean +gst_tcpclientsink_setcaps (GstBaseSink * bsink, GstCaps * caps) { - size_t wrote = 0; - - GstBuffer *buf = GST_BUFFER (_data); GstTCPClientSink *sink; - g_return_if_fail (pad != NULL); - g_return_if_fail (GST_IS_PAD (pad)); - g_return_if_fail (buf != NULL); - sink = GST_TCPCLIENTSINK (GST_OBJECT_PARENT (pad)); - g_return_if_fail (GST_FLAG_IS_SET (sink, GST_TCPCLIENTSINK_OPEN)); - - if (GST_IS_EVENT (buf)) { - g_warning ("FIXME: handl events"); - return; - } + sink = GST_TCPCLIENTSINK (bsink); /* write the buffer header if we have one */ switch (sink->protocol) { @@ -209,44 +182,85 @@ gst_tcpclientsink_chain (GstPad * pad, GstData * _data) const GstCaps *caps; gchar *string; - caps = GST_PAD_CAPS (GST_PAD_PEER (pad)); + caps = GST_PAD_CAPS (GST_PAD_PEER (GST_BASESINK_PAD (bsink))); string = gst_caps_to_string (caps); GST_DEBUG_OBJECT (sink, "Sending caps %s through GDP", string); - if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd, caps, - TRUE, sink->host, sink->port)) { - g_free (string); - return; - } g_free (string); + + if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd, caps, + TRUE, sink->host, sink->port)) + goto gdp_write_error; + sink->caps_sent = TRUE; } + break; + default: + g_warning ("Unhandled protocol type"); + break; + } + + return TRUE; + + /* ERRORS */ +gdp_write_error: + { + return FALSE; + } +} + +static GstFlowReturn +gst_tcpclientsink_render (GstBaseSink * bsink, GstBuffer * buf) +{ + size_t wrote = 0; + GstTCPClientSink *sink; + gint size; + + sink = GST_TCPCLIENTSINK (bsink); + + g_return_val_if_fail (GST_FLAG_IS_SET (sink, GST_TCPCLIENTSINK_OPEN), + GST_FLOW_WRONG_STATE); + + size = GST_BUFFER_SIZE (buf); + + GST_LOG_OBJECT (sink, "writing %d bytes for buffer data", size); + + /* write the buffer header if we have one */ + switch (sink->protocol) { + case GST_TCP_PROTOCOL_TYPE_NONE: + break; + case GST_TCP_PROTOCOL_TYPE_GDP: GST_LOG_OBJECT (sink, "Sending buffer header through GDP"); - if (!gst_tcp_gdp_write_header (GST_ELEMENT (sink), sink->sock_fd, buf, + if (!gst_tcp_gdp_write_buffer (GST_ELEMENT (sink), sink->sock_fd, buf, TRUE, sink->host, sink->port)) - return; + goto gdp_write_error; break; default: - g_warning ("Unhandled protocol type"); break; } - GST_LOG_OBJECT (sink, "writing %d bytes for buffer data", - GST_BUFFER_SIZE (buf)); - wrote = - gst_tcp_socket_write (sink->sock_fd, GST_BUFFER_DATA (buf), - GST_BUFFER_SIZE (buf)); + /* write buffer data */ + wrote = gst_tcp_socket_write (sink->sock_fd, GST_BUFFER_DATA (buf), size); + + if (wrote < size) + goto write_error; - if (wrote < GST_BUFFER_SIZE (buf)) { + sink->data_written += wrote; + + return GST_FLOW_OK; + + /* ERRORS */ +gdp_write_error: + { + return FALSE; + } +write_error: + { GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (_("Error while sending data to \"%s:%d\"."), sink->host, sink->port), ("Only %d of %d bytes written: %s", wrote, GST_BUFFER_SIZE (buf), g_strerror (errno))); + return GST_FLOW_ERROR; } - sink->data_written += wrote; - - gst_buffer_unref (buf); - - /* FIXME: emit signal ? */ } static void @@ -311,11 +325,14 @@ gst_tcpclientsink_get_property (GObject * object, guint prop_id, GValue * value, /* create a socket for sending to remote machine */ static gboolean -gst_tcpclientsink_init_send (GstTCPClientSink * this) +gst_tcpclientsink_start (GstTCPClientSink * this) { int ret; gchar *ip; + if (GST_FLAG_IS_SET (this, GST_TCPCLIENTSINK_OPEN)) + return TRUE; + /* reset caps_sent flag */ this->caps_sent = FALSE; @@ -373,34 +390,53 @@ gst_tcpclientsink_init_send (GstTCPClientSink * this) return TRUE; } -static void -gst_tcpclientsink_close (GstTCPClientSink * this) +static gboolean +gst_tcpclientsink_stop (GstTCPClientSink * this) { + if (!GST_FLAG_IS_SET (this, GST_TCPCLIENTSINK_OPEN)) + return TRUE; + if (this->sock_fd != -1) { close (this->sock_fd); this->sock_fd = -1; } GST_FLAG_UNSET (this, GST_TCPCLIENTSINK_OPEN); + + return TRUE; } static GstElementStateReturn gst_tcpclientsink_change_state (GstElement * element) { - g_return_val_if_fail (GST_IS_TCPCLIENTSINK (element), GST_STATE_FAILURE); - - if (GST_STATE_PENDING (element) == GST_STATE_NULL) { - if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSINK_OPEN)) - gst_tcpclientsink_close (GST_TCPCLIENTSINK (element)); - } else { - if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSINK_OPEN)) { - if (!gst_tcpclientsink_init_send (GST_TCPCLIENTSINK (element))) - return GST_STATE_FAILURE; - } + GstTCPClientSink *sink; + gint transition; + GstElementStateReturn res; + + sink = GST_TCPCLIENTSINK (element); + transition = GST_STATE_TRANSITION (element); + + switch (transition) { + case GST_STATE_NULL_TO_READY: + case GST_STATE_READY_TO_PAUSED: + if (!gst_tcpclientsink_start (GST_TCPCLIENTSINK (element))) + goto start_failure; + break; + default: + break; } + res = GST_ELEMENT_CLASS (parent_class)->change_state (element); - if (GST_ELEMENT_CLASS (parent_class)->change_state) - return GST_ELEMENT_CLASS (parent_class)->change_state (element); + switch (transition) { + case GST_STATE_READY_TO_NULL: + gst_tcpclientsink_stop (GST_TCPCLIENTSINK (element)); + default: + break; + } + return res; - return GST_STATE_SUCCESS; +start_failure: + { + return GST_STATE_FAILURE; + } } diff --git a/gst/tcp/gsttcpclientsink.h b/gst/tcp/gsttcpclientsink.h index 9cd657a..f7254e2 100644 --- a/gst/tcp/gsttcpclientsink.h +++ b/gst/tcp/gsttcpclientsink.h @@ -23,11 +23,11 @@ #include +#include + #include "gsttcp.h" -#ifdef __cplusplus -extern "C" { -#endif /* __cplusplus */ +G_BEGIN_DECLS #include #include @@ -65,10 +65,7 @@ typedef enum { } GstTCPClientSinkFlags; struct _GstTCPClientSink { - GstElement element; - - /* pad */ - GstPad *sinkpad; + GstBaseSink element; /* server information */ int port; @@ -81,21 +78,14 @@ struct _GstTCPClientSink { size_t data_written; /* how much bytes have we written ? */ GstTCPProtocolType protocol; /* used with the protocol enum */ gboolean caps_sent; /* whether or not we sent caps already */ - - guint mtu; - GstClock *clock; }; struct _GstTCPClientSinkClass { - GstElementClass parent_class; + GstBaseSinkClass parent_class; }; GType gst_tcpclientsink_get_type(void); - -#ifdef __cplusplus -} -#endif /* __cplusplus */ - +G_END_DECLS #endif /* __GST_TCPCLIENTSINK_H__ */ diff --git a/gst/tcp/gsttcpclientsrc.c b/gst/tcp/gsttcpclientsrc.c index 0d96a74..8100164 100644 --- a/gst/tcp/gsttcpclientsrc.c +++ b/gst/tcp/gsttcpclientsrc.c @@ -47,6 +47,11 @@ GST_ELEMENT_DETAILS ("TCP Client source", "Receive data as a client over the network via TCP", "Thomas Vander Stichele "); +static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + /* TCPClientSrc signals and args */ enum { @@ -66,17 +71,18 @@ static void gst_tcpclientsrc_class_init (GstTCPClientSrc * klass); static void gst_tcpclientsrc_init (GstTCPClientSrc * tcpclientsrc); static void gst_tcpclientsrc_finalize (GObject * gobject); -static GstCaps *gst_tcpclientsrc_getcaps (GstPad * pad); +static GstCaps *gst_tcpclientsrc_getcaps (GstBaseSrc * psrc); -static GstData *gst_tcpclientsrc_get (GstPad * pad); -static GstElementStateReturn gst_tcpclientsrc_change_state (GstElement * - element); +static GstFlowReturn gst_tcpclientsrc_create (GstPushSrc * psrc, + GstBuffer ** outbuf); +static gboolean gst_tcpclientsrc_stop (GstBaseSrc * bsrc); +static gboolean gst_tcpclientsrc_start (GstBaseSrc * bsrc); +static gboolean gst_tcpclientsrc_unlock (GstBaseSrc * bsrc); static void gst_tcpclientsrc_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); static void gst_tcpclientsrc_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static void gst_tcpclientsrc_set_clock (GstElement * element, GstClock * clock); static GstElementClass *parent_class = NULL; @@ -87,7 +93,6 @@ gst_tcpclientsrc_get_type (void) { static GType tcpclientsrc_type = 0; - if (!tcpclientsrc_type) { static const GTypeInfo tcpclientsrc_info = { sizeof (GstTCPClientSrcClass), @@ -103,7 +108,7 @@ gst_tcpclientsrc_get_type (void) }; tcpclientsrc_type = - g_type_register_static (GST_TYPE_ELEMENT, "GstTCPClientSrc", + g_type_register_static (GST_TYPE_PUSHSRC, "GstTCPClientSrc", &tcpclientsrc_info, 0); } return tcpclientsrc_type; @@ -114,6 +119,9 @@ gst_tcpclientsrc_base_init (gpointer g_class) { GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&srctemplate)); + gst_element_class_set_details (element_class, &gst_tcpclientsrc_details); } @@ -122,11 +130,19 @@ gst_tcpclientsrc_class_init (GstTCPClientSrc * klass) { GObjectClass *gobject_class; GstElementClass *gstelement_class; + GstBaseSrcClass *gstbasesrc_class; + GstPushSrcClass *gstpushsrc_class; gobject_class = (GObjectClass *) klass; gstelement_class = (GstElementClass *) klass; + gstbasesrc_class = (GstBaseSrcClass *) klass; + gstpushsrc_class = (GstPushSrcClass *) klass; + + parent_class = g_type_class_ref (GST_TYPE_PUSHSRC); - parent_class = g_type_class_ref (GST_TYPE_ELEMENT); + gobject_class->set_property = gst_tcpclientsrc_set_property; + gobject_class->get_property = gst_tcpclientsrc_get_property; + gobject_class->finalize = gst_tcpclientsrc_finalize; g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST, g_param_spec_string ("host", "Host", @@ -140,43 +156,28 @@ gst_tcpclientsrc_class_init (GstTCPClientSrc * klass) GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE, G_PARAM_READWRITE)); - gobject_class->set_property = gst_tcpclientsrc_set_property; - gobject_class->get_property = gst_tcpclientsrc_get_property; - gobject_class->finalize = gst_tcpclientsrc_finalize; + gstbasesrc_class->get_caps = gst_tcpclientsrc_getcaps; + gstbasesrc_class->start = gst_tcpclientsrc_start; + gstbasesrc_class->stop = gst_tcpclientsrc_stop; + gstbasesrc_class->unlock = gst_tcpclientsrc_unlock; - gstelement_class->change_state = gst_tcpclientsrc_change_state; - gstelement_class->set_clock = gst_tcpclientsrc_set_clock; + gstpushsrc_class->create = gst_tcpclientsrc_create; GST_DEBUG_CATEGORY_INIT (tcpclientsrc_debug, "tcpclientsrc", 0, "TCP Client Source"); } static void -gst_tcpclientsrc_set_clock (GstElement * element, GstClock * clock) -{ - GstTCPClientSrc *tcpclientsrc; - - tcpclientsrc = GST_TCPCLIENTSRC (element); - - tcpclientsrc->clock = clock; -} - -static void gst_tcpclientsrc_init (GstTCPClientSrc * this) { - /* create the src pad */ - this->srcpad = gst_pad_new ("src", GST_PAD_SRC); - gst_element_add_pad (GST_ELEMENT (this), this->srcpad); - gst_pad_set_get_function (this->srcpad, gst_tcpclientsrc_get); - gst_pad_set_getcaps_function (this->srcpad, gst_tcpclientsrc_getcaps); - this->port = TCP_DEFAULT_PORT; this->host = g_strdup (TCP_DEFAULT_HOST); - this->clock = NULL; this->sock_fd = -1; this->protocol = GST_TCP_PROTOCOL_TYPE_NONE; - this->curoffset = 0; this->caps = NULL; + this->curoffset = 0; + + gst_base_src_set_live (GST_BASESRC (this), TRUE); GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN); } @@ -190,12 +191,12 @@ gst_tcpclientsrc_finalize (GObject * gobject) } static GstCaps * -gst_tcpclientsrc_getcaps (GstPad * pad) +gst_tcpclientsrc_getcaps (GstBaseSrc * bsrc) { GstTCPClientSrc *src; GstCaps *caps = NULL; - src = GST_TCPCLIENTSRC (GST_OBJECT_PARENT (pad)); + src = GST_TCPCLIENTSRC (bsrc); if (!GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN)) caps = gst_caps_new_any (); @@ -208,77 +209,20 @@ gst_tcpclientsrc_getcaps (GstPad * pad) return caps; } -/* close the socket and associated resources - * unset OPEN flag - * used both to recover from errors and go to NULL state */ -static void -gst_tcpclientsrc_close (GstTCPClientSrc * this) -{ - GST_DEBUG_OBJECT (this, "closing socket"); - if (this->sock_fd != -1) { - close (this->sock_fd); - this->sock_fd = -1; - } - this->caps_received = FALSE; - if (this->caps) { - gst_caps_free (this->caps); - this->caps = NULL; - } - GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN); -} - -/* close socket and related items and return an EOS GstData - * called from _get */ -static GstData * -gst_tcpclientsrc_eos (GstTCPClientSrc * src) -{ - GST_DEBUG_OBJECT (src, "going to EOS"); - gst_element_set_eos (GST_ELEMENT (src)); - gst_tcpclientsrc_close (src); - return GST_DATA (gst_event_new (GST_EVENT_EOS)); -} - -static GstData * -gst_tcpclientsrc_get (GstPad * pad) +static GstFlowReturn +gst_tcpclientsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf) { GstTCPClientSrc *src; size_t readsize; int ret; - - GstData *data = NULL; GstBuffer *buf = NULL; - g_return_val_if_fail (pad != NULL, NULL); - g_return_val_if_fail (GST_IS_PAD (pad), NULL); - src = GST_TCPCLIENTSRC (GST_OBJECT_PARENT (pad)); - if (!GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN)) { - GST_DEBUG_OBJECT (src, "connection to server closed, cannot give data"); - return NULL; - } - GST_LOG_OBJECT (src, "asked for a buffer"); + src = GST_TCPCLIENTSRC (psrc); - /* try to negotiate here */ - if (!gst_pad_is_negotiated (pad)) { - if (GST_PAD_LINK_FAILED (gst_pad_renegotiate (pad))) { - GST_ELEMENT_ERROR (src, CORE, NEGOTIATION, (NULL), GST_ERROR_SYSTEM); - gst_buffer_unref (buf); - return gst_tcpclientsrc_eos (src); - } - } + if (!GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN)) + goto wrong_state; - /* if we have a left over buffer after a discont, return that */ - if (src->buffer_after_discont) { - buf = src->buffer_after_discont; - GST_LOG_OBJECT (src, - "Returning buffer after discont of size %d, ts %" - GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT - ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT, - GST_BUFFER_SIZE (buf), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), - GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), - GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf)); - src->buffer_after_discont = NULL; - return GST_DATA (buf); - } + GST_LOG_OBJECT (src, "asked for a buffer"); /* read the buffer header if we're using a protocol */ switch (src->protocol) { @@ -288,101 +232,51 @@ gst_tcpclientsrc_get (GstPad * pad) /* do a blocking select on the socket */ FD_ZERO (&testfds); FD_SET (src->sock_fd, &testfds); - ret = select (src->sock_fd + 1, &testfds, (fd_set *) 0, (fd_set *) 0, 0); + /* no action (0) is an error too in our case */ - if (ret <= 0) { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("select failed: %s", g_strerror (errno))); - return gst_tcpclientsrc_eos (src); - } + if ((ret = select (src->sock_fd + 1, &testfds, NULL, NULL, 0)) <= 0) + goto select_error; /* ask how much is available for reading on the socket */ - ret = ioctl (src->sock_fd, FIONREAD, &readsize); - if (ret < 0) { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("ioctl failed: %s", g_strerror (errno))); - return gst_tcpclientsrc_eos (src); - } + if ((ret = ioctl (src->sock_fd, FIONREAD, &readsize)) < 0) + goto ioctl_error; + GST_LOG_OBJECT (src, "ioctl says %d bytes available", readsize); + buf = gst_buffer_new_and_alloc (readsize); break; - case GST_TCP_PROTOCOL_TYPE_GDP: - if (!(data = gst_tcp_gdp_read_header (GST_ELEMENT (src), src->sock_fd))) { - return gst_tcpclientsrc_eos (src); - } - if (GST_IS_EVENT (data)) { - /* if we got back an EOS event, then we should go into eos ourselves */ - if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) { - gst_event_unref (data); - return gst_tcpclientsrc_eos (src); - } - return data; - } - buf = GST_BUFFER (data); + case GST_TCP_PROTOCOL_TYPE_GDP: + if (!(buf = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd))) + goto hit_eos; GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p", buf); + /* use this new buffer to read data into */ readsize = GST_BUFFER_SIZE (buf); break; default: - g_warning ("Unhandled protocol type"); + /* need to assert as buf == NULL */ + g_assert ("Unhandled protocol type"); break; } GST_LOG_OBJECT (src, "Reading %d bytes into buffer", readsize); - ret = gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf), readsize); - if (ret < 0) { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); - gst_buffer_unref (buf); - return gst_tcpclientsrc_eos (src); - } + if ((ret = + gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf), + readsize)) < 0) + goto read_error; /* if we read 0 bytes, and we're blocking, we hit eos */ - if (ret == 0) { - GST_DEBUG_OBJECT (src, "blocking read returns 0, EOS"); - gst_buffer_unref (buf); - return gst_tcpclientsrc_eos (src); - } + if (ret == 0) + goto zero_read; readsize = ret; GST_BUFFER_SIZE (buf) = readsize; - GST_BUFFER_MAXSIZE (buf) = readsize; - - /* FIXME: we could decide to set OFFSET and OFFSET_END for non-protocol - * streams to mean the bytes processed */ - - /* if this is our first buffer, we need to send a discont with the - * given timestamp or the current offset, and store the buffer for - * the next iteration through the get loop */ - if (src->send_discont) { - GstClockTime timestamp; - GstEvent *event; - - src->send_discont = FALSE; - src->buffer_after_discont = buf; - /* if the timestamp is valid, send a timed discont - * taking into account the incoming buffer's timestamps */ - timestamp = GST_BUFFER_TIMESTAMP (buf); - if (GST_CLOCK_TIME_IS_VALID (timestamp)) { - GST_DEBUG_OBJECT (src, - "sending discontinuous with timestamp %" GST_TIME_FORMAT, - GST_TIME_ARGS (timestamp)); - event = - gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME, timestamp, NULL); - return GST_DATA (event); - } - /* otherwise, send an offset discont */ - GST_DEBUG_OBJECT (src, "sending discontinuous with offset %d", - src->curoffset); - event = - gst_event_new_discontinuous (FALSE, GST_FORMAT_BYTES, src->curoffset, - NULL); - return GST_DATA (event); - } src->curoffset += readsize; + GST_LOG_OBJECT (src, "Returning buffer from _get of size %d, ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT @@ -390,7 +284,47 @@ gst_tcpclientsrc_get (GstPad * pad) GST_BUFFER_SIZE (buf), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), GST_TIME_ARGS (GST_BUFFER_DURATION (buf)), GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf)); - return GST_DATA (buf); + + gst_buffer_set_caps (buf, src->caps); + + *outbuf = buf; + + return GST_FLOW_OK; + + /* ERRORS */ +wrong_state: + { + GST_DEBUG_OBJECT (src, "connection to server closed, cannot give data"); + return GST_FLOW_WRONG_STATE; + } +select_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("select failed: %s", g_strerror (errno))); + return GST_FLOW_ERROR; + } +ioctl_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("ioctl failed: %s", g_strerror (errno))); + return GST_FLOW_ERROR; + } +hit_eos: + { + return GST_FLOW_WRONG_STATE; + } +read_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); + gst_buffer_unref (buf); + return GST_FLOW_ERROR; + } +zero_read: + { + GST_DEBUG_OBJECT (src, "blocking read returns 0, EOS"); + gst_buffer_unref (buf); + return GST_FLOW_WRONG_STATE; + } } static void @@ -453,109 +387,125 @@ gst_tcpclientsrc_get_property (GObject * object, guint prop_id, GValue * value, /* create a socket for connecting to remote server */ static gboolean -gst_tcpclientsrc_init_receive (GstTCPClientSrc * this) +gst_tcpclientsrc_start (GstBaseSrc * bsrc) { int ret; gchar *ip; + GstTCPClientSrc *src = GST_TCPCLIENTSRC (bsrc); /* create receiving client socket */ - GST_DEBUG_OBJECT (this, "opening receiving client socket to %s:%d", - this->host, this->port); - if ((this->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) { - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM); - return FALSE; - } - GST_DEBUG_OBJECT (this, "opened receiving client socket with fd %d", - this->sock_fd); - GST_FLAG_SET (this, GST_TCPCLIENTSRC_OPEN); + GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d", + src->host, src->port); + + if ((src->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) + goto no_socket; + + GST_DEBUG_OBJECT (src, "opened receiving client socket with fd %d", + src->sock_fd); + GST_FLAG_SET (src, GST_TCPCLIENTSRC_OPEN); /* look up name if we need to */ - ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host); - if (!ip) { - gst_tcpclientsrc_close (this); - return FALSE; - } - GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip); + if (!(ip = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host))) + goto name_resolv; + + GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip); /* connect to server */ - memset (&this->server_sin, 0, sizeof (this->server_sin)); - this->server_sin.sin_family = AF_INET; /* network socket */ - this->server_sin.sin_port = htons (this->port); /* on port */ - this->server_sin.sin_addr.s_addr = inet_addr (ip); /* on host ip */ + memset (&src->server_sin, 0, sizeof (src->server_sin)); + src->server_sin.sin_family = AF_INET; /* network socket */ + src->server_sin.sin_port = htons (src->port); /* on port */ + src->server_sin.sin_addr.s_addr = inet_addr (ip); /* on host ip */ g_free (ip); - GST_DEBUG_OBJECT (this, "connecting to server"); - ret = connect (this->sock_fd, (struct sockaddr *) &this->server_sin, - sizeof (this->server_sin)); + GST_DEBUG_OBJECT (src, "connecting to server"); + ret = connect (src->sock_fd, (struct sockaddr *) &src->server_sin, + sizeof (src->server_sin)); if (ret) { - gst_tcpclientsrc_close (this); + gst_tcpclientsrc_stop (GST_BASESRC (src)); switch (errno) { case ECONNREFUSED: - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, - (_("Connection to %s:%d refused."), this->host, this->port), - (NULL)); + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, + (_("Connection to %s:%d refused."), src->host, src->port), (NULL)); return FALSE; break; default: - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), - ("connect to %s:%d failed: %s", this->host, this->port, + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), + ("connect to %s:%d failed: %s", src->host, src->port, g_strerror (errno))); return FALSE; break; } } - this->send_discont = TRUE; - this->buffer_after_discont = NULL; - /* get the caps if we're using GDP */ - if (this->protocol == GST_TCP_PROTOCOL_TYPE_GDP) { + if (src->protocol == GST_TCP_PROTOCOL_TYPE_GDP) { /* if we haven't received caps yet, we should get them first */ - if (!this->caps_received) { + if (!src->caps_received) { GstCaps *caps; - GST_DEBUG_OBJECT (this, "getting caps through GDP"); - if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (this), this->sock_fd))) { - gst_tcpclientsrc_close (this); - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("Could not read caps through GDP")); - return FALSE; - } - if (!GST_IS_CAPS (caps)) { - gst_tcpclientsrc_close (this); - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("Could not read caps through GDP")); - return FALSE; - } - GST_DEBUG_OBJECT (this, "Received caps through GDP: %" GST_PTR_FORMAT, + GST_DEBUG_OBJECT (src, "getting caps through GDP"); + if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd))) + goto no_caps; + + if (!GST_IS_CAPS (caps)) + goto no_caps; + + GST_DEBUG_OBJECT (src, "Received caps through GDP: %" GST_PTR_FORMAT, caps); - this->caps_received = TRUE; - this->caps = caps; + + src->caps_received = TRUE; + src->caps = caps; } } return TRUE; + +no_socket: + { + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM); + return FALSE; + } +name_resolv: + { + gst_tcpclientsrc_stop (GST_BASESRC (src)); + return FALSE; + } +no_caps: + { + gst_tcpclientsrc_stop (GST_BASESRC (src)); + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("Could not read caps through GDP")); + return FALSE; + } } -static GstElementStateReturn -gst_tcpclientsrc_change_state (GstElement * element) +/* close the socket and associated resources + * unset OPEN flag + * used both to recover from errors and go to NULL state */ +static gboolean +gst_tcpclientsrc_stop (GstBaseSrc * bsrc) { - g_return_val_if_fail (GST_IS_TCPCLIENTSRC (element), GST_STATE_FAILURE); + GstTCPClientSrc *src; - /* if open and going to NULL, close it */ - if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN) && - GST_STATE_PENDING (element) == GST_STATE_NULL) { - gst_tcpclientsrc_close (GST_TCPCLIENTSRC (element)); + src = GST_TCPCLIENTSRC (bsrc); + + GST_DEBUG_OBJECT (src, "closing socket"); + if (src->sock_fd != -1) { + close (src->sock_fd); + src->sock_fd = -1; } - /* if closed and going to a state higher than NULL, open it */ - if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN) && - GST_STATE_PENDING (element) > GST_STATE_NULL) { - if (!gst_tcpclientsrc_init_receive (GST_TCPCLIENTSRC (element))) - return GST_STATE_FAILURE; + src->caps_received = FALSE; + if (src->caps) { + gst_caps_unref (src->caps); + src->caps = NULL; } + GST_FLAG_UNSET (src, GST_TCPCLIENTSRC_OPEN); - if (GST_ELEMENT_CLASS (parent_class)->change_state) - return GST_ELEMENT_CLASS (parent_class)->change_state (element); + return TRUE; +} - return GST_STATE_SUCCESS; +static gboolean +gst_tcpclientsrc_unlock (GstBaseSrc * bsrc) +{ + return TRUE; } diff --git a/gst/tcp/gsttcpclientsrc.h b/gst/tcp/gsttcpclientsrc.h index f09a0b2..2ccaa39 100644 --- a/gst/tcp/gsttcpclientsrc.h +++ b/gst/tcp/gsttcpclientsrc.h @@ -23,16 +23,16 @@ #define __GST_TCPCLIENTSRC_H__ #include +#include -#ifdef __cplusplus -extern "C" { -#endif /* __cplusplus */ +G_BEGIN_DECLS #include /* sockaddr_in */ #include #include #include /* sockaddr_in */ #include + #include "gsttcp.h" #define GST_TYPE_TCPCLIENTSRC \ @@ -56,10 +56,7 @@ typedef enum { } GstTCPClientSrcFlags; struct _GstTCPClientSrc { - GstElement element; - - /* pad */ - GstPad *srcpad; + GstPushSrc element; /* server information */ int port; @@ -75,21 +72,14 @@ struct _GstTCPClientSrc { GstTCPProtocolType protocol; /* protocol used for reading data */ gboolean caps_received; /* if we have received caps yet */ GstCaps *caps; - GstClock *clock; - - gboolean send_discont; /* TRUE when we need to send a discont */ - GstBuffer *buffer_after_discont; /* temporary storage for buffer */ }; struct _GstTCPClientSrcClass { - GstElementClass parent_class; + GstPushSrcClass parent_class; }; GType gst_tcpclientsrc_get_type (void); - -#ifdef __cplusplus -} -#endif /* __cplusplus */ +G_END_DECLS #endif /* __GST_TCPCLIENTSRC_H__ */ diff --git a/gst/tcp/gsttcpplugin.c b/gst/tcp/gsttcpplugin.c index 369bdd3..8ab6471 100644 --- a/gst/tcp/gsttcpplugin.c +++ b/gst/tcp/gsttcpplugin.c @@ -21,8 +21,7 @@ #include "config.h" #endif -#include "gsttcpsrc.h" -#include "gsttcpsink.h" +#include #include "gsttcpclientsrc.h" #include "gsttcpclientsink.h" #include "gsttcpserversrc.h" @@ -34,12 +33,7 @@ GST_DEBUG_CATEGORY (tcp_debug); static gboolean plugin_init (GstPlugin * plugin) { - if (!gst_element_register (plugin, "tcpsink", GST_RANK_NONE, - GST_TYPE_TCPSINK)) - return FALSE; - - if (!gst_element_register (plugin, "tcpsrc", GST_RANK_NONE, GST_TYPE_TCPSRC)) - return FALSE; + gst_dp_init (); if (!gst_element_register (plugin, "tcpclientsink", GST_RANK_NONE, GST_TYPE_TCPCLIENTSINK)) diff --git a/gst/tcp/gsttcpserversink.c b/gst/tcp/gsttcpserversink.c index 4dd513a..bb92348 100644 --- a/gst/tcp/gsttcpserversink.c +++ b/gst/tcp/gsttcpserversink.c @@ -119,6 +119,10 @@ gst_tcpserversink_class_init (GstTCPServerSink * klass) parent_class = g_type_class_ref (GST_TYPE_MULTIFDSINK); + gobject_class->set_property = gst_tcpserversink_set_property; + gobject_class->get_property = gst_tcpserversink_get_property; + gobject_class->finalize = gst_tcpserversink_finalize; + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST, g_param_spec_string ("host", "host", "The host/IP to send the packets to", TCP_DEFAULT_HOST, G_PARAM_READWRITE)); @@ -126,10 +130,6 @@ gst_tcpserversink_class_init (GstTCPServerSink * klass) g_param_spec_int ("port", "port", "The port to send the packets to", 0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT, G_PARAM_READWRITE)); - gobject_class->set_property = gst_tcpserversink_set_property; - gobject_class->get_property = gst_tcpserversink_get_property; - gobject_class->finalize = gst_tcpserversink_finalize; - gstmultifdsink_class->init = gst_tcpserversink_init_send; gstmultifdsink_class->wait = gst_tcpserversink_handle_wait; gstmultifdsink_class->close = gst_tcpserversink_close; diff --git a/gst/tcp/gsttcpserversink.h b/gst/tcp/gsttcpserversink.h index 6b4848a..c4d98fa 100644 --- a/gst/tcp/gsttcpserversink.h +++ b/gst/tcp/gsttcpserversink.h @@ -25,9 +25,7 @@ #include -#ifdef __cplusplus -extern "C" { -#endif /* __cplusplus */ +G_BEGIN_DECLS #include #include @@ -82,10 +80,6 @@ struct _GstTCPServerSinkClass { GType gst_tcpserversink_get_type (void); - -#ifdef __cplusplus -} -#endif /* __cplusplus */ - +G_END_DECLS #endif /* __GST_TCPSERVERSINK_H__ */ diff --git a/gst/tcp/gsttcpserversrc.c b/gst/tcp/gsttcpserversrc.c index 9c84320..5698976 100644 --- a/gst/tcp/gsttcpserversrc.c +++ b/gst/tcp/gsttcpserversrc.c @@ -46,6 +46,12 @@ GST_ELEMENT_DETAILS ("TCP Server source", "Receive data as a server over the network via TCP", "Thomas Vander Stichele "); +static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + + /* TCPServerSrc signals and args */ enum { @@ -65,15 +71,15 @@ static void gst_tcpserversrc_class_init (GstTCPServerSrc * klass); static void gst_tcpserversrc_init (GstTCPServerSrc * tcpserversrc); static void gst_tcpserversrc_finalize (GObject * gobject); -static GstData *gst_tcpserversrc_get (GstPad * pad); -static GstElementStateReturn gst_tcpserversrc_change_state (GstElement * - element); +static gboolean gst_tcpserversrc_start (GstBaseSrc * bsrc); +static gboolean gst_tcpserversrc_stop (GstBaseSrc * bsrc); +static GstFlowReturn gst_tcpserversrc_create (GstPushSrc * psrc, + GstBuffer ** buf); static void gst_tcpserversrc_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); static void gst_tcpserversrc_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static void gst_tcpserversrc_set_clock (GstElement * element, GstClock * clock); static GstElementClass *parent_class = NULL; @@ -100,7 +106,7 @@ gst_tcpserversrc_get_type (void) }; tcpserversrc_type = - g_type_register_static (GST_TYPE_ELEMENT, "GstTCPServerSrc", + g_type_register_static (GST_TYPE_PUSHSRC, "GstTCPServerSrc", &tcpserversrc_info, 0); } return tcpserversrc_type; @@ -111,6 +117,9 @@ gst_tcpserversrc_base_init (gpointer g_class) { GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&srctemplate)); + gst_element_class_set_details (element_class, &gst_tcpserversrc_details); } @@ -119,11 +128,19 @@ gst_tcpserversrc_class_init (GstTCPServerSrc * klass) { GObjectClass *gobject_class; GstElementClass *gstelement_class; + GstBaseSrcClass *gstbasesrc_class; + GstPushSrcClass *gstpushsrc_class; gobject_class = (GObjectClass *) klass; gstelement_class = (GstElementClass *) klass; + gstbasesrc_class = (GstBaseSrcClass *) klass; + gstpushsrc_class = (GstPushSrcClass *) klass; + + parent_class = g_type_class_ref (GST_TYPE_PUSHSRC); - parent_class = g_type_class_ref (GST_TYPE_ELEMENT); + gobject_class->set_property = gst_tcpserversrc_set_property; + gobject_class->get_property = gst_tcpserversrc_get_property; + gobject_class->finalize = gst_tcpserversrc_finalize; g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST, g_param_spec_string ("host", "Host", "The hostname to listen as", @@ -136,241 +153,98 @@ gst_tcpserversrc_class_init (GstTCPServerSrc * klass) GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE, G_PARAM_READWRITE)); - gobject_class->set_property = gst_tcpserversrc_set_property; - gobject_class->get_property = gst_tcpserversrc_get_property; - gobject_class->finalize = gst_tcpserversrc_finalize; + gstbasesrc_class->start = gst_tcpserversrc_start; + gstbasesrc_class->stop = gst_tcpserversrc_stop; - gstelement_class->change_state = gst_tcpserversrc_change_state; - gstelement_class->set_clock = gst_tcpserversrc_set_clock; + gstpushsrc_class->create = gst_tcpserversrc_create; GST_DEBUG_CATEGORY_INIT (tcpserversrc_debug, "tcpserversrc", 0, "TCP Server Source"); } static void -gst_tcpserversrc_set_clock (GstElement * element, GstClock * clock) +gst_tcpserversrc_init (GstTCPServerSrc * src) { - GstTCPServerSrc *tcpserversrc; - - tcpserversrc = GST_TCPSERVERSRC (element); - - tcpserversrc->clock = clock; -} - -static void -gst_tcpserversrc_init (GstTCPServerSrc * this) -{ - /* create the src pad */ - this->srcpad = gst_pad_new ("src", GST_PAD_SRC); - gst_element_add_pad (GST_ELEMENT (this), this->srcpad); - gst_pad_set_get_function (this->srcpad, gst_tcpserversrc_get); - - this->server_port = TCP_DEFAULT_PORT; - this->host = g_strdup (TCP_DEFAULT_HOST); - this->clock = NULL; - this->server_sock_fd = -1; - this->client_sock_fd = -1; - this->curoffset = 0; - this->protocol = GST_TCP_PROTOCOL_TYPE_NONE; - - GST_FLAG_UNSET (this, GST_TCPSERVERSRC_OPEN); + src->server_port = TCP_DEFAULT_PORT; + src->host = g_strdup (TCP_DEFAULT_HOST); + src->server_sock_fd = -1; + src->client_sock_fd = -1; + src->curoffset = 0; + src->protocol = GST_TCP_PROTOCOL_TYPE_NONE; + + GST_FLAG_UNSET (src, GST_TCPSERVERSRC_OPEN); } static void gst_tcpserversrc_finalize (GObject * gobject) { - GstTCPServerSrc *this = GST_TCPSERVERSRC (gobject); + GstTCPServerSrc *src = GST_TCPSERVERSRC (gobject); - g_free (this->host); + g_free (src->host); } -/* read the gdp caps packet from the socket */ -static GstCaps * -gst_tcpserversrc_gdp_read_caps (GstTCPServerSrc * this) -{ - size_t header_length = GST_DP_HEADER_LENGTH; - size_t readsize; - guint8 *header = NULL; - guint8 *payload = NULL; - ssize_t ret; - GstCaps *caps; - gchar *string; - - header = g_malloc (header_length); - - readsize = header_length; - GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", readsize); - ret = read (this->client_sock_fd, header, readsize); - if (ret < 0) { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); - g_free (header); - return NULL; - } - g_assert (ret == readsize); - - if (!gst_dp_validate_header (header_length, header)) { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("GDP caps packet header does not validate")); - g_free (header); - return NULL; - } - - readsize = gst_dp_header_payload_length (header); - payload = g_malloc (readsize); - GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize); - ret = read (this->client_sock_fd, payload, readsize); - if (ret < 0) { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); - g_free (header); - g_free (payload); - return NULL; - } - g_assert (ret == readsize); - - if (!gst_dp_validate_payload (readsize, header, payload)) { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("GDP caps packet payload does not validate")); - g_free (header); - g_free (payload); - return NULL; - } - - caps = gst_dp_caps_from_packet (header_length, header, payload); - string = gst_caps_to_string (caps); - GST_DEBUG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string); - g_free (string); - - g_free (header); - g_free (payload); - - return caps; -} - -/* read the gdp buffer header from the socket - * returns a GstData, - * representing the new GstBuffer to read data into, or an EOS event - */ -static GstData * -gst_tcpserversrc_gdp_read_header (GstTCPServerSrc * this) -{ - size_t header_length = GST_DP_HEADER_LENGTH; - size_t readsize; - guint8 *header = NULL; - ssize_t ret; - GstBuffer *buffer; - - header = g_malloc (header_length); - readsize = header_length; - - GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", readsize); - ret = read (this->client_sock_fd, header, readsize); - /* if we read 0 bytes, and we're blocking, we hit eos */ - if (ret == 0) { - GST_DEBUG ("blocking read returns 0, EOS"); - gst_element_set_eos (GST_ELEMENT (this)); - g_free (header); - return GST_DATA (gst_event_new (GST_EVENT_EOS)); - } - if (ret < 0) { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); - g_free (header); - return NULL; - } - if (ret != readsize) { - g_warning ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret); - } - g_assert (ret == readsize); - - if (!gst_dp_validate_header (header_length, header)) { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("GDP buffer packet header does not validate")); - g_free (header); - return NULL; - } - GST_LOG_OBJECT (this, "validated buffer packet header"); - - buffer = gst_dp_buffer_from_header (header_length, header); - - GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer); - return GST_DATA (buffer); -} - -static GstData * -gst_tcpserversrc_get (GstPad * pad) +static GstFlowReturn +gst_tcpserversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf) { GstTCPServerSrc *src; size_t readsize; int ret; - - GstData *data = NULL; GstBuffer *buf = NULL; GstCaps *caps; - g_return_val_if_fail (pad != NULL, NULL); - g_return_val_if_fail (GST_IS_PAD (pad), NULL); - src = GST_TCPSERVERSRC (GST_OBJECT_PARENT (pad)); - g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN), NULL); + src = GST_TCPSERVERSRC (psrc); + + g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN), + GST_FLOW_ERROR); /* read the buffer header if we're using a protocol */ switch (src->protocol) { + case GST_TCP_PROTOCOL_TYPE_NONE: + { fd_set testfds; - case GST_TCP_PROTOCOL_TYPE_NONE: /* do a blocking select on the socket */ FD_ZERO (&testfds); FD_SET (src->client_sock_fd, &testfds); - ret = - select (src->client_sock_fd + 1, &testfds, (fd_set *) 0, (fd_set *) 0, - 0); + /* no action (0) is an error too in our case */ - if (ret <= 0) { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("select failed: %s", g_strerror (errno))); - return GST_DATA (gst_event_new (GST_EVENT_EOS)); - } + if ((ret = + select (src->client_sock_fd + 1, &testfds, (fd_set *) 0, + (fd_set *) 0, 0)) <= 0) + goto select_error; + /* ask how much is available for reading on the socket */ - ret = ioctl (src->client_sock_fd, FIONREAD, &readsize); - if (ret < 0) { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("ioctl failed: %s", g_strerror (errno))); - return GST_DATA (gst_event_new (GST_EVENT_EOS)); - } + if ((ret = ioctl (src->client_sock_fd, FIONREAD, &readsize)) < 0) + goto ioctl_error; buf = gst_buffer_new_and_alloc (readsize); break; + } case GST_TCP_PROTOCOL_TYPE_GDP: /* if we haven't received caps yet, we should get them first */ if (!src->caps_received) { gchar *string; - if (!(caps = gst_tcpserversrc_gdp_read_caps (src))) { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Could not read caps through GDP")); - return GST_DATA (gst_event_new (GST_EVENT_EOS)); - } + if (!(caps = + gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd))) + goto gdp_caps_read_error; + src->caps_received = TRUE; string = gst_caps_to_string (caps); GST_DEBUG_OBJECT (src, "Received caps through GDP: %s", string); g_free (string); - if (!gst_pad_try_set_caps (pad, caps)) { - g_warning ("Could not set caps"); - return GST_DATA (gst_event_new (GST_EVENT_EOS)); - } + gst_pad_set_caps (GST_BASESRC_PAD (psrc), caps); } /* now receive the buffer header */ - if (!(data = gst_tcpserversrc_gdp_read_header (src))) { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Could not read data header through GDP")); - return GST_DATA (gst_event_new (GST_EVENT_EOS)); - } - if (GST_IS_EVENT (data)) - return data; - buf = GST_BUFFER (data); + if (!(buf = + gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd))) + goto gdp_buffer_read_error; GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p", buf); + /* use this new buffer to read data into */ readsize = GST_BUFFER_SIZE (buf); break; @@ -380,31 +254,63 @@ gst_tcpserversrc_get (GstPad * pad) } GST_LOG_OBJECT (src, "Reading %d bytes", readsize); - ret = - gst_tcp_socket_read (src->client_sock_fd, GST_BUFFER_DATA (buf), - readsize); - if (ret < 0) { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); - gst_buffer_unref (buf); - return GST_DATA (gst_event_new (GST_EVENT_EOS)); - } + if ((ret = + gst_tcp_socket_read (src->client_sock_fd, GST_BUFFER_DATA (buf), + readsize)) < 0) + goto read_error; /* if we read 0 bytes, and we're blocking, we hit eos */ - if (ret == 0) { - GST_DEBUG ("blocking read returns 0, EOS"); - gst_buffer_unref (buf); - gst_element_set_eos (GST_ELEMENT (src)); - return GST_DATA (gst_event_new (GST_EVENT_EOS)); - } + if (ret == 0) + goto hit_eos; readsize = ret; GST_LOG_OBJECT (src, "Read %d bytes", readsize); GST_BUFFER_SIZE (buf) = readsize; - GST_BUFFER_MAXSIZE (buf) = readsize; GST_BUFFER_OFFSET (buf) = src->curoffset; GST_BUFFER_OFFSET_END (buf) = src->curoffset + readsize; src->curoffset += readsize; - return GST_DATA (buf); + + *outbuf = buf; + + return GST_FLOW_OK; + + /* ERROR */ +select_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("select failed: %s", g_strerror (errno))); + return GST_FLOW_ERROR; + } +ioctl_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("ioctl failed: %s", g_strerror (errno))); + return GST_FLOW_ERROR; + } +gdp_caps_read_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("Could not read caps through GDP")); + return GST_FLOW_ERROR; + } +gdp_buffer_read_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("Could not read buffer header through GDP")); + return GST_FLOW_ERROR; + } +read_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); + gst_buffer_unref (buf); + return GST_FLOW_ERROR; + } +hit_eos: + { + GST_DEBUG ("blocking read returns 0, EOS"); + gst_buffer_unref (buf); + return GST_FLOW_WRONG_STATE; + } } @@ -467,121 +373,125 @@ gst_tcpserversrc_get_property (GObject * object, guint prop_id, GValue * value, /* set up server */ static gboolean -gst_tcpserversrc_init_receive (GstTCPServerSrc * this) +gst_tcpserversrc_start (GstBaseSrc * bsrc) { int ret; + GstTCPServerSrc *src = GST_TCPSERVERSRC (bsrc); /* reset caps_received flag */ - this->caps_received = FALSE; + src->caps_received = FALSE; /* create the server listener socket */ - if ((this->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) { - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM); - return FALSE; - } - GST_DEBUG_OBJECT (this, "opened receiving server socket with fd %d", - this->server_sock_fd); + if ((src->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) + goto socket_error; + + GST_DEBUG_OBJECT (src, "opened receiving server socket with fd %d", + src->server_sock_fd); /* make address reusable */ ret = 1; - if (setsockopt (this->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret, - sizeof (int)) < 0) { - GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL), - ("Could not setsockopt: %s", g_strerror (errno))); - return FALSE; - } + if (setsockopt (src->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret, + sizeof (int)) < 0) + goto sock_opt; /* name the socket */ - memset (&this->server_sin, 0, sizeof (this->server_sin)); - this->server_sin.sin_family = AF_INET; /* network socket */ - this->server_sin.sin_port = htons (this->server_port); /* on port */ - if (this->host) { - gchar *host = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host); - - if (!host) { - gst_tcp_socket_close (&this->server_sock_fd); - return FALSE; - } - - this->server_sin.sin_addr.s_addr = inet_addr (host); + memset (&src->server_sin, 0, sizeof (src->server_sin)); + src->server_sin.sin_family = AF_INET; /* network socket */ + src->server_sin.sin_port = htons (src->server_port); /* on port */ + if (src->host) { + gchar *host; + + if (!(host = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host))) + goto host_error; + src->server_sin.sin_addr.s_addr = inet_addr (host); g_free (host); } else - this->server_sin.sin_addr.s_addr = htonl (INADDR_ANY); + src->server_sin.sin_addr.s_addr = htonl (INADDR_ANY); /* bind it */ - GST_DEBUG_OBJECT (this, "binding server socket to address"); - ret = bind (this->server_sock_fd, (struct sockaddr *) &this->server_sin, - sizeof (this->server_sin)); + GST_DEBUG_OBJECT (src, "binding server socket to address"); + if ((ret = bind (src->server_sock_fd, (struct sockaddr *) &src->server_sin, + sizeof (src->server_sin))) < 0) + goto bind_error; + + GST_DEBUG_OBJECT (src, "listening on server socket %d with queue of %d", + src->server_sock_fd, TCP_BACKLOG); + + if (listen (src->server_sock_fd, TCP_BACKLOG) == -1) + goto listen_error; + + /* FIXME: maybe we should think about moving actual client accepting + somewhere else */ + GST_DEBUG_OBJECT (src, "waiting for client"); + if ((src->client_sock_fd = + accept (src->server_sock_fd, (struct sockaddr *) &src->client_sin, + &src->client_sin_len)) == -1) + goto accept_error; + + GST_DEBUG_OBJECT (src, "received client"); + + GST_FLAG_SET (src, GST_TCPSERVERSRC_OPEN); - if (ret) { - gst_tcp_socket_close (&this->server_sock_fd); + return TRUE; + + /* ERRORS */ +socket_error: + { + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM); + return FALSE; + } +sock_opt: + { + GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL), + ("Could not setsockopt: %s", g_strerror (errno))); + return FALSE; + } +host_error: + { + gst_tcp_socket_close (&src->server_sock_fd); + return FALSE; + } +bind_error: + { + gst_tcp_socket_close (&src->server_sock_fd); switch (errno) { default: - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), ("bind failed: %s", g_strerror (errno))); - return FALSE; break; } + return FALSE; } - - GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d", - this->server_sock_fd, TCP_BACKLOG); - if (listen (this->server_sock_fd, TCP_BACKLOG) == -1) { - gst_tcp_socket_close (&this->server_sock_fd); - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), +listen_error: + { + gst_tcp_socket_close (&src->server_sock_fd); + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), ("Could not listen on server socket: %s", g_strerror (errno))); return FALSE; } - - /* FIXME: maybe we should think about moving actual client accepting - somewhere else */ - GST_DEBUG_OBJECT (this, "waiting for client"); - this->client_sock_fd = - accept (this->server_sock_fd, (struct sockaddr *) &this->client_sin, - &this->client_sin_len); - if (this->client_sock_fd == -1) { - gst_tcp_socket_close (&this->server_sock_fd); - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), +accept_error: + { + gst_tcp_socket_close (&src->server_sock_fd); + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), ("Could not accept client on server socket: %s", g_strerror (errno))); return FALSE; } - GST_DEBUG_OBJECT (this, "received client"); - - GST_FLAG_SET (this, GST_TCPSERVERSRC_OPEN); - return TRUE; } -static void -gst_tcpserversrc_close (GstTCPServerSrc * this) +static gboolean +gst_tcpserversrc_stop (GstBaseSrc * bsrc) { - if (this->server_sock_fd != -1) { - close (this->server_sock_fd); - this->server_sock_fd = -1; - } - if (this->client_sock_fd != -1) { - close (this->client_sock_fd); - this->client_sock_fd = -1; - } - GST_FLAG_UNSET (this, GST_TCPSERVERSRC_OPEN); -} + GstTCPServerSrc *src = GST_TCPSERVERSRC (bsrc); -static GstElementStateReturn -gst_tcpserversrc_change_state (GstElement * element) -{ - g_return_val_if_fail (GST_IS_TCPSERVERSRC (element), GST_STATE_FAILURE); - - if (GST_STATE_PENDING (element) == GST_STATE_NULL) { - if (GST_FLAG_IS_SET (element, GST_TCPSERVERSRC_OPEN)) - gst_tcpserversrc_close (GST_TCPSERVERSRC (element)); - } else { - if (!GST_FLAG_IS_SET (element, GST_TCPSERVERSRC_OPEN)) { - if (!gst_tcpserversrc_init_receive (GST_TCPSERVERSRC (element))) - return GST_STATE_FAILURE; - } + if (src->server_sock_fd != -1) { + close (src->server_sock_fd); + src->server_sock_fd = -1; } + if (src->client_sock_fd != -1) { + close (src->client_sock_fd); + src->client_sock_fd = -1; + } + GST_FLAG_UNSET (src, GST_TCPSERVERSRC_OPEN); - if (GST_ELEMENT_CLASS (parent_class)->change_state) - return GST_ELEMENT_CLASS (parent_class)->change_state (element); - - return GST_STATE_SUCCESS; + return TRUE; } diff --git a/gst/tcp/gsttcpserversrc.h b/gst/tcp/gsttcpserversrc.h index fbc9e36..a0f50be 100644 --- a/gst/tcp/gsttcpserversrc.h +++ b/gst/tcp/gsttcpserversrc.h @@ -23,10 +23,9 @@ #define __GST_TCPSERVERSRC_H__ #include +#include -#ifdef __cplusplus -extern "C" { -#endif /* __cplusplus */ +G_END_DECLS #include #include @@ -60,10 +59,7 @@ typedef enum { } GstTCPServerSrcFlags; struct _GstTCPServerSrc { - GstElement element; - - /* pad */ - GstPad *srcpad; + GstPushSrc element; /* server information */ int server_port; @@ -81,18 +77,14 @@ struct _GstTCPServerSrc { GstTCPProtocolType protocol; /* protocol used for reading data */ gboolean caps_received; /* if we have received caps yet */ - GstClock *clock; }; struct _GstTCPServerSrcClass { - GstElementClass parent_class; + GstPushSrcClass parent_class; }; GType gst_tcpserversrc_get_type (void); - -#ifdef __cplusplus -} -#endif /* __cplusplus */ +G_BEGIN_DECLS #endif /* __GST_TCPSERVERSRC_H__ */ diff --git a/gst/tcp/gsttcpsink.c b/gst/tcp/gsttcpsink.c deleted file mode 100644 index a9211d4..0000000 --- a/gst/tcp/gsttcpsink.c +++ /dev/null @@ -1,425 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif -#include "gsttcpsink.h" - -#define TCP_DEFAULT_HOST "localhost" -#define TCP_DEFAULT_PORT 4953 - -/* elementfactory information */ -static GstElementDetails gst_tcpsink_details = -GST_ELEMENT_DETAILS ("TCP packet sender", - "Sink/Network", - "Send data over the network via TCP", - "Zeeshan Ali "); - -/* TCPSink signals and args */ -enum -{ - FRAME_ENCODED, - /* FILL ME */ - LAST_SIGNAL -}; - -enum -{ - ARG_0, - ARG_HOST, - ARG_PORT, - ARG_CONTROL, - ARG_MTU - /* FILL ME */ -}; - -#define GST_TYPE_TCPSINK_CONTROL (gst_tcpsink_control_get_type()) -static GType -gst_tcpsink_control_get_type (void) -{ - static GType tcpsink_control_type = 0; - static GEnumValue tcpsink_control[] = { - {CONTROL_NONE, "1", "none"}, - {CONTROL_TCP, "2", "tcp"}, - {CONTROL_ZERO, NULL, NULL} - }; - - if (!tcpsink_control_type) { - tcpsink_control_type = - g_enum_register_static ("GstTCPSinkControl", tcpsink_control); - } - return tcpsink_control_type; -} - -static void gst_tcpsink_base_init (gpointer g_class); -static void gst_tcpsink_class_init (GstTCPSink * klass); -static void gst_tcpsink_init (GstTCPSink * tcpsink); - -static void gst_tcpsink_set_clock (GstElement * element, GstClock * clock); - -static void gst_tcpsink_chain (GstPad * pad, GstData * _data); -static GstElementStateReturn gst_tcpsink_change_state (GstElement * element); - -static void gst_tcpsink_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec); -static void gst_tcpsink_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec); - - -static GstElementClass *parent_class = NULL; - -/*static guint gst_tcpsink_signals[LAST_SIGNAL] = { 0 }; */ - -GType -gst_tcpsink_get_type (void) -{ - static GType tcpsink_type = 0; - - - if (!tcpsink_type) { - static const GTypeInfo tcpsink_info = { - sizeof (GstTCPSinkClass), - gst_tcpsink_base_init, - NULL, - (GClassInitFunc) gst_tcpsink_class_init, - NULL, - NULL, - sizeof (GstTCPSink), - 0, - (GInstanceInitFunc) gst_tcpsink_init, - NULL - }; - - tcpsink_type = - g_type_register_static (GST_TYPE_ELEMENT, "GstTCPSink", &tcpsink_info, - 0); - } - return tcpsink_type; -} - -static void -gst_tcpsink_base_init (gpointer g_class) -{ - GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); - - gst_element_class_set_details (element_class, &gst_tcpsink_details); -} - -static void -gst_tcpsink_class_init (GstTCPSink * klass) -{ - GObjectClass *gobject_class; - GstElementClass *gstelement_class; - - gobject_class = (GObjectClass *) klass; - gstelement_class = (GstElementClass *) klass; - - parent_class = g_type_class_ref (GST_TYPE_ELEMENT); - - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST, - g_param_spec_string ("host", "host", "The host/IP to send the packets to", - TCP_DEFAULT_HOST, G_PARAM_READWRITE)); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT, - g_param_spec_int ("port", "port", "The port to send the packets to", - 0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE)); - g_object_class_install_property (gobject_class, ARG_CONTROL, - g_param_spec_enum ("control", "control", "The type of control", - GST_TYPE_TCPSINK_CONTROL, CONTROL_TCP, G_PARAM_READWRITE)); - g_object_class_install_property (gobject_class, ARG_MTU, g_param_spec_int ("mtu", "mtu", "mtu", G_MININT, G_MAXINT, 0, G_PARAM_READWRITE)); /* CHECKME */ - gobject_class->set_property = gst_tcpsink_set_property; - gobject_class->get_property = gst_tcpsink_get_property; - - gstelement_class->change_state = gst_tcpsink_change_state; - gstelement_class->set_clock = gst_tcpsink_set_clock; -} - - -static GstPadLinkReturn -gst_tcpsink_sink_link (GstPad * pad, const GstCaps * caps) -{ - GstTCPSink *tcpsink; - -#ifndef GST_DISABLE_LOADSAVE - struct sockaddr_in serv_addr; - struct in_addr addr; - struct hostent *he; - int fd; - FILE *f; - xmlDocPtr doc; -#endif - - tcpsink = GST_TCPSINK (gst_pad_get_parent (pad)); - - switch (tcpsink->control) { -#ifndef GST_DISABLE_LOADSAVE - case CONTROL_TCP: - memset (&serv_addr, 0, sizeof (serv_addr)); - - /* if its an IP address */ - if (inet_aton (tcpsink->host, &addr)) { - memmove (&(serv_addr.sin_addr), &addr, sizeof (struct in_addr)); - } - - /* we dont need to lookup for localhost */ - else if (strcmp (tcpsink->host, TCP_DEFAULT_HOST) == 0) { - if (inet_aton ("127.0.0.1", &addr)) { - memmove (&(serv_addr.sin_addr), &addr, sizeof (struct in_addr)); - } - } - - /* if its a hostname */ - else if ((he = gethostbyname (tcpsink->host))) { - memmove (&(serv_addr.sin_addr), he->h_addr, he->h_length); - } - - else { - perror ("hostname lookup error?"); - return GST_PAD_LINK_REFUSED; - } - - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons (tcpsink->port + 1); - - doc = xmlNewDoc ("1.0"); - doc->xmlRootNode = xmlNewDocNode (doc, NULL, "NewCaps", NULL); - - gst_caps_save_thyself (caps, doc->xmlRootNode); - - if ((fd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) { - perror ("socket"); - return GST_PAD_LINK_REFUSED; - } - - if (connect (fd, (struct sockaddr *) &serv_addr, sizeof (serv_addr)) != 0) { - g_printerr ("tcpsink: connect to %s port %d failed: %s\n", - tcpsink->host, tcpsink->port + 1, g_strerror (errno)); - return GST_PAD_LINK_REFUSED; - } - - f = fdopen (dup (fd), "wb"); - - xmlDocDump (f, doc); - fclose (f); - close (fd); - break; - -#endif - case CONTROL_NONE: - return GST_PAD_LINK_OK; - break; - default: - return GST_PAD_LINK_REFUSED; - break; - } - - return GST_PAD_LINK_OK; -} - -static void -gst_tcpsink_set_clock (GstElement * element, GstClock * clock) -{ - GstTCPSink *tcpsink; - - tcpsink = GST_TCPSINK (element); - - tcpsink->clock = clock; -} - -static void -gst_tcpsink_init (GstTCPSink * tcpsink) -{ - /* create the sink and src pads */ - tcpsink->sinkpad = gst_pad_new ("sink", GST_PAD_SINK); - gst_element_add_pad (GST_ELEMENT (tcpsink), tcpsink->sinkpad); - gst_pad_set_chain_function (tcpsink->sinkpad, gst_tcpsink_chain); - gst_pad_set_link_function (tcpsink->sinkpad, gst_tcpsink_sink_link); - - tcpsink->host = g_strdup (TCP_DEFAULT_HOST); - tcpsink->port = TCP_DEFAULT_PORT; - tcpsink->control = CONTROL_TCP; - /* should support as minimum 576 for IPV4 and 1500 for IPV6 */ - tcpsink->mtu = 1500; - - tcpsink->clock = NULL; -} - -static void -gst_tcpsink_chain (GstPad * pad, GstData * _data) -{ - GstBuffer *buf = GST_BUFFER (_data); - GstTCPSink *tcpsink; - - g_return_if_fail (pad != NULL); - g_return_if_fail (GST_IS_PAD (pad)); - g_return_if_fail (buf != NULL); - - tcpsink = GST_TCPSINK (GST_OBJECT_PARENT (pad)); - - if (tcpsink->clock && GST_BUFFER_TIMESTAMP_IS_VALID (buf)) { - gst_element_wait (GST_ELEMENT (tcpsink), GST_BUFFER_TIMESTAMP (buf)); - } - - if (write (tcpsink->sock, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf)) <= 0) { - perror ("write"); - } - - gst_buffer_unref (buf); -} - -static void -gst_tcpsink_set_property (GObject * object, guint prop_id, const GValue * value, - GParamSpec * pspec) -{ - GstTCPSink *tcpsink; - - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_TCPSINK (object)); - tcpsink = GST_TCPSINK (object); - - switch (prop_id) { - case ARG_HOST: - if (tcpsink->host != NULL) - g_free (tcpsink->host); - if (g_value_get_string (value) == NULL) - tcpsink->host = NULL; - else - tcpsink->host = g_strdup (g_value_get_string (value)); - break; - case ARG_PORT: - tcpsink->port = g_value_get_int (value); - break; - case ARG_CONTROL: - tcpsink->control = g_value_get_enum (value); - break; - case ARG_MTU: - tcpsink->mtu = g_value_get_int (value); - break; - default: - break; - } -} - -static void -gst_tcpsink_get_property (GObject * object, guint prop_id, GValue * value, - GParamSpec * pspec) -{ - GstTCPSink *tcpsink; - - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_TCPSINK (object)); - tcpsink = GST_TCPSINK (object); - - switch (prop_id) { - case ARG_HOST: - g_value_set_string (value, tcpsink->host); - break; - case ARG_PORT: - g_value_set_int (value, tcpsink->port); - break; - case ARG_CONTROL: - g_value_set_enum (value, tcpsink->control); - break; - case ARG_MTU: - g_value_set_int (value, tcpsink->mtu); - break; - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - - -/* create a socket for sending to remote machine */ -static gboolean -gst_tcpsink_init_send (GstTCPSink * sink) -{ - struct hostent *he; - struct in_addr addr; - - memset (&sink->theiraddr, 0, sizeof (sink->theiraddr)); - sink->theiraddr.sin_family = AF_INET; /* host byte order */ - sink->theiraddr.sin_port = htons (sink->port); /* short, network byte order */ - - /* if its an IP address */ - if (inet_aton (sink->host, &addr)) { - memmove (&(sink->theiraddr.sin_addr), &addr, sizeof (struct in_addr)); - } - - /* we dont need to lookup for localhost */ - else if (strcmp (sink->host, TCP_DEFAULT_HOST) == 0) { - if (inet_aton ("127.0.0.1", &addr)) { - memmove (&(sink->theiraddr.sin_addr), &addr, sizeof (struct in_addr)); - } - } - - /* if its a hostname */ - else if ((he = gethostbyname (sink->host))) { - memmove (&(sink->theiraddr.sin_addr), he->h_addr, he->h_length); - } - - else { - perror ("hostname lookup error?"); - return FALSE; - } - - if ((sink->sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) { - perror ("socket"); - return FALSE; - } - - if (connect (sink->sock, (struct sockaddr *) &(sink->theiraddr), - sizeof (sink->theiraddr)) != 0) { - perror ("stream connect"); - return FALSE; - } - - GST_FLAG_SET (sink, GST_TCPSINK_OPEN); - - return TRUE; -} - -static void -gst_tcpsink_close (GstTCPSink * sink) -{ - close (sink->sock); - - GST_FLAG_UNSET (sink, GST_TCPSINK_OPEN); -} - -static GstElementStateReturn -gst_tcpsink_change_state (GstElement * element) -{ - g_return_val_if_fail (GST_IS_TCPSINK (element), GST_STATE_FAILURE); - - if (GST_STATE_PENDING (element) == GST_STATE_NULL) { - if (GST_FLAG_IS_SET (element, GST_TCPSINK_OPEN)) - gst_tcpsink_close (GST_TCPSINK (element)); - } else { - if (!GST_FLAG_IS_SET (element, GST_TCPSINK_OPEN)) { - if (!gst_tcpsink_init_send (GST_TCPSINK (element))) - return GST_STATE_FAILURE; - } - } - - if (GST_ELEMENT_CLASS (parent_class)->change_state) - return GST_ELEMENT_CLASS (parent_class)->change_state (element); - - return GST_STATE_SUCCESS; -} diff --git a/gst/tcp/gsttcpsink.h b/gst/tcp/gsttcpsink.h deleted file mode 100644 index 98c82f6..0000000 --- a/gst/tcp/gsttcpsink.h +++ /dev/null @@ -1,96 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - - -#ifndef __GST_TCPSINK_H__ -#define __GST_TCPSINK_H__ - - -#include - -#ifdef __cplusplus -extern "C" { -#endif /* __cplusplus */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "gsttcpplugin.h" - -#define GST_TYPE_TCPSINK \ - (gst_tcpsink_get_type()) -#define GST_TCPSINK(obj) \ - (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPSINK,GstTCPSink)) -#define GST_TCPSINK_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPSINK,GstTCPSink)) -#define GST_IS_TCPSINK(obj) \ - (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPSINK)) -#define GST_IS_TCPSINK_CLASS(obj) \ - (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPSINK)) - -typedef struct _GstTCPSink GstTCPSink; -typedef struct _GstTCPSinkClass GstTCPSinkClass; - -typedef enum { - GST_TCPSINK_OPEN = GST_ELEMENT_FLAG_LAST, - - GST_TCPSINK_FLAG_LAST = GST_ELEMENT_FLAG_LAST + 2, -} GstTCPSinkFlags; - -struct _GstTCPSink { - GstElement element; - - /* pads */ - GstPad *sinkpad,*srcpad; - - int sock; - struct sockaddr_in theiraddr; - Gst_TCP_Control control; - - gint port; - gchar *host; - - guint mtu; - - GstClock *clock; -}; - -struct _GstTCPSinkClass { - GstElementClass parent_class; -}; - -GType gst_tcpsink_get_type(void); - - -#ifdef __cplusplus -} -#endif /* __cplusplus */ - - -#endif /* __GST_TCPSINK_H__ */ diff --git a/gst/tcp/gsttcpsrc.c b/gst/tcp/gsttcpsrc.c deleted file mode 100644 index 6594580..0000000 --- a/gst/tcp/gsttcpsrc.c +++ /dev/null @@ -1,504 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include "gsttcpsrc.h" -#include - -#define TCP_DEFAULT_PORT 4953 - -/* elementfactory information */ -static GstElementDetails gst_tcpsrc_details = -GST_ELEMENT_DETAILS ("TCP packet receiver", - "Source/Network", - "Receive data over the network via TCP", - "Zeeshan Ali "); - -/* TCPSrc signals and args */ -enum -{ - /* FILL ME */ - LAST_SIGNAL -}; - -enum -{ - ARG_0, - ARG_PORT, - ARG_CONTROL -/* ARG_SOCKET_OPTIONS,*/ - /* FILL ME */ -}; - -#define GST_TYPE_TCPSRC_CONTROL (gst_tcpsrc_control_get_type()) -static GType -gst_tcpsrc_control_get_type (void) -{ - static GType tcpsrc_control_type = 0; - static GEnumValue tcpsrc_control[] = { - {CONTROL_NONE, "1", "none"}, - {CONTROL_TCP, "2", "tcp"}, - {CONTROL_ZERO, NULL, NULL} - }; - - if (!tcpsrc_control_type) { - tcpsrc_control_type = - g_enum_register_static ("GstTCPSrcControl", tcpsrc_control); - } - return tcpsrc_control_type; -} - -static void gst_tcpsrc_base_init (gpointer g_class); -static void gst_tcpsrc_class_init (GstTCPSrc * klass); -static void gst_tcpsrc_init (GstTCPSrc * tcpsrc); - -static GstData *gst_tcpsrc_get (GstPad * pad); -static GstElementStateReturn gst_tcpsrc_change_state (GstElement * element); - -static void gst_tcpsrc_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec); -static void gst_tcpsrc_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec); -static void gst_tcpsrc_set_clock (GstElement * element, GstClock * clock); - -static GstElementClass *parent_class = NULL; - -/*static guint gst_tcpsrc_signals[LAST_SIGNAL] = { 0 }; */ - -GType -gst_tcpsrc_get_type (void) -{ - static GType tcpsrc_type = 0; - - - if (!tcpsrc_type) { - static const GTypeInfo tcpsrc_info = { - sizeof (GstTCPSrcClass), - gst_tcpsrc_base_init, - NULL, - (GClassInitFunc) gst_tcpsrc_class_init, - NULL, - NULL, - sizeof (GstTCPSrc), - 0, - (GInstanceInitFunc) gst_tcpsrc_init, - NULL - }; - - tcpsrc_type = - g_type_register_static (GST_TYPE_ELEMENT, "GstTCPSrc", &tcpsrc_info, 0); - } - return tcpsrc_type; -} - -static void -gst_tcpsrc_base_init (gpointer g_class) -{ - GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); - - gst_element_class_set_details (element_class, &gst_tcpsrc_details); -} - -static void -gst_tcpsrc_class_init (GstTCPSrc * klass) -{ - GObjectClass *gobject_class; - GstElementClass *gstelement_class; - - gobject_class = (GObjectClass *) klass; - gstelement_class = (GstElementClass *) klass; - - parent_class = g_type_class_ref (GST_TYPE_ELEMENT); - - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT, - g_param_spec_int ("port", "port", "The port to receive the packets from", - 0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE)); - g_object_class_install_property (gobject_class, ARG_CONTROL, - g_param_spec_enum ("control", "control", "The type of control", - GST_TYPE_TCPSRC_CONTROL, CONTROL_TCP, G_PARAM_READWRITE)); -/* - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SOCKET_OPTIONS, - g_param_spec_boolean ("socketop", "socketop", "Enable or disable socket options REUSEADDR and KEEPALIVE", - FALSE, G_PARAM_READWRITE)); -*/ - gobject_class->set_property = gst_tcpsrc_set_property; - gobject_class->get_property = gst_tcpsrc_get_property; - - gstelement_class->change_state = gst_tcpsrc_change_state; - gstelement_class->set_clock = gst_tcpsrc_set_clock; -} - -static void -gst_tcpsrc_set_clock (GstElement * element, GstClock * clock) -{ - GstTCPSrc *tcpsrc; - - tcpsrc = GST_TCPSRC (element); - - tcpsrc->clock = clock; -} - -static void -gst_tcpsrc_init (GstTCPSrc * tcpsrc) -{ - /* create the src and src pads */ - tcpsrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); - gst_element_add_pad (GST_ELEMENT (tcpsrc), tcpsrc->srcpad); - gst_pad_set_get_function (tcpsrc->srcpad, gst_tcpsrc_get); - - tcpsrc->port = TCP_DEFAULT_PORT; - tcpsrc->control = CONTROL_TCP; - tcpsrc->clock = NULL; - tcpsrc->sock = -1; - tcpsrc->control_sock = -1; - tcpsrc->client_sock = -1; - /*tcpsrc->socket_options = FALSE; */ - - GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_OPEN); - GST_FLAG_SET (tcpsrc, GST_TCPSRC_1ST_BUF); - GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_CONNECTED); -} - -static GstData * -gst_tcpsrc_get (GstPad * pad) -{ - GstTCPSrc *tcpsrc; - GstBuffer *outbuf; - socklen_t len; - gint numbytes; - fd_set read_fds; - guint max_sock; - -#ifndef GST_DISABLE_LOADSAVE - int ret, client_sock; -#endif - struct sockaddr client_addr; - - g_return_val_if_fail (pad != NULL, NULL); - g_return_val_if_fail (GST_IS_PAD (pad), NULL); - - tcpsrc = GST_TCPSRC (GST_OBJECT_PARENT (pad)); - - FD_ZERO (&read_fds); - FD_SET (tcpsrc->sock, &read_fds); - - max_sock = tcpsrc->sock; - - if (tcpsrc->control_sock >= 0) { - FD_SET (tcpsrc->control_sock, &read_fds); - max_sock = MAX (tcpsrc->sock, tcpsrc->control_sock); - } - - /* Add to FD_SET client socket, when connection has been established */ - if (tcpsrc->client_sock >= 0) { - FD_SET (tcpsrc->client_sock, &read_fds); - max_sock = MAX (tcpsrc->client_sock, max_sock); - } - - - if (select (max_sock + 1, &read_fds, NULL, NULL, NULL) > 0) { - if ((tcpsrc->control_sock != -1) - && FD_ISSET (tcpsrc->control_sock, &read_fds)) { - guchar *buf = NULL; - -#ifndef GST_DISABLE_LOADSAVE - xmlDocPtr doc; - GstCaps *caps; -#endif - - - switch (tcpsrc->control) { - case CONTROL_TCP: - -#ifndef GST_DISABLE_LOADSAVE - buf = g_malloc (1024 * 10); - - len = sizeof (struct sockaddr); - client_sock = accept (tcpsrc->control_sock, &client_addr, &len); - - if (client_sock <= 0) { - perror ("control_sock accept"); - } - - else if ((ret = read (client_sock, buf, 1024 * 10)) <= 0) { - perror ("control_sock read"); - } - - else { - buf[ret] = '\0'; - doc = xmlParseMemory (buf, ret); - caps = gst_caps_load_thyself (doc->xmlRootNode); - - /* foward the connect, we don't signal back the result here... */ - gst_pad_try_set_caps (tcpsrc->srcpad, caps); - } - - g_free (buf); -#endif - break; - case CONTROL_NONE: - default: - g_free (buf); - return NULL; - break; - } - - outbuf = NULL; - } else { - outbuf = gst_buffer_new (); - GST_BUFFER_DATA (outbuf) = g_malloc (24000); - GST_BUFFER_SIZE (outbuf) = 24000; - - if (GST_FLAG_IS_SET (tcpsrc, GST_TCPSRC_1ST_BUF)) { - if (tcpsrc->clock) { - GstClockTime current_time; - GstEvent *discont; - - current_time = gst_clock_get_time (tcpsrc->clock); - - GST_BUFFER_TIMESTAMP (outbuf) = current_time; - - discont = gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME, - current_time, NULL); - - gst_pad_push (tcpsrc->srcpad, GST_DATA (discont)); - } - - GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_1ST_BUF); - } - - else { - GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE; - } - - if (!GST_FLAG_IS_SET (tcpsrc, GST_TCPSRC_CONNECTED)) { - tcpsrc->client_sock = accept (tcpsrc->sock, &client_addr, &len); - - if (tcpsrc->client_sock <= 0) { - perror ("accept"); - } - - else { - GST_FLAG_SET (tcpsrc, GST_TCPSRC_CONNECTED); - } - } - - numbytes = - read (tcpsrc->client_sock, GST_BUFFER_DATA (outbuf), - GST_BUFFER_SIZE (outbuf)); - - if (numbytes > 0) { - GST_BUFFER_SIZE (outbuf) = numbytes; - } - - else { - if (numbytes == -1) { - perror ("read"); - } else - g_print ("End of Stream reached\n"); - gst_buffer_unref (outbuf); - outbuf = NULL; - close (tcpsrc->client_sock); - tcpsrc->client_sock = -1; - GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_CONNECTED); - } - } - } - - else { - perror ("select"); - outbuf = NULL; - } - - return GST_DATA (outbuf); -} - - -static void -gst_tcpsrc_set_property (GObject * object, guint prop_id, const GValue * value, - GParamSpec * pspec) -{ - GstTCPSrc *tcpsrc; - - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_TCPSRC (object)); - tcpsrc = GST_TCPSRC (object); - - switch (prop_id) { - case ARG_PORT: - tcpsrc->port = g_value_get_int (value); - break; - case ARG_CONTROL: - tcpsrc->control = g_value_get_enum (value); - break; -/* case ARG_SOCKET_OPTIONS: - tcpsrc->socket_options = g_value_get_boolean(value); - break; */ - default: - break; - } -} - -static void -gst_tcpsrc_get_property (GObject * object, guint prop_id, GValue * value, - GParamSpec * pspec) -{ - GstTCPSrc *tcpsrc; - - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_TCPSRC (object)); - tcpsrc = GST_TCPSRC (object); - - switch (prop_id) { - case ARG_PORT: - g_value_set_int (value, tcpsrc->port); - break; - case ARG_CONTROL: - g_value_set_enum (value, tcpsrc->control); - break; -/* case ARG_SOCKET_OPTIONS: - g_value_set_boolean(value,tcpsrc->socket_options); - break;*/ - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -/* create a socket for sending to remote machine */ -static gboolean -gst_tcpsrc_init_receive (GstTCPSrc * src) -{ - guint val = 0; - - memset (&src->myaddr, 0, sizeof (src->myaddr)); - src->myaddr.sin_family = AF_INET; /* host byte order */ - src->myaddr.sin_port = htons (src->port); /* short, network byte order */ - src->myaddr.sin_addr.s_addr = INADDR_ANY; - - if ((src->sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) { - perror ("stream_socket"); - return FALSE; - } - -/* if (src->socket_options) - {*/ - g_print ("Socket Options SO_REUSEADDR, SO_KEEPALIVE\n"); - /* Sock Options */ - val = 1; - /* allow local address reuse */ - if (setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof (int)) < 0) - perror ("setsockopt()"); - val = 1; - /* periodically test if connection still alive */ - if (setsockopt (src->sock, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof (int)) < 0) - perror ("setsockopt()"); - /* Sock Options */ -/* } */ - - if (bind (src->sock, (struct sockaddr *) &src->myaddr, - sizeof (src->myaddr)) == -1) { - perror ("stream_sock bind"); - return FALSE; - } - - if (listen (src->sock, 5) == -1) { - perror ("stream_sock listen"); - return FALSE; - } - - fcntl (src->sock, F_SETFL, O_NONBLOCK); - - switch (src->control) { - case CONTROL_TCP: - if ((src->control_sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) { - perror ("control_socket"); - return FALSE; - } - - src->myaddr.sin_port = htons (src->port + 1); - if (bind (src->control_sock, (struct sockaddr *) &src->myaddr, - sizeof (src->myaddr)) == -1) { - perror ("control bind"); - return FALSE; - } - - if (listen (src->control_sock, 5) == -1) { - perror ("control listen"); - return FALSE; - } - - fcntl (src->control_sock, F_SETFL, O_NONBLOCK); - case CONTROL_NONE: - GST_FLAG_SET (src, GST_TCPSRC_OPEN); - return TRUE; - break; - default: - return FALSE; - break; - } - - GST_FLAG_SET (src, GST_TCPSRC_OPEN); - - return TRUE; -} - -static void -gst_tcpsrc_close (GstTCPSrc * src) -{ - if (src->sock != -1) { - close (src->sock); - src->sock = -1; - } - if (src->control_sock != -1) { - close (src->control_sock); - src->control_sock = -1; - } - if (src->client_sock != -1) { - close (src->client_sock); - src->client_sock = -1; - } - - GST_FLAG_UNSET (src, GST_TCPSRC_OPEN); -} - -static GstElementStateReturn -gst_tcpsrc_change_state (GstElement * element) -{ - g_return_val_if_fail (GST_IS_TCPSRC (element), GST_STATE_FAILURE); - - if (GST_STATE_PENDING (element) == GST_STATE_NULL) { - if (GST_FLAG_IS_SET (element, GST_TCPSRC_OPEN)) - gst_tcpsrc_close (GST_TCPSRC (element)); - } else { - if (!GST_FLAG_IS_SET (element, GST_TCPSRC_OPEN)) { - if (!gst_tcpsrc_init_receive (GST_TCPSRC (element))) - return GST_STATE_FAILURE; - } - } - - if (GST_ELEMENT_CLASS (parent_class)->change_state) - return GST_ELEMENT_CLASS (parent_class)->change_state (element); - - return GST_STATE_SUCCESS; -} diff --git a/gst/tcp/gsttcpsrc.h b/gst/tcp/gsttcpsrc.h deleted file mode 100644 index 8905abd..0000000 --- a/gst/tcp/gsttcpsrc.h +++ /dev/null @@ -1,92 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - - -#ifndef __GST_TCPSRC_H__ -#define __GST_TCPSRC_H__ - -#include - -#ifdef __cplusplus -extern "C" { -#endif /* __cplusplus */ - -#include -#include -#include -#include -#include -#include -#include -#include "gsttcpplugin.h" - -#include - -#define GST_TYPE_TCPSRC \ - (gst_tcpsrc_get_type()) -#define GST_TCPSRC(obj) \ - (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPSRC,GstTCPSrc)) -#define GST_TCPSRC_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPSRC,GstTCPSrc)) -#define GST_IS_TCPSRC(obj) \ - (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPSRC)) -#define GST_IS_TCPSRC_CLASS(obj) \ - (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPSRC)) - -typedef struct _GstTCPSrc GstTCPSrc; -typedef struct _GstTCPSrcClass GstTCPSrcClass; - -typedef enum { - GST_TCPSRC_OPEN = GST_ELEMENT_FLAG_LAST, - GST_TCPSRC_1ST_BUF, - GST_TCPSRC_CONNECTED, - - GST_TCPSRC_FLAG_LAST, -} GstTCPSrcFlags; - -struct _GstTCPSrc { - GstElement element; - - /* pads */ - GstPad *sinkpad,*srcpad; - - int port; - int sock; - int client_sock; - int control_sock; -/* gboolean socket_options;*/ - Gst_TCP_Control control; - - struct sockaddr_in myaddr; - GstClock *clock; -}; - -struct _GstTCPSrcClass { - GstElementClass parent_class; -}; - -GType gst_tcpsrc_get_type(void); - - -#ifdef __cplusplus -} -#endif /* __cplusplus */ - - -#endif /* __GST_TCPSRC_H__ */ -- 2.7.4