Ported tcp plugins to 0.9.
authorWim Taymans <wim.taymans@gmail.com>
Tue, 5 Jul 2005 10:21:40 +0000 (10:21 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Tue, 5 Jul 2005 10:21:40 +0000 (10:21 +0000)
Original commit message from CVS:
* configure.ac:
* gst/tcp/Makefile.am:
* gst/tcp/README:
* gst/tcp/gstmultifdsink.c: (gst_multifdsink_get_type),
(gst_multifdsink_base_init), (gst_multifdsink_class_init),
(gst_multifdsink_init), (gst_multifdsink_remove_client_link),
(is_sync_frame), (gst_multifdsink_handle_client_write),
(gst_multifdsink_render), (gst_multifdsink_start),
(gst_multifdsink_stop), (gst_multifdsink_change_state):
* gst/tcp/gstmultifdsink.h:
* gst/tcp/gsttcp.c: (gst_tcp_host_to_ip),
(gst_tcp_gdp_read_buffer), (gst_tcp_gdp_read_caps),
(gst_tcp_gdp_write_buffer), (gst_tcp_gdp_write_caps):
* gst/tcp/gsttcp.h:
* gst/tcp/gsttcpclientsink.c: (gst_tcpclientsink_class_init),
(gst_tcpclientsink_init), (gst_tcpclientsink_setcaps),
(gst_tcpclientsink_render), (gst_tcpclientsink_start),
(gst_tcpclientsink_stop), (gst_tcpclientsink_change_state):
* gst/tcp/gsttcpclientsink.h:
* gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get_type),
(gst_tcpclientsrc_base_init), (gst_tcpclientsrc_class_init),
(gst_tcpclientsrc_init), (gst_tcpclientsrc_getcaps),
(gst_tcpclientsrc_create), (gst_tcpclientsrc_start),
(gst_tcpclientsrc_stop), (gst_tcpclientsrc_unlock):
* gst/tcp/gsttcpclientsrc.h:
* gst/tcp/gsttcpplugin.c: (plugin_init):
* gst/tcp/gsttcpserversink.c: (gst_tcpserversink_class_init):
* gst/tcp/gsttcpserversink.h:
* gst/tcp/gsttcpserversrc.c: (gst_tcpserversrc_get_type),
(gst_tcpserversrc_base_init), (gst_tcpserversrc_class_init),
(gst_tcpserversrc_init), (gst_tcpserversrc_finalize),
(gst_tcpserversrc_create), (gst_tcpserversrc_start),
(gst_tcpserversrc_stop):
* gst/tcp/gsttcpserversrc.h:
* gst/tcp/gsttcpsink.c:
* gst/tcp/gsttcpsink.h:
* gst/tcp/gsttcpsrc.c:
* gst/tcp/gsttcpsrc.h:
Ported tcp plugins to 0.9.

21 files changed:
ChangeLog
configure.ac
gst/tcp/Makefile.am
gst/tcp/README
gst/tcp/gstmultifdsink.c
gst/tcp/gstmultifdsink.h
gst/tcp/gsttcp.c
gst/tcp/gsttcp.h
gst/tcp/gsttcpclientsink.c
gst/tcp/gsttcpclientsink.h
gst/tcp/gsttcpclientsrc.c
gst/tcp/gsttcpclientsrc.h
gst/tcp/gsttcpplugin.c
gst/tcp/gsttcpserversink.c
gst/tcp/gsttcpserversink.h
gst/tcp/gsttcpserversrc.c
gst/tcp/gsttcpserversrc.h
gst/tcp/gsttcpsink.c [deleted file]
gst/tcp/gsttcpsink.h [deleted file]
gst/tcp/gsttcpsrc.c [deleted file]
gst/tcp/gsttcpsrc.h [deleted file]

index 5d1fece..b2925e3 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,46 @@
+2005-07-05  Wim Taymans  <wim@fluendo.com>
+
+       * configure.ac:
+       * gst/tcp/Makefile.am:
+       * gst/tcp/README:
+       * gst/tcp/gstmultifdsink.c: (gst_multifdsink_get_type),
+       (gst_multifdsink_base_init), (gst_multifdsink_class_init),
+       (gst_multifdsink_init), (gst_multifdsink_remove_client_link),
+       (is_sync_frame), (gst_multifdsink_handle_client_write),
+       (gst_multifdsink_render), (gst_multifdsink_start),
+       (gst_multifdsink_stop), (gst_multifdsink_change_state):
+       * gst/tcp/gstmultifdsink.h:
+       * gst/tcp/gsttcp.c: (gst_tcp_host_to_ip),
+       (gst_tcp_gdp_read_buffer), (gst_tcp_gdp_read_caps),
+       (gst_tcp_gdp_write_buffer), (gst_tcp_gdp_write_caps):
+       * gst/tcp/gsttcp.h:
+       * gst/tcp/gsttcpclientsink.c: (gst_tcpclientsink_class_init),
+       (gst_tcpclientsink_init), (gst_tcpclientsink_setcaps),
+       (gst_tcpclientsink_render), (gst_tcpclientsink_start),
+       (gst_tcpclientsink_stop), (gst_tcpclientsink_change_state):
+       * gst/tcp/gsttcpclientsink.h:
+       * gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get_type),
+       (gst_tcpclientsrc_base_init), (gst_tcpclientsrc_class_init),
+       (gst_tcpclientsrc_init), (gst_tcpclientsrc_getcaps),
+       (gst_tcpclientsrc_create), (gst_tcpclientsrc_start),
+       (gst_tcpclientsrc_stop), (gst_tcpclientsrc_unlock):
+       * gst/tcp/gsttcpclientsrc.h:
+       * gst/tcp/gsttcpplugin.c: (plugin_init):
+       * gst/tcp/gsttcpserversink.c: (gst_tcpserversink_class_init):
+       * gst/tcp/gsttcpserversink.h:
+       * gst/tcp/gsttcpserversrc.c: (gst_tcpserversrc_get_type),
+       (gst_tcpserversrc_base_init), (gst_tcpserversrc_class_init),
+       (gst_tcpserversrc_init), (gst_tcpserversrc_finalize),
+       (gst_tcpserversrc_create), (gst_tcpserversrc_start),
+       (gst_tcpserversrc_stop):
+       * gst/tcp/gsttcpserversrc.h:
+       * gst/tcp/gsttcpsink.c:
+       * gst/tcp/gsttcpsink.h:
+       * gst/tcp/gsttcpsrc.c:
+       * gst/tcp/gsttcpsrc.h:
+       Ported tcp plugins to 0.9. 
+       
+
 2005-07-05  Andy Wingo  <wingo@pobox.com>
 
        * gst/playback/gstplaybasebin.c (fill_buffer):
index b1970fe..198ba49 100644 (file)
@@ -237,6 +237,16 @@ fi
 
 AC_SUBST(GST_CONTROL_LIBS)
 
+dnl check for gstreamer-dataprotocol; uninstalled is selected preferentially
+PKG_CHECK_MODULES(GST_GDP, gstreamer-dataprotocol-$GST_MAJORMINOR >= $GST_REQ,
+  HAVE_GST_GDP="yes", HAVE_GST_GDP="no")
+
+if test "x$HAVE_GST_GDP" = "xno"; then
+  AC_MSG_ERROR(no GStreamer Dataprotocol Libs found)
+fi
+
+AC_SUBST(GST_GDP_LIBS)
+
 PKG_CHECK_MODULES(GST_BASE, gstreamer-base-$GST_MAJORMINOR >= $GST_REQ,
   HAVE_GST_BASE="yes", HAVE_GST_BASE="no")
 
@@ -375,6 +385,7 @@ GST_PLUGINS_ALL="\
        playback \
        sine \
         subparse \
+        tcp \
        typefind \
        videotestsrc \
        videorate \
@@ -883,6 +894,7 @@ gst/ffmpegcolorspace/Makefile
 gst/playback/Makefile
 gst/sine/Makefile
 gst/subparse/Makefile
+gst/tcp/Makefile
 gst/typefind/Makefile
 gst/videotestsrc/Makefile
 gst/videorate/Makefile
index 7690d2b..bf910db 100644 (file)
@@ -14,7 +14,6 @@ BUILT_SOURCES = $(built_sources) $(built_headers)
 
 libgsttcp_la_SOURCES = \
        gsttcpplugin.c \
-       gsttcpsrc.c gsttcpsink.c \
        gsttcp.c \
        gstfdset.c \
        gstmultifdsink.c  \
@@ -27,11 +26,10 @@ nodist_libgsttcp_la_SOURCES = \
 # remove ENABLE_NEW when dataprotocol is stable
 libgsttcp_la_CFLAGS = $(GST_CFLAGS) -DGST_ENABLE_NEW
 libgsttcp_la_LIBADD = 
-libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
+libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_GDP_LIBS)
 
 noinst_HEADERS = \
   gsttcpplugin.h \
-  gsttcpsrc.h gsttcpsink.h \
   gsttcp.h \
   gstfdset.h \
   gstmultifdsink.h  \
index 29e26da..589d534 100644 (file)
@@ -4,8 +4,6 @@ This part of the documentation is for the new tcp elements:
 - tcpserversrc
 - tcpserversink
                                                                                 
-which are created to replace the old tcpsrc/tcpsink
-                                                                                
 TESTS
 -----
 Use these tests to test functionality of the various tcp plugins
@@ -31,33 +29,3 @@ TODO
 ----
 - implement DNS resolution
 
---------
-
-This is the old documentation for the original tcpsrc/tcpsink elements.
-
-* What is TCP src/sink?
-
-solution, like icecast or realaudio or whatever.
-But the future RTP plugins shall not do the actual transmission/reception
-of packets on the network themselve but the Application developer would be
-encouraged to use either the TCP or the UDP plugins for that. UDP would be
-used mostly but there could be situations where TCP would be the only
-available choice. For example streaming accross firewalls that do not
-allow UDP.
-
-* Shortcomings
-
-Even given our modest ambitions, the current code doesn't handle
-caps negotiation robustly.
-
-* Todo
-
-The caps nego should do bi-directional negotiation.
-
-Perhaps this plugin can be the example of how to do caps negotiation
-via a point-to-point protocol.
-
-12 Sep 2001
-Wim Taymans <wim.taymans@chello.be>
-Joshua N Pritikin <vishnu@pobox.com>
-Zeeshan Ali <zak147@yahoo.com>
index 5c70e3c..590cd4f 100644 (file)
@@ -66,6 +66,11 @@ GST_ELEMENT_DETAILS ("MultiFd sink",
     "Thomas Vander Stichele <thomas at apestaart dot org>, "
     "Wim Taymans <wim@fluendo.com>");
 
+static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
+    GST_PAD_SINK,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS_ANY);
+
 GST_DEBUG_CATEGORY (multifdsink_debug);
 #define GST_CAT_DEFAULT (multifdsink_debug)
 
@@ -215,7 +220,8 @@ static void gst_multifdsink_init (GstMultiFdSink * multifdsink);
 static void gst_multifdsink_remove_client_link (GstMultiFdSink * sink,
     GList * link);
 
-static void gst_multifdsink_chain (GstPad * pad, GstData * _data);
+static GstFlowReturn gst_multifdsink_render (GstBaseSink * bsink,
+    GstBuffer * buf);
 static GstElementStateReturn gst_multifdsink_change_state (GstElement *
     element);
 
@@ -250,7 +256,7 @@ gst_multifdsink_get_type (void)
     };
 
     multifdsink_type =
-        g_type_register_static (GST_TYPE_ELEMENT, "GstMultiFdSink",
+        g_type_register_static (GST_TYPE_BASESINK, "GstMultiFdSink",
         &multifdsink_info, 0);
   }
   return multifdsink_type;
@@ -261,6 +267,9 @@ gst_multifdsink_base_init (gpointer g_class)
 {
   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
 
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&sinktemplate));
+
   gst_element_class_set_details (element_class, &gst_multifdsink_details);
 }
 
@@ -269,11 +278,16 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
 {
   GObjectClass *gobject_class;
   GstElementClass *gstelement_class;
+  GstBaseSinkClass *gstbasesink_class;
 
   gobject_class = (GObjectClass *) klass;
   gstelement_class = (GstElementClass *) klass;
+  gstbasesink_class = (GstBaseSinkClass *) klass;
+
+  parent_class = g_type_class_ref (GST_TYPE_BASESINK);
 
-  parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
+  gobject_class->set_property = gst_multifdsink_set_property;
+  gobject_class->get_property = gst_multifdsink_get_property;
 
   g_object_class_install_property (gobject_class, ARG_PROTOCOL,
       g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
@@ -375,11 +389,10 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
           client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_BOXED,
       G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS);
 
-  gobject_class->set_property = gst_multifdsink_set_property;
-  gobject_class->get_property = gst_multifdsink_get_property;
-
   gstelement_class->change_state = gst_multifdsink_change_state;
 
+  gstbasesink_class->render = gst_multifdsink_render;
+
   klass->add = gst_multifdsink_add;
   klass->remove = gst_multifdsink_remove;
   klass->clear = gst_multifdsink_clear;
@@ -391,11 +404,6 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass)
 static void
 gst_multifdsink_init (GstMultiFdSink * this)
 {
-  /* create the sink pad */
-  this->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
-  gst_element_add_pad (GST_ELEMENT (this), this->sinkpad);
-  gst_pad_set_chain_function (this->sinkpad, gst_multifdsink_chain);
-
   GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN);
 
   this->protocol = DEFAULT_PROTOCOL;
@@ -636,7 +644,7 @@ gst_multifdsink_remove_client_link (GstMultiFdSink * sink, GList * link)
   client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
 
   /* free client buffers */
-  g_slist_foreach (client->sending, (GFunc) gst_data_unref, NULL);
+  g_slist_foreach (client->sending, (GFunc) gst_mini_object_unref, NULL);
   g_slist_free (client->sending);
   client->sending = NULL;
 
@@ -777,9 +785,9 @@ gst_multifdsink_client_queue_caps (GstMultiFdSink * sink, GstTCPClient * client,
 static gboolean
 is_sync_frame (GstMultiFdSink * sink, GstBuffer * buffer)
 {
-  if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_DELTA_UNIT)) {
+  if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) {
     return FALSE;
-  } else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_IN_CAPS)) {
+  } else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) {
     return TRUE;
   }
   return FALSE;
@@ -934,7 +942,8 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
   /* when using GDP, first check if we have queued caps yet */
   if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
     if (!client->caps_sent) {
-      const GstCaps *caps = GST_PAD_CAPS (GST_PAD_PEER (sink->sinkpad));
+      const GstCaps *caps =
+          GST_PAD_CAPS (GST_PAD_PEER (GST_BASESINK_PAD (sink)));
 
       /* queue caps for sending */
       res = gst_multifdsink_client_queue_caps (sink, client, caps);
@@ -1443,32 +1452,28 @@ gst_multifdsink_thread (GstMultiFdSink * sink)
   return NULL;
 }
 
-static void
-gst_multifdsink_chain (GstPad * pad, GstData * _data)
+static GstFlowReturn
+gst_multifdsink_render (GstBaseSink * bsink, GstBuffer * buf)
 {
-  GstBuffer *buf = GST_BUFFER (_data);
   GstMultiFdSink *sink;
 
-  g_return_if_fail (pad != NULL);
-  g_return_if_fail (GST_IS_PAD (pad));
-  g_return_if_fail (buf != NULL);
-  sink = GST_MULTIFDSINK (GST_OBJECT_PARENT (pad));
-  g_return_if_fail (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN));
+  sink = GST_MULTIFDSINK (bsink);
 
-  if (GST_IS_EVENT (buf)) {
-    g_warning ("FIXME: handle events");
-    return;
-  }
+  /* since we keep this buffer out of the scope of this method */
+  gst_buffer_ref (buf);
+
+  g_return_val_if_fail (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN),
+      GST_FLOW_ERROR);
 
   GST_LOG_OBJECT (sink, "received buffer %p", buf);
   /* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS,
    * it means we're getting new streamheader buffers, and we should clear
    * the old ones */
-  if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS) &&
+  if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS) &&
       sink->previous_buffer_in_caps == FALSE) {
     GST_DEBUG_OBJECT (sink,
         "receiving new IN_CAPS buffers, clearing old streamheader");
-    g_slist_foreach (sink->streamheader, (GFunc) gst_data_unref, NULL);
+    g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL);
     g_slist_free (sink->streamheader);
     sink->streamheader = NULL;
   }
@@ -1478,13 +1483,13 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data)
    * After that we return, since we only send these out when we get
    * non IN_CAPS buffers so we properly keep track of clients that got
    * streamheaders. */
-  if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS)) {
+  if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS)) {
     sink->previous_buffer_in_caps = TRUE;
     GST_DEBUG_OBJECT (sink,
         "appending IN_CAPS buffer with length %d to streamheader",
         GST_BUFFER_SIZE (buf));
     sink->streamheader = g_slist_append (sink->streamheader, buf);
-    return;
+    return GST_FLOW_OK;
   }
 
   sink->previous_buffer_in_caps = FALSE;
@@ -1492,6 +1497,8 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data)
   gst_multifdsink_queue_buffer (sink, buf);
 
   sink->bytes_to_serve += GST_BUFFER_SIZE (buf);
+
+  return GST_FLOW_OK;
 }
 
 static void
@@ -1617,21 +1624,24 @@ gst_multifdsink_get_property (GObject * object, guint prop_id, GValue * value,
 
 /* create a socket for sending to remote machine */
 static gboolean
-gst_multifdsink_init_send (GstMultiFdSink * this)
+gst_multifdsink_start (GstBaseSink * bsink)
 {
   GstMultiFdSinkClass *fclass;
   int control_socket[2];
+  GstMultiFdSink *this;
+
+  if (GST_FLAG_IS_SET (bsink, GST_MULTIFDSINK_OPEN))
+    return TRUE;
 
+  this = GST_MULTIFDSINK (bsink);
   fclass = GST_MULTIFDSINK_GET_CLASS (this);
 
   GST_INFO_OBJECT (this, "starting in mode %d", this->mode);
   this->fdset = gst_fdset_new (this->mode);
 
-  if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0) {
-    GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL),
-        GST_ERROR_SYSTEM);
-    return FALSE;
-  }
+  if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0)
+    goto socket_pair;
+
   READ_SOCKET (this).fd = control_socket[0];
   WRITE_SOCKET (this).fd = control_socket[1];
 
@@ -1653,16 +1663,31 @@ gst_multifdsink_init_send (GstMultiFdSink * this)
   this->thread = g_thread_create ((GThreadFunc) gst_multifdsink_thread,
       this, TRUE, NULL);
 
+  GST_FLAG_SET (this, GST_MULTIFDSINK_OPEN);
+
   return TRUE;
+
+  /* ERRORS */
+socket_pair:
+  {
+    GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL),
+        GST_ERROR_SYSTEM);
+    return FALSE;
+  }
 }
 
-static void
-gst_multifdsink_close (GstMultiFdSink * this)
+static gboolean
+gst_multifdsink_stop (GstBaseSink * bsink)
 {
   GstMultiFdSinkClass *fclass;
+  GstMultiFdSink *this;
 
+  this = GST_MULTIFDSINK (bsink);
   fclass = GST_MULTIFDSINK_GET_CLASS (this);
 
+  if (!GST_FLAG_IS_SET (bsink, GST_MULTIFDSINK_OPEN))
+    return TRUE;
+
   this->running = FALSE;
 
   SEND_COMMAND (this, CONTROL_STOP);
@@ -1678,7 +1703,7 @@ gst_multifdsink_close (GstMultiFdSink * this)
   close (WRITE_SOCKET (this).fd);
 
   if (this->streamheader) {
-    g_slist_foreach (this->streamheader, (GFunc) gst_data_unref, NULL);
+    g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL);
     g_slist_free (this->streamheader);
     this->streamheader = NULL;
   }
@@ -1691,46 +1716,55 @@ gst_multifdsink_close (GstMultiFdSink * this)
     gst_fdset_free (this->fdset);
     this->fdset = NULL;
   }
+  GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN);
+
+  return TRUE;
 }
 
 static GstElementStateReturn
 gst_multifdsink_change_state (GstElement * element)
 {
   GstMultiFdSink *sink;
+  gint transition;
+  GstElementStateReturn ret;
 
-  g_return_val_if_fail (GST_IS_MULTIFDSINK (element), GST_STATE_FAILURE);
   sink = GST_MULTIFDSINK (element);
 
   /* we disallow changing the state from the streaming thread */
   if (g_thread_self () == sink->thread)
     return GST_STATE_FAILURE;
 
-  switch (GST_STATE_TRANSITION (element)) {
+  transition = GST_STATE_TRANSITION (element);
+
+  switch (transition) {
     case GST_STATE_NULL_TO_READY:
-      if (!GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN)) {
-        if (!gst_multifdsink_init_send (sink))
-          return GST_STATE_FAILURE;
-        GST_FLAG_SET (sink, GST_MULTIFDSINK_OPEN);
-      }
+      if (!gst_multifdsink_start (GST_BASESINK (sink)))
+        goto start_failed;
       break;
     case GST_STATE_READY_TO_PAUSED:
       break;
     case GST_STATE_PAUSED_TO_PLAYING:
       break;
+    default:
+      break;
+  }
+
+  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
+
+  switch (transition) {
     case GST_STATE_PLAYING_TO_PAUSED:
       break;
     case GST_STATE_PAUSED_TO_READY:
       break;
     case GST_STATE_READY_TO_NULL:
-      if (GST_FLAG_IS_SET (sink, GST_MULTIFDSINK_OPEN)) {
-        gst_multifdsink_close (GST_MULTIFDSINK (element));
-        GST_FLAG_UNSET (sink, GST_MULTIFDSINK_OPEN);
-      }
+      gst_multifdsink_stop (GST_BASESINK (sink));
       break;
   }
+  return ret;
 
-  if (GST_ELEMENT_CLASS (parent_class)->change_state)
-    return GST_ELEMENT_CLASS (parent_class)->change_state (element);
-
-  return GST_STATE_SUCCESS;
+  /* ERRORS */
+start_failed:
+  {
+    return GST_STATE_FAILURE;
+  }
 }
index 6b8fc0b..00828df 100644 (file)
 #ifndef __GST_MULTIFDSINK_H__
 #define __GST_MULTIFDSINK_H__
 
-
 #include <gst/gst.h>
+#include <gst/base/gstbasesink.h>
 
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
+G_BEGIN_DECLS
 
 #include "gsttcp.h"
 #include "gstfdset.h"
@@ -119,10 +117,7 @@ typedef struct {
 } GstTCPClient;
 
 struct _GstMultiFdSink {
-  GstElement element;
-
-  /* pad */
-  GstPad *sinkpad;
+  GstBaseSink element;
 
   guint64 bytes_to_serve; /* how much bytes we must serve */
   guint64 bytes_served; /* how much bytes have we served */
@@ -161,7 +156,7 @@ struct _GstMultiFdSink {
 };
 
 struct _GstMultiFdSinkClass {
-  GstElementClass parent_class;
+  GstBaseSinkClass parent_class;
 
   /* element methods */
   void                 (*add)          (GstMultiFdSink *sink, int fd);
@@ -187,10 +182,6 @@ void gst_multifdsink_remove (GstMultiFdSink *sink, int fd);
 void gst_multifdsink_clear (GstMultiFdSink *sink);
 GValueArray* gst_multifdsink_get_stats (GstMultiFdSink *sink, int fd);
 
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
-
+G_END_DECLS
 
 #endif /* __GST_MULTIFDSINK_H__ */
index 4e13956..99eee84 100644 (file)
@@ -56,35 +56,42 @@ gst_tcp_host_to_ip (GstElement * element, const gchar * host)
   struct in_addr addr;
 
   GST_DEBUG_OBJECT (element, "resolving host %s", host);
+
   /* first check if it already is an IP address */
   if (inet_aton (host, &addr)) {
     ip = g_strdup (host);
     goto beach;
   }
-
   /* FIXME: could do a localhost check here */
 
   /* perform a name lookup */
-  hostinfo = gethostbyname (host);
-  if (!hostinfo) {
-    GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
-        ("Could not find IP address for host \"%s\".", host));
-    return NULL;
-  }
+  if (!(hostinfo = gethostbyname (host)))
+    goto resolve_error;
 
-  if (hostinfo->h_addrtype != AF_INET) {
-    GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
-        ("host \"%s\" is not an IP host", host));
-    return NULL;
-  }
+  if (hostinfo->h_addrtype != AF_INET)
+    goto not_ip;
 
   addrs = hostinfo->h_addr_list;
+
   /* There could be more than one IP address, but we just return the first */
   ip = g_strdup (inet_ntoa (*(struct in_addr *) *addrs));
 
 beach:
   GST_DEBUG_OBJECT (element, "resolved to IP %s", ip);
   return ip;
+
+resolve_error:
+  {
+    GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
+        ("Could not find IP address for host \"%s\".", host));
+    return NULL;
+  }
+not_ip:
+  {
+    GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
+        ("host \"%s\" is not an IP host", host));
+    return NULL;
+  }
 }
 
 /* write buffer to given socket incrementally.
@@ -149,15 +156,14 @@ gst_tcp_socket_close (int *socket)
   *socket = -1;
 }
 
-/* read the gdp buffer header from the given socket
+/* read a buffer from the given socket
  * returns:
- * - a GstData representing a GstBuffer in which data should be read
- * - a GstData representing a GstEvent
+ * - a GstBuffer in which data should be read
  * - NULL, indicating a connection close or an error, to be handled with
  *         EOS
  */
-GstData *
-gst_tcp_gdp_read_header (GstElement * this, int socket)
+GstBuffer *
+gst_tcp_gdp_read_buffer (GstElement * this, int socket)
 {
   size_t header_length = GST_DP_HEADER_LENGTH;
   size_t readsize;
@@ -169,36 +175,51 @@ gst_tcp_gdp_read_header (GstElement * this, int socket)
   readsize = header_length;
 
   GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", readsize);
-  ret = gst_tcp_socket_read (socket, header, readsize);
-  /* if we read 0 bytes, and we're blocking, we hit eos */
-  if (ret == 0) {
-    GST_DEBUG ("blocking read returns 0, returning NULL");
-    g_free (header);
-    return NULL;
-  }
-  if (ret < 0) {
-    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
-    g_free (header);
-    return NULL;
+  if ((ret = gst_tcp_socket_read (socket, header, readsize)) <= 0)
+    goto read_error;
+
+  if (ret != readsize)
+    goto short_read;
+
+  if (!gst_dp_validate_header (header_length, header))
+    goto validate_error;
+
+  GST_LOG_OBJECT (this, "validated buffer packet header");
+
+  buffer = gst_dp_buffer_from_header (header_length, header);
+  g_free (header);
+
+  GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer);
+
+  return buffer;
+
+  /* ERRORS */
+read_error:
+  {
+    if (ret == 0) {
+      /* if we read 0 bytes, and we're blocking, we hit eos */
+      GST_DEBUG ("blocking read returns 0, returning NULL");
+      g_free (header);
+      return NULL;
+    } else {
+      GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+      g_free (header);
+      return NULL;
+    }
   }
-  if (ret != readsize) {
+short_read:
+  {
+    GST_WARNING ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret);
     g_warning ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret);
+    return NULL;
   }
-  g_assert (ret == readsize);
-
-  if (!gst_dp_validate_header (header_length, header)) {
+validate_error:
+  {
     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
         ("GDP buffer packet header does not validate"));
     g_free (header);
     return NULL;
   }
-  GST_LOG_OBJECT (this, "validated buffer packet header");
-
-  buffer = gst_dp_buffer_from_header (header_length, header);
-  g_free (header);
-
-  GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer);
-  return GST_DATA (buffer);
 }
 
 /* read the GDP caps packet from the given socket
@@ -218,88 +239,122 @@ gst_tcp_gdp_read_caps (GstElement * this, int socket)
   readsize = header_length;
 
   GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", readsize);
-  ret = gst_tcp_socket_read (socket, header, readsize);
-  if (ret < 0) {
-    g_free (header);
-    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
-    return NULL;
-  }
-  if (ret == 0) {
-    GST_WARNING_OBJECT (this, "read returned EOF");
-    return NULL;
+  if ((ret = gst_tcp_socket_read (socket, header, readsize)) <= 0)
+    goto read_error;
+
+  if (ret != readsize)
+    goto short_read;
+
+  if (!gst_dp_validate_header (header_length, header))
+    goto validate_error;
+
+  readsize = gst_dp_header_payload_length (header);
+  payload = g_malloc (readsize);
+
+  GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize);
+  if ((ret = gst_tcp_socket_read (socket, payload, readsize)) < 0)
+    goto socket_read_error;
+
+  if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS)
+    goto is_not_caps;
+
+  g_assert (ret == readsize);
+
+  if (!gst_dp_validate_payload (readsize, header, payload))
+    goto packet_validate_error;
+
+  caps = gst_dp_caps_from_packet (header_length, header, payload);
+  string = gst_caps_to_string (caps);
+  GST_LOG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string);
+  g_free (string);
+
+  g_free (header);
+  g_free (payload);
+
+  return caps;
+
+  /* ERRORS */
+read_error:
+  {
+    if (ret < 0) {
+      g_free (header);
+      GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+      return NULL;
+    }
+    if (ret == 0) {
+      GST_WARNING_OBJECT (this, "read returned EOF");
+      return NULL;
+    }
   }
-  if (ret != readsize) {
+short_read:
+  {
     GST_WARNING_OBJECT (this, "Tried to read %d bytes but only read %d bytes",
         readsize, ret);
     return NULL;
   }
-
-  if (!gst_dp_validate_header (header_length, header)) {
+validate_error:
+  {
     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
         ("GDP caps packet header does not validate"));
     g_free (header);
     return NULL;
   }
-
-  readsize = gst_dp_header_payload_length (header);
-  payload = g_malloc (readsize);
-  GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize);
-  ret = gst_tcp_socket_read (socket, payload, readsize);
-
-  if (ret < 0) {
+socket_read_error:
+  {
     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
     g_free (header);
     g_free (payload);
     return NULL;
   }
-  if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS) {
+is_not_caps:
+  {
     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
         ("Header read doesn't describe CAPS payload"));
     g_free (header);
     g_free (payload);
     return NULL;
   }
-  g_assert (ret == readsize);
-
-  if (!gst_dp_validate_payload (readsize, header, payload)) {
+packet_validate_error:
+  {
     GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
         ("GDP caps packet payload does not validate"));
     g_free (header);
     g_free (payload);
     return NULL;
   }
-
-  caps = gst_dp_caps_from_packet (header_length, header, payload);
-  string = gst_caps_to_string (caps);
-  GST_LOG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string);
-  g_free (string);
-
-  g_free (header);
-  g_free (payload);
-
-  return caps;
 }
 
 /* write a GDP header to the socket.  Return false if fails. */
 gboolean
-gst_tcp_gdp_write_header (GstElement * this, int socket, GstBuffer * buffer,
+gst_tcp_gdp_write_buffer (GstElement * this, int socket, GstBuffer * buffer,
     gboolean fatal, const gchar * host, int port)
 {
   guint length;
   guint8 *header;
   size_t wrote;
 
-  if (!gst_dp_header_from_buffer (buffer, 0, &length, &header)) {
+  if (!gst_dp_header_from_buffer (buffer, 0, &length, &header))
+    goto create_error;
+
+  GST_LOG_OBJECT (this, "writing %d bytes for GDP buffer header", length);
+  wrote = gst_tcp_socket_write (socket, header, length);
+  g_free (header);
+
+  if (wrote != length)
+    goto write_error;
+
+  return TRUE;
+
+  /* ERRORS */
+create_error:
+  {
     if (fatal)
       GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL),
           ("Could not create GDP header from buffer"));
     return FALSE;
   }
-
-  GST_LOG_OBJECT (this, "writing %d bytes for GDP buffer header", length);
-  wrote = gst_tcp_socket_write (socket, header, length);
-  g_free (header);
-  if (wrote != length) {
+write_error:
+  {
     if (fatal)
       GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
           (_("Error while sending data to \"%s:%d\"."), host, port),
@@ -307,8 +362,6 @@ gst_tcp_gdp_write_header (GstElement * this, int socket, GstBuffer * buffer,
               wrote, GST_BUFFER_SIZE (buffer), g_strerror (errno)));
     return FALSE;
   }
-
-  return TRUE;
 }
 
 /* write GDP header and payload to the given socket for the given caps.
@@ -322,15 +375,36 @@ gst_tcp_gdp_write_caps (GstElement * this, int socket, const GstCaps * caps,
   guint8 *payload;
   size_t wrote;
 
-  if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) {
+  if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload))
+    goto create_error;
+
+  GST_LOG_OBJECT (this, "writing %d bytes for GDP caps header", length);
+  wrote = gst_tcp_socket_write (socket, header, length);
+  if (wrote != length)
+    goto write_header_error;
+
+  length = gst_dp_header_payload_length (header);
+  g_free (header);
+
+  GST_LOG_OBJECT (this, "writing %d bytes for GDP caps payload", length);
+  wrote = gst_tcp_socket_write (socket, payload, length);
+  g_free (payload);
+
+  if (wrote != length)
+    goto write_payload_error;
+
+  return TRUE;
+
+  /* ERRORS */
+create_error:
+  {
     if (fatal)
       GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL),
           ("Could not create GDP packet from caps"));
     return FALSE;
   }
-  GST_LOG_OBJECT (this, "writing %d bytes for GDP caps header", length);
-  wrote = gst_tcp_socket_write (socket, header, length);
-  if (wrote != length) {
+write_header_error:
+  {
     g_free (header);
     g_free (payload);
     if (fatal)
@@ -340,13 +414,8 @@ gst_tcp_gdp_write_caps (GstElement * this, int socket, const GstCaps * caps,
               wrote, length, g_strerror (errno)));
     return FALSE;
   }
-
-  length = gst_dp_header_payload_length (header);
-  g_free (header);
-  GST_LOG_OBJECT (this, "writing %d bytes for GDP caps payload", length);
-  wrote = gst_tcp_socket_write (socket, payload, length);
-  g_free (payload);
-  if (wrote != length) {
+write_payload_error:
+  {
     if (fatal)
       GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
           (_("Error while sending gdp payload data to \"%s:%d\"."), host, port),
@@ -354,5 +423,4 @@ gst_tcp_gdp_write_caps (GstElement * this, int socket, const GstCaps * caps,
               wrote, length, g_strerror (errno)));
     return FALSE;
   }
-  return TRUE;
 }
index cce6ad2..63ed526 100644 (file)
@@ -46,11 +46,13 @@ gint gst_tcp_socket_read (int socket, void *buf, size_t count);
 
 void gst_tcp_socket_close (int *socket);
 
-GstData * gst_tcp_gdp_read_header (GstElement *this, int socket);
-GstCaps * gst_tcp_gdp_read_caps (GstElement *this, int socket);
+GstBuffer * gst_tcp_gdp_read_buffer (GstElement *elem, int socket);
+GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket);
+GstCaps * gst_tcp_gdp_read_caps (GstElement *elem, int socket);
 
-gboolean gst_tcp_gdp_write_header (GstElement *this, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port);
-gboolean gst_tcp_gdp_write_caps (GstElement *this, int socket, const GstCaps *caps, gboolean fatal, const gchar *host, int port);
+gboolean gst_tcp_gdp_write_buffer (GstElement *elem, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port);
+gboolean gst_tcp_gdp_write_event (GstElement *elem, int socket, GstEvent *event, gboolean fatal, const gchar *host, int port);
+gboolean gst_tcp_gdp_write_caps (GstElement *elem, int socket, const GstCaps *caps, gboolean fatal, const gchar *host, int port);
 
 G_END_DECLS
 
index 424b098..4e3f5a8 100644 (file)
@@ -58,10 +58,9 @@ static void gst_tcpclientsink_class_init (GstTCPClientSink * klass);
 static void gst_tcpclientsink_init (GstTCPClientSink * tcpclientsink);
 static void gst_tcpclientsink_finalize (GObject * gobject);
 
-static void gst_tcpclientsink_set_clock (GstElement * element,
-    GstClock * clock);
-
-static void gst_tcpclientsink_chain (GstPad * pad, GstData * _data);
+static gboolean gst_tcpclientsink_setcaps (GstBaseSink * bsink, GstCaps * caps);
+static GstFlowReturn gst_tcpclientsink_render (GstBaseSink * bsink,
+    GstBuffer * buf);
 static GstElementStateReturn gst_tcpclientsink_change_state (GstElement *
     element);
 
@@ -115,11 +114,17 @@ gst_tcpclientsink_class_init (GstTCPClientSink * klass)
 {
   GObjectClass *gobject_class;
   GstElementClass *gstelement_class;
+  GstBaseSinkClass *gstbasesink_class;
 
   gobject_class = (GObjectClass *) klass;
   gstelement_class = (GstElementClass *) klass;
+  gstbasesink_class = (GstBaseSinkClass *) klass;
+
+  parent_class = g_type_class_ref (GST_TYPE_BASESINK);
 
-  parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
+  gobject_class->set_property = gst_tcpclientsink_set_property;
+  gobject_class->get_property = gst_tcpclientsink_get_property;
+  gobject_class->finalize = gst_tcpclientsink_finalize;
 
   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
       g_param_spec_string ("host", "Host", "The host/IP to send the packets to",
@@ -131,44 +136,24 @@ gst_tcpclientsink_class_init (GstTCPClientSink * klass)
       g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
           GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE,
           G_PARAM_READWRITE));
-  gobject_class->set_property = gst_tcpclientsink_set_property;
-  gobject_class->get_property = gst_tcpclientsink_get_property;
-  gobject_class->finalize = gst_tcpclientsink_finalize;
 
   gstelement_class->change_state = gst_tcpclientsink_change_state;
-  gstelement_class->set_clock = gst_tcpclientsink_set_clock;
-
-  GST_DEBUG_CATEGORY_INIT (tcpclientsink_debug, "tcpclientsink", 0, "TCP sink");
-}
-
-static void
-gst_tcpclientsink_set_clock (GstElement * element, GstClock * clock)
-{
-  GstTCPClientSink *tcpclientsink;
 
-  tcpclientsink = GST_TCPCLIENTSINK (element);
+  gstbasesink_class->set_caps = gst_tcpclientsink_setcaps;
+  gstbasesink_class->render = gst_tcpclientsink_render;
 
-  tcpclientsink->clock = clock;
+  GST_DEBUG_CATEGORY_INIT (tcpclientsink_debug, "tcpclientsink", 0, "TCP sink");
 }
 
 static void
 gst_tcpclientsink_init (GstTCPClientSink * this)
 {
-  /* create the sink pad */
-  this->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
-  gst_element_add_pad (GST_ELEMENT (this), this->sinkpad);
-  gst_pad_set_chain_function (this->sinkpad, gst_tcpclientsink_chain);
-
   this->host = g_strdup (TCP_DEFAULT_HOST);
   this->port = TCP_DEFAULT_PORT;
-  /* should support as minimum 576 for IPV4 and 1500 for IPV6 */
-  /* this->mtu = 1500; */
 
   this->sock_fd = -1;
   this->protocol = GST_TCP_PROTOCOL_TYPE_NONE;
   GST_FLAG_UNSET (this, GST_TCPCLIENTSINK_OPEN);
-
-  this->clock = NULL;
 }
 
 static void
@@ -179,24 +164,12 @@ gst_tcpclientsink_finalize (GObject * gobject)
   g_free (this->host);
 }
 
-static void
-gst_tcpclientsink_chain (GstPad * pad, GstData * _data)
+static gboolean
+gst_tcpclientsink_setcaps (GstBaseSink * bsink, GstCaps * caps)
 {
-  size_t wrote = 0;
-
-  GstBuffer *buf = GST_BUFFER (_data);
   GstTCPClientSink *sink;
 
-  g_return_if_fail (pad != NULL);
-  g_return_if_fail (GST_IS_PAD (pad));
-  g_return_if_fail (buf != NULL);
-  sink = GST_TCPCLIENTSINK (GST_OBJECT_PARENT (pad));
-  g_return_if_fail (GST_FLAG_IS_SET (sink, GST_TCPCLIENTSINK_OPEN));
-
-  if (GST_IS_EVENT (buf)) {
-    g_warning ("FIXME: handl events");
-    return;
-  }
+  sink = GST_TCPCLIENTSINK (bsink);
 
   /* write the buffer header if we have one */
   switch (sink->protocol) {
@@ -209,44 +182,85 @@ gst_tcpclientsink_chain (GstPad * pad, GstData * _data)
         const GstCaps *caps;
         gchar *string;
 
-        caps = GST_PAD_CAPS (GST_PAD_PEER (pad));
+        caps = GST_PAD_CAPS (GST_PAD_PEER (GST_BASESINK_PAD (bsink)));
         string = gst_caps_to_string (caps);
         GST_DEBUG_OBJECT (sink, "Sending caps %s through GDP", string);
-        if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd, caps,
-                TRUE, sink->host, sink->port)) {
-          g_free (string);
-          return;
-        }
         g_free (string);
+
+        if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd, caps,
+                TRUE, sink->host, sink->port))
+          goto gdp_write_error;
+
         sink->caps_sent = TRUE;
       }
+      break;
+    default:
+      g_warning ("Unhandled protocol type");
+      break;
+  }
+
+  return TRUE;
+
+  /* ERRORS */
+gdp_write_error:
+  {
+    return FALSE;
+  }
+}
+
+static GstFlowReturn
+gst_tcpclientsink_render (GstBaseSink * bsink, GstBuffer * buf)
+{
+  size_t wrote = 0;
+  GstTCPClientSink *sink;
+  gint size;
+
+  sink = GST_TCPCLIENTSINK (bsink);
+
+  g_return_val_if_fail (GST_FLAG_IS_SET (sink, GST_TCPCLIENTSINK_OPEN),
+      GST_FLOW_WRONG_STATE);
+
+  size = GST_BUFFER_SIZE (buf);
+
+  GST_LOG_OBJECT (sink, "writing %d bytes for buffer data", size);
+
+  /* write the buffer header if we have one */
+  switch (sink->protocol) {
+    case GST_TCP_PROTOCOL_TYPE_NONE:
+      break;
+    case GST_TCP_PROTOCOL_TYPE_GDP:
       GST_LOG_OBJECT (sink, "Sending buffer header through GDP");
-      if (!gst_tcp_gdp_write_header (GST_ELEMENT (sink), sink->sock_fd, buf,
+      if (!gst_tcp_gdp_write_buffer (GST_ELEMENT (sink), sink->sock_fd, buf,
               TRUE, sink->host, sink->port))
-        return;
+        goto gdp_write_error;
       break;
     default:
-      g_warning ("Unhandled protocol type");
       break;
   }
 
-  GST_LOG_OBJECT (sink, "writing %d bytes for buffer data",
-      GST_BUFFER_SIZE (buf));
-  wrote =
-      gst_tcp_socket_write (sink->sock_fd, GST_BUFFER_DATA (buf),
-      GST_BUFFER_SIZE (buf));
+  /* write buffer data */
+  wrote = gst_tcp_socket_write (sink->sock_fd, GST_BUFFER_DATA (buf), size);
+
+  if (wrote < size)
+    goto write_error;
 
-  if (wrote < GST_BUFFER_SIZE (buf)) {
+  sink->data_written += wrote;
+
+  return GST_FLOW_OK;
+
+  /* ERRORS */
+gdp_write_error:
+  {
+    return FALSE;
+  }
+write_error:
+  {
     GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
         (_("Error while sending data to \"%s:%d\"."), sink->host, sink->port),
         ("Only %d of %d bytes written: %s",
             wrote, GST_BUFFER_SIZE (buf), g_strerror (errno)));
+    return GST_FLOW_ERROR;
   }
-  sink->data_written += wrote;
-
-  gst_buffer_unref (buf);
-
-  /* FIXME: emit signal ? */
 }
 
 static void
@@ -311,11 +325,14 @@ gst_tcpclientsink_get_property (GObject * object, guint prop_id, GValue * value,
 
 /* create a socket for sending to remote machine */
 static gboolean
-gst_tcpclientsink_init_send (GstTCPClientSink * this)
+gst_tcpclientsink_start (GstTCPClientSink * this)
 {
   int ret;
   gchar *ip;
 
+  if (GST_FLAG_IS_SET (this, GST_TCPCLIENTSINK_OPEN))
+    return TRUE;
+
   /* reset caps_sent flag */
   this->caps_sent = FALSE;
 
@@ -373,34 +390,53 @@ gst_tcpclientsink_init_send (GstTCPClientSink * this)
   return TRUE;
 }
 
-static void
-gst_tcpclientsink_close (GstTCPClientSink * this)
+static gboolean
+gst_tcpclientsink_stop (GstTCPClientSink * this)
 {
+  if (!GST_FLAG_IS_SET (this, GST_TCPCLIENTSINK_OPEN))
+    return TRUE;
+
   if (this->sock_fd != -1) {
     close (this->sock_fd);
     this->sock_fd = -1;
   }
 
   GST_FLAG_UNSET (this, GST_TCPCLIENTSINK_OPEN);
+
+  return TRUE;
 }
 
 static GstElementStateReturn
 gst_tcpclientsink_change_state (GstElement * element)
 {
-  g_return_val_if_fail (GST_IS_TCPCLIENTSINK (element), GST_STATE_FAILURE);
-
-  if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
-    if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSINK_OPEN))
-      gst_tcpclientsink_close (GST_TCPCLIENTSINK (element));
-  } else {
-    if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSINK_OPEN)) {
-      if (!gst_tcpclientsink_init_send (GST_TCPCLIENTSINK (element)))
-        return GST_STATE_FAILURE;
-    }
+  GstTCPClientSink *sink;
+  gint transition;
+  GstElementStateReturn res;
+
+  sink = GST_TCPCLIENTSINK (element);
+  transition = GST_STATE_TRANSITION (element);
+
+  switch (transition) {
+    case GST_STATE_NULL_TO_READY:
+    case GST_STATE_READY_TO_PAUSED:
+      if (!gst_tcpclientsink_start (GST_TCPCLIENTSINK (element)))
+        goto start_failure;
+      break;
+    default:
+      break;
   }
+  res = GST_ELEMENT_CLASS (parent_class)->change_state (element);
 
-  if (GST_ELEMENT_CLASS (parent_class)->change_state)
-    return GST_ELEMENT_CLASS (parent_class)->change_state (element);
+  switch (transition) {
+    case GST_STATE_READY_TO_NULL:
+      gst_tcpclientsink_stop (GST_TCPCLIENTSINK (element));
+    default:
+      break;
+  }
+  return res;
 
-  return GST_STATE_SUCCESS;
+start_failure:
+  {
+    return GST_STATE_FAILURE;
+  }
 }
index 9cd657a..f7254e2 100644 (file)
 
 
 #include <gst/gst.h>
+#include <gst/base/gstbasesink.h>
+
 #include "gsttcp.h"
 
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
+G_BEGIN_DECLS
 
 #include <stdio.h>
 #include <stdlib.h>
@@ -65,10 +65,7 @@ typedef enum {
 } GstTCPClientSinkFlags;
 
 struct _GstTCPClientSink {
-  GstElement element;
-
-  /* pad */
-  GstPad *sinkpad;
+  GstBaseSink element;
 
   /* server information */
   int port;
@@ -81,21 +78,14 @@ struct _GstTCPClientSink {
   size_t data_written; /* how much bytes have we written ? */
   GstTCPProtocolType protocol; /* used with the protocol enum */
   gboolean caps_sent; /* whether or not we sent caps already */
-
-  guint mtu;
-  GstClock *clock;
 };
 
 struct _GstTCPClientSinkClass {
-  GstElementClass parent_class;
+  GstBaseSinkClass parent_class;
 };
 
 GType gst_tcpclientsink_get_type(void);
 
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
-
+G_END_DECLS
 
 #endif /* __GST_TCPCLIENTSINK_H__ */
index 0d96a74..8100164 100644 (file)
@@ -47,6 +47,11 @@ GST_ELEMENT_DETAILS ("TCP Client source",
     "Receive data as a client over the network via TCP",
     "Thomas Vander Stichele <thomas at apestaart dot org>");
 
+static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
+    GST_PAD_SRC,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS_ANY);
+
 /* TCPClientSrc signals and args */
 enum
 {
@@ -66,17 +71,18 @@ static void gst_tcpclientsrc_class_init (GstTCPClientSrc * klass);
 static void gst_tcpclientsrc_init (GstTCPClientSrc * tcpclientsrc);
 static void gst_tcpclientsrc_finalize (GObject * gobject);
 
-static GstCaps *gst_tcpclientsrc_getcaps (GstPad * pad);
+static GstCaps *gst_tcpclientsrc_getcaps (GstBaseSrc * psrc);
 
-static GstData *gst_tcpclientsrc_get (GstPad * pad);
-static GstElementStateReturn gst_tcpclientsrc_change_state (GstElement *
-    element);
+static GstFlowReturn gst_tcpclientsrc_create (GstPushSrc * psrc,
+    GstBuffer ** outbuf);
+static gboolean gst_tcpclientsrc_stop (GstBaseSrc * bsrc);
+static gboolean gst_tcpclientsrc_start (GstBaseSrc * bsrc);
+static gboolean gst_tcpclientsrc_unlock (GstBaseSrc * bsrc);
 
 static void gst_tcpclientsrc_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
 static void gst_tcpclientsrc_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec);
-static void gst_tcpclientsrc_set_clock (GstElement * element, GstClock * clock);
 
 static GstElementClass *parent_class = NULL;
 
@@ -87,7 +93,6 @@ gst_tcpclientsrc_get_type (void)
 {
   static GType tcpclientsrc_type = 0;
 
-
   if (!tcpclientsrc_type) {
     static const GTypeInfo tcpclientsrc_info = {
       sizeof (GstTCPClientSrcClass),
@@ -103,7 +108,7 @@ gst_tcpclientsrc_get_type (void)
     };
 
     tcpclientsrc_type =
-        g_type_register_static (GST_TYPE_ELEMENT, "GstTCPClientSrc",
+        g_type_register_static (GST_TYPE_PUSHSRC, "GstTCPClientSrc",
         &tcpclientsrc_info, 0);
   }
   return tcpclientsrc_type;
@@ -114,6 +119,9 @@ gst_tcpclientsrc_base_init (gpointer g_class)
 {
   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
 
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&srctemplate));
+
   gst_element_class_set_details (element_class, &gst_tcpclientsrc_details);
 }
 
@@ -122,11 +130,19 @@ gst_tcpclientsrc_class_init (GstTCPClientSrc * klass)
 {
   GObjectClass *gobject_class;
   GstElementClass *gstelement_class;
+  GstBaseSrcClass *gstbasesrc_class;
+  GstPushSrcClass *gstpushsrc_class;
 
   gobject_class = (GObjectClass *) klass;
   gstelement_class = (GstElementClass *) klass;
+  gstbasesrc_class = (GstBaseSrcClass *) klass;
+  gstpushsrc_class = (GstPushSrcClass *) klass;
+
+  parent_class = g_type_class_ref (GST_TYPE_PUSHSRC);
 
-  parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
+  gobject_class->set_property = gst_tcpclientsrc_set_property;
+  gobject_class->get_property = gst_tcpclientsrc_get_property;
+  gobject_class->finalize = gst_tcpclientsrc_finalize;
 
   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
       g_param_spec_string ("host", "Host",
@@ -140,43 +156,28 @@ gst_tcpclientsrc_class_init (GstTCPClientSrc * klass)
           GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE,
           G_PARAM_READWRITE));
 
-  gobject_class->set_property = gst_tcpclientsrc_set_property;
-  gobject_class->get_property = gst_tcpclientsrc_get_property;
-  gobject_class->finalize = gst_tcpclientsrc_finalize;
+  gstbasesrc_class->get_caps = gst_tcpclientsrc_getcaps;
+  gstbasesrc_class->start = gst_tcpclientsrc_start;
+  gstbasesrc_class->stop = gst_tcpclientsrc_stop;
+  gstbasesrc_class->unlock = gst_tcpclientsrc_unlock;
 
-  gstelement_class->change_state = gst_tcpclientsrc_change_state;
-  gstelement_class->set_clock = gst_tcpclientsrc_set_clock;
+  gstpushsrc_class->create = gst_tcpclientsrc_create;
 
   GST_DEBUG_CATEGORY_INIT (tcpclientsrc_debug, "tcpclientsrc", 0,
       "TCP Client Source");
 }
 
 static void
-gst_tcpclientsrc_set_clock (GstElement * element, GstClock * clock)
-{
-  GstTCPClientSrc *tcpclientsrc;
-
-  tcpclientsrc = GST_TCPCLIENTSRC (element);
-
-  tcpclientsrc->clock = clock;
-}
-
-static void
 gst_tcpclientsrc_init (GstTCPClientSrc * this)
 {
-  /* create the src pad */
-  this->srcpad = gst_pad_new ("src", GST_PAD_SRC);
-  gst_element_add_pad (GST_ELEMENT (this), this->srcpad);
-  gst_pad_set_get_function (this->srcpad, gst_tcpclientsrc_get);
-  gst_pad_set_getcaps_function (this->srcpad, gst_tcpclientsrc_getcaps);
-
   this->port = TCP_DEFAULT_PORT;
   this->host = g_strdup (TCP_DEFAULT_HOST);
-  this->clock = NULL;
   this->sock_fd = -1;
   this->protocol = GST_TCP_PROTOCOL_TYPE_NONE;
-  this->curoffset = 0;
   this->caps = NULL;
+  this->curoffset = 0;
+
+  gst_base_src_set_live (GST_BASESRC (this), TRUE);
 
   GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN);
 }
@@ -190,12 +191,12 @@ gst_tcpclientsrc_finalize (GObject * gobject)
 }
 
 static GstCaps *
-gst_tcpclientsrc_getcaps (GstPad * pad)
+gst_tcpclientsrc_getcaps (GstBaseSrc * bsrc)
 {
   GstTCPClientSrc *src;
   GstCaps *caps = NULL;
 
-  src = GST_TCPCLIENTSRC (GST_OBJECT_PARENT (pad));
+  src = GST_TCPCLIENTSRC (bsrc);
 
   if (!GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN))
     caps = gst_caps_new_any ();
@@ -208,77 +209,20 @@ gst_tcpclientsrc_getcaps (GstPad * pad)
   return caps;
 }
 
-/* close the socket and associated resources
- * unset OPEN flag
- * used both to recover from errors and go to NULL state */
-static void
-gst_tcpclientsrc_close (GstTCPClientSrc * this)
-{
-  GST_DEBUG_OBJECT (this, "closing socket");
-  if (this->sock_fd != -1) {
-    close (this->sock_fd);
-    this->sock_fd = -1;
-  }
-  this->caps_received = FALSE;
-  if (this->caps) {
-    gst_caps_free (this->caps);
-    this->caps = NULL;
-  }
-  GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN);
-}
-
-/* close socket and related items and return an EOS GstData
- * called from _get */
-static GstData *
-gst_tcpclientsrc_eos (GstTCPClientSrc * src)
-{
-  GST_DEBUG_OBJECT (src, "going to EOS");
-  gst_element_set_eos (GST_ELEMENT (src));
-  gst_tcpclientsrc_close (src);
-  return GST_DATA (gst_event_new (GST_EVENT_EOS));
-}
-
-static GstData *
-gst_tcpclientsrc_get (GstPad * pad)
+static GstFlowReturn
+gst_tcpclientsrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 {
   GstTCPClientSrc *src;
   size_t readsize;
   int ret;
-
-  GstData *data = NULL;
   GstBuffer *buf = NULL;
 
-  g_return_val_if_fail (pad != NULL, NULL);
-  g_return_val_if_fail (GST_IS_PAD (pad), NULL);
-  src = GST_TCPCLIENTSRC (GST_OBJECT_PARENT (pad));
-  if (!GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN)) {
-    GST_DEBUG_OBJECT (src, "connection to server closed, cannot give data");
-    return NULL;
-  }
-  GST_LOG_OBJECT (src, "asked for a buffer");
+  src = GST_TCPCLIENTSRC (psrc);
 
-  /* try to negotiate here */
-  if (!gst_pad_is_negotiated (pad)) {
-    if (GST_PAD_LINK_FAILED (gst_pad_renegotiate (pad))) {
-      GST_ELEMENT_ERROR (src, CORE, NEGOTIATION, (NULL), GST_ERROR_SYSTEM);
-      gst_buffer_unref (buf);
-      return gst_tcpclientsrc_eos (src);
-    }
-  }
+  if (!GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN))
+    goto wrong_state;
 
-  /* if we have a left over buffer after a discont, return that */
-  if (src->buffer_after_discont) {
-    buf = src->buffer_after_discont;
-    GST_LOG_OBJECT (src,
-        "Returning buffer after discont of size %d, ts %"
-        GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
-        ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
-        GST_BUFFER_SIZE (buf), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
-        GST_TIME_ARGS (GST_BUFFER_DURATION (buf)),
-        GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf));
-    src->buffer_after_discont = NULL;
-    return GST_DATA (buf);
-  }
+  GST_LOG_OBJECT (src, "asked for a buffer");
 
   /* read the buffer header if we're using a protocol */
   switch (src->protocol) {
@@ -288,101 +232,51 @@ gst_tcpclientsrc_get (GstPad * pad)
       /* do a blocking select on the socket */
       FD_ZERO (&testfds);
       FD_SET (src->sock_fd, &testfds);
-      ret = select (src->sock_fd + 1, &testfds, (fd_set *) 0, (fd_set *) 0, 0);
+
       /* no action (0) is an error too in our case */
-      if (ret <= 0) {
-        GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-            ("select failed: %s", g_strerror (errno)));
-        return gst_tcpclientsrc_eos (src);
-      }
+      if ((ret = select (src->sock_fd + 1, &testfds, NULL, NULL, 0)) <= 0)
+        goto select_error;
 
       /* ask how much is available for reading on the socket */
-      ret = ioctl (src->sock_fd, FIONREAD, &readsize);
-      if (ret < 0) {
-        GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-            ("ioctl failed: %s", g_strerror (errno)));
-        return gst_tcpclientsrc_eos (src);
-      }
+      if ((ret = ioctl (src->sock_fd, FIONREAD, &readsize)) < 0)
+        goto ioctl_error;
+
       GST_LOG_OBJECT (src, "ioctl says %d bytes available", readsize);
+
       buf = gst_buffer_new_and_alloc (readsize);
       break;
-    case GST_TCP_PROTOCOL_TYPE_GDP:
-      if (!(data = gst_tcp_gdp_read_header (GST_ELEMENT (src), src->sock_fd))) {
-        return gst_tcpclientsrc_eos (src);
-      }
-      if (GST_IS_EVENT (data)) {
-        /* if we got back an EOS event, then we should go into eos ourselves */
-        if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) {
-          gst_event_unref (data);
-          return gst_tcpclientsrc_eos (src);
-        }
-        return data;
-      }
 
-      buf = GST_BUFFER (data);
+    case GST_TCP_PROTOCOL_TYPE_GDP:
+      if (!(buf = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd)))
+        goto hit_eos;
 
       GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
           buf);
+
       /* use this new buffer to read data into */
       readsize = GST_BUFFER_SIZE (buf);
       break;
     default:
-      g_warning ("Unhandled protocol type");
+      /* need to assert as buf == NULL */
+      g_assert ("Unhandled protocol type");
       break;
   }
 
   GST_LOG_OBJECT (src, "Reading %d bytes into buffer", readsize);
-  ret = gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf), readsize);
-  if (ret < 0) {
-    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
-    gst_buffer_unref (buf);
-    return gst_tcpclientsrc_eos (src);
-  }
+  if ((ret =
+          gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf),
+              readsize)) < 0)
+    goto read_error;
 
   /* if we read 0 bytes, and we're blocking, we hit eos */
-  if (ret == 0) {
-    GST_DEBUG_OBJECT (src, "blocking read returns 0, EOS");
-    gst_buffer_unref (buf);
-    return gst_tcpclientsrc_eos (src);
-  }
+  if (ret == 0)
+    goto zero_read;
 
   readsize = ret;
   GST_BUFFER_SIZE (buf) = readsize;
-  GST_BUFFER_MAXSIZE (buf) = readsize;
-
-  /* FIXME: we could decide to set OFFSET and OFFSET_END for non-protocol
-   * streams to mean the bytes processed */
-
-  /* if this is our first buffer, we need to send a discont with the
-   * given timestamp or the current offset, and store the buffer for
-   * the next iteration through the get loop */
-  if (src->send_discont) {
-    GstClockTime timestamp;
-    GstEvent *event;
-
-    src->send_discont = FALSE;
-    src->buffer_after_discont = buf;
-    /* if the timestamp is valid, send a timed discont
-     * taking into account the incoming buffer's timestamps */
-    timestamp = GST_BUFFER_TIMESTAMP (buf);
-    if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
-      GST_DEBUG_OBJECT (src,
-          "sending discontinuous with timestamp %" GST_TIME_FORMAT,
-          GST_TIME_ARGS (timestamp));
-      event =
-          gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME, timestamp, NULL);
-      return GST_DATA (event);
-    }
-    /* otherwise, send an offset discont */
-    GST_DEBUG_OBJECT (src, "sending discontinuous with offset %d",
-        src->curoffset);
-    event =
-        gst_event_new_discontinuous (FALSE, GST_FORMAT_BYTES, src->curoffset,
-        NULL);
-    return GST_DATA (event);
-  }
 
   src->curoffset += readsize;
+
   GST_LOG_OBJECT (src,
       "Returning buffer from _get of size %d, ts %"
       GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
@@ -390,7 +284,47 @@ gst_tcpclientsrc_get (GstPad * pad)
       GST_BUFFER_SIZE (buf), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
       GST_TIME_ARGS (GST_BUFFER_DURATION (buf)),
       GST_BUFFER_OFFSET (buf), GST_BUFFER_OFFSET_END (buf));
-  return GST_DATA (buf);
+
+  gst_buffer_set_caps (buf, src->caps);
+
+  *outbuf = buf;
+
+  return GST_FLOW_OK;
+
+  /* ERRORS */
+wrong_state:
+  {
+    GST_DEBUG_OBJECT (src, "connection to server closed, cannot give data");
+    return GST_FLOW_WRONG_STATE;
+  }
+select_error:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+        ("select failed: %s", g_strerror (errno)));
+    return GST_FLOW_ERROR;
+  }
+ioctl_error:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+        ("ioctl failed: %s", g_strerror (errno)));
+    return GST_FLOW_ERROR;
+  }
+hit_eos:
+  {
+    return GST_FLOW_WRONG_STATE;
+  }
+read_error:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+    gst_buffer_unref (buf);
+    return GST_FLOW_ERROR;
+  }
+zero_read:
+  {
+    GST_DEBUG_OBJECT (src, "blocking read returns 0, EOS");
+    gst_buffer_unref (buf);
+    return GST_FLOW_WRONG_STATE;
+  }
 }
 
 static void
@@ -453,109 +387,125 @@ gst_tcpclientsrc_get_property (GObject * object, guint prop_id, GValue * value,
 
 /* create a socket for connecting to remote server */
 static gboolean
-gst_tcpclientsrc_init_receive (GstTCPClientSrc * this)
+gst_tcpclientsrc_start (GstBaseSrc * bsrc)
 {
   int ret;
   gchar *ip;
+  GstTCPClientSrc *src = GST_TCPCLIENTSRC (bsrc);
 
   /* create receiving client socket */
-  GST_DEBUG_OBJECT (this, "opening receiving client socket to %s:%d",
-      this->host, this->port);
-  if ((this->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
-    GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
-    return FALSE;
-  }
-  GST_DEBUG_OBJECT (this, "opened receiving client socket with fd %d",
-      this->sock_fd);
-  GST_FLAG_SET (this, GST_TCPCLIENTSRC_OPEN);
+  GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d",
+      src->host, src->port);
+
+  if ((src->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
+    goto no_socket;
+
+  GST_DEBUG_OBJECT (src, "opened receiving client socket with fd %d",
+      src->sock_fd);
+  GST_FLAG_SET (src, GST_TCPCLIENTSRC_OPEN);
 
   /* look up name if we need to */
-  ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
-  if (!ip) {
-    gst_tcpclientsrc_close (this);
-    return FALSE;
-  }
-  GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip);
+  if (!(ip = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host)))
+    goto name_resolv;
+
+  GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
 
   /* connect to server */
-  memset (&this->server_sin, 0, sizeof (this->server_sin));
-  this->server_sin.sin_family = AF_INET;        /* network socket */
-  this->server_sin.sin_port = htons (this->port);       /* on port */
-  this->server_sin.sin_addr.s_addr = inet_addr (ip);    /* on host ip */
+  memset (&src->server_sin, 0, sizeof (src->server_sin));
+  src->server_sin.sin_family = AF_INET; /* network socket */
+  src->server_sin.sin_port = htons (src->port); /* on port */
+  src->server_sin.sin_addr.s_addr = inet_addr (ip);     /* on host ip */
   g_free (ip);
 
-  GST_DEBUG_OBJECT (this, "connecting to server");
-  ret = connect (this->sock_fd, (struct sockaddr *) &this->server_sin,
-      sizeof (this->server_sin));
+  GST_DEBUG_OBJECT (src, "connecting to server");
+  ret = connect (src->sock_fd, (struct sockaddr *) &src->server_sin,
+      sizeof (src->server_sin));
 
   if (ret) {
-    gst_tcpclientsrc_close (this);
+    gst_tcpclientsrc_stop (GST_BASESRC (src));
     switch (errno) {
       case ECONNREFUSED:
-        GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ,
-            (_("Connection to %s:%d refused."), this->host, this->port),
-            (NULL));
+        GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ,
+            (_("Connection to %s:%d refused."), src->host, src->port), (NULL));
         return FALSE;
         break;
       default:
-        GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
-            ("connect to %s:%d failed: %s", this->host, this->port,
+        GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
+            ("connect to %s:%d failed: %s", src->host, src->port,
                 g_strerror (errno)));
         return FALSE;
         break;
     }
   }
 
-  this->send_discont = TRUE;
-  this->buffer_after_discont = NULL;
-
   /* get the caps if we're using GDP */
-  if (this->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
+  if (src->protocol == GST_TCP_PROTOCOL_TYPE_GDP) {
     /* if we haven't received caps yet, we should get them first */
-    if (!this->caps_received) {
+    if (!src->caps_received) {
       GstCaps *caps;
 
-      GST_DEBUG_OBJECT (this, "getting caps through GDP");
-      if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (this), this->sock_fd))) {
-        gst_tcpclientsrc_close (this);
-        GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
-            ("Could not read caps through GDP"));
-        return FALSE;
-      }
-      if (!GST_IS_CAPS (caps)) {
-        gst_tcpclientsrc_close (this);
-        GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
-            ("Could not read caps through GDP"));
-        return FALSE;
-      }
-      GST_DEBUG_OBJECT (this, "Received caps through GDP: %" GST_PTR_FORMAT,
+      GST_DEBUG_OBJECT (src, "getting caps through GDP");
+      if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd)))
+        goto no_caps;
+
+      if (!GST_IS_CAPS (caps))
+        goto no_caps;
+
+      GST_DEBUG_OBJECT (src, "Received caps through GDP: %" GST_PTR_FORMAT,
           caps);
-      this->caps_received = TRUE;
-      this->caps = caps;
+
+      src->caps_received = TRUE;
+      src->caps = caps;
     }
   }
   return TRUE;
+
+no_socket:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
+    return FALSE;
+  }
+name_resolv:
+  {
+    gst_tcpclientsrc_stop (GST_BASESRC (src));
+    return FALSE;
+  }
+no_caps:
+  {
+    gst_tcpclientsrc_stop (GST_BASESRC (src));
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+        ("Could not read caps through GDP"));
+    return FALSE;
+  }
 }
 
-static GstElementStateReturn
-gst_tcpclientsrc_change_state (GstElement * element)
+/* close the socket and associated resources
+ * unset OPEN flag
+ * used both to recover from errors and go to NULL state */
+static gboolean
+gst_tcpclientsrc_stop (GstBaseSrc * bsrc)
 {
-  g_return_val_if_fail (GST_IS_TCPCLIENTSRC (element), GST_STATE_FAILURE);
+  GstTCPClientSrc *src;
 
-  /* if open and going to NULL, close it */
-  if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN) &&
-      GST_STATE_PENDING (element) == GST_STATE_NULL) {
-    gst_tcpclientsrc_close (GST_TCPCLIENTSRC (element));
+  src = GST_TCPCLIENTSRC (bsrc);
+
+  GST_DEBUG_OBJECT (src, "closing socket");
+  if (src->sock_fd != -1) {
+    close (src->sock_fd);
+    src->sock_fd = -1;
   }
-  /* if closed and going to a state higher than NULL, open it */
-  if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN) &&
-      GST_STATE_PENDING (element) > GST_STATE_NULL) {
-    if (!gst_tcpclientsrc_init_receive (GST_TCPCLIENTSRC (element)))
-      return GST_STATE_FAILURE;
+  src->caps_received = FALSE;
+  if (src->caps) {
+    gst_caps_unref (src->caps);
+    src->caps = NULL;
   }
+  GST_FLAG_UNSET (src, GST_TCPCLIENTSRC_OPEN);
 
-  if (GST_ELEMENT_CLASS (parent_class)->change_state)
-    return GST_ELEMENT_CLASS (parent_class)->change_state (element);
+  return TRUE;
+}
 
-  return GST_STATE_SUCCESS;
+static gboolean
+gst_tcpclientsrc_unlock (GstBaseSrc * bsrc)
+{
+  return TRUE;
 }
index f09a0b2..2ccaa39 100644 (file)
 #define __GST_TCPCLIENTSRC_H__
 
 #include <gst/gst.h>
+#include <gst/base/gstpushsrc.h>
 
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
+G_BEGIN_DECLS
 
 #include <netdb.h>                        /* sockaddr_in */
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>                          /* sockaddr_in */
 #include <unistd.h>
+
 #include "gsttcp.h"
 
 #define GST_TYPE_TCPCLIENTSRC \
@@ -56,10 +56,7 @@ typedef enum {
 } GstTCPClientSrcFlags;
 
 struct _GstTCPClientSrc {
-  GstElement element;
-
-  /* pad */
-  GstPad *srcpad;
+  GstPushSrc element;
 
   /* server information */
   int port;
@@ -75,21 +72,14 @@ struct _GstTCPClientSrc {
   GstTCPProtocolType protocol; /* protocol used for reading data */
   gboolean caps_received;      /* if we have received caps yet */
   GstCaps *caps;
-  GstClock *clock;
-
-  gboolean send_discont;       /* TRUE when we need to send a discont */
-  GstBuffer *buffer_after_discont; /* temporary storage for buffer */
 };
 
 struct _GstTCPClientSrcClass {
-  GstElementClass parent_class;
+  GstPushSrcClass parent_class;
 };
 
 GType gst_tcpclientsrc_get_type (void);
 
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
+G_END_DECLS
 
 #endif /* __GST_TCPCLIENTSRC_H__ */
index 369bdd3..8ab6471 100644 (file)
@@ -21,8 +21,7 @@
 #include "config.h"
 #endif
 
-#include "gsttcpsrc.h"
-#include "gsttcpsink.h"
+#include <gst/dataprotocol/dataprotocol.h>
 #include "gsttcpclientsrc.h"
 #include "gsttcpclientsink.h"
 #include "gsttcpserversrc.h"
@@ -34,12 +33,7 @@ GST_DEBUG_CATEGORY (tcp_debug);
 static gboolean
 plugin_init (GstPlugin * plugin)
 {
-  if (!gst_element_register (plugin, "tcpsink", GST_RANK_NONE,
-          GST_TYPE_TCPSINK))
-    return FALSE;
-
-  if (!gst_element_register (plugin, "tcpsrc", GST_RANK_NONE, GST_TYPE_TCPSRC))
-    return FALSE;
+  gst_dp_init ();
 
   if (!gst_element_register (plugin, "tcpclientsink", GST_RANK_NONE,
           GST_TYPE_TCPCLIENTSINK))
index 4dd513a..bb92348 100644 (file)
@@ -119,6 +119,10 @@ gst_tcpserversink_class_init (GstTCPServerSink * klass)
 
   parent_class = g_type_class_ref (GST_TYPE_MULTIFDSINK);
 
+  gobject_class->set_property = gst_tcpserversink_set_property;
+  gobject_class->get_property = gst_tcpserversink_get_property;
+  gobject_class->finalize = gst_tcpserversink_finalize;
+
   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
       g_param_spec_string ("host", "host", "The host/IP to send the packets to",
           TCP_DEFAULT_HOST, G_PARAM_READWRITE));
@@ -126,10 +130,6 @@ gst_tcpserversink_class_init (GstTCPServerSink * klass)
       g_param_spec_int ("port", "port", "The port to send the packets to",
           0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
 
-  gobject_class->set_property = gst_tcpserversink_set_property;
-  gobject_class->get_property = gst_tcpserversink_get_property;
-  gobject_class->finalize = gst_tcpserversink_finalize;
-
   gstmultifdsink_class->init = gst_tcpserversink_init_send;
   gstmultifdsink_class->wait = gst_tcpserversink_handle_wait;
   gstmultifdsink_class->close = gst_tcpserversink_close;
index 6b4848a..c4d98fa 100644 (file)
@@ -25,9 +25,7 @@
 
 #include <gst/gst.h>
 
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
+G_BEGIN_DECLS
 
 #include <stdio.h>
 #include <stdlib.h>
@@ -82,10 +80,6 @@ struct _GstTCPServerSinkClass {
 
 GType gst_tcpserversink_get_type (void);
 
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
-
+G_END_DECLS
 
 #endif /* __GST_TCPSERVERSINK_H__ */
index 9c84320..5698976 100644 (file)
@@ -46,6 +46,12 @@ GST_ELEMENT_DETAILS ("TCP Server source",
     "Receive data as a server over the network via TCP",
     "Thomas Vander Stichele <thomas at apestaart dot org>");
 
+static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
+    GST_PAD_SRC,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS_ANY);
+
+
 /* TCPServerSrc signals and args */
 enum
 {
@@ -65,15 +71,15 @@ static void gst_tcpserversrc_class_init (GstTCPServerSrc * klass);
 static void gst_tcpserversrc_init (GstTCPServerSrc * tcpserversrc);
 static void gst_tcpserversrc_finalize (GObject * gobject);
 
-static GstData *gst_tcpserversrc_get (GstPad * pad);
-static GstElementStateReturn gst_tcpserversrc_change_state (GstElement *
-    element);
+static gboolean gst_tcpserversrc_start (GstBaseSrc * bsrc);
+static gboolean gst_tcpserversrc_stop (GstBaseSrc * bsrc);
+static GstFlowReturn gst_tcpserversrc_create (GstPushSrc * psrc,
+    GstBuffer ** buf);
 
 static void gst_tcpserversrc_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
 static void gst_tcpserversrc_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec);
-static void gst_tcpserversrc_set_clock (GstElement * element, GstClock * clock);
 
 static GstElementClass *parent_class = NULL;
 
@@ -100,7 +106,7 @@ gst_tcpserversrc_get_type (void)
     };
 
     tcpserversrc_type =
-        g_type_register_static (GST_TYPE_ELEMENT, "GstTCPServerSrc",
+        g_type_register_static (GST_TYPE_PUSHSRC, "GstTCPServerSrc",
         &tcpserversrc_info, 0);
   }
   return tcpserversrc_type;
@@ -111,6 +117,9 @@ gst_tcpserversrc_base_init (gpointer g_class)
 {
   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
 
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&srctemplate));
+
   gst_element_class_set_details (element_class, &gst_tcpserversrc_details);
 }
 
@@ -119,11 +128,19 @@ gst_tcpserversrc_class_init (GstTCPServerSrc * klass)
 {
   GObjectClass *gobject_class;
   GstElementClass *gstelement_class;
+  GstBaseSrcClass *gstbasesrc_class;
+  GstPushSrcClass *gstpushsrc_class;
 
   gobject_class = (GObjectClass *) klass;
   gstelement_class = (GstElementClass *) klass;
+  gstbasesrc_class = (GstBaseSrcClass *) klass;
+  gstpushsrc_class = (GstPushSrcClass *) klass;
+
+  parent_class = g_type_class_ref (GST_TYPE_PUSHSRC);
 
-  parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
+  gobject_class->set_property = gst_tcpserversrc_set_property;
+  gobject_class->get_property = gst_tcpserversrc_get_property;
+  gobject_class->finalize = gst_tcpserversrc_finalize;
 
   g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
       g_param_spec_string ("host", "Host", "The hostname to listen as",
@@ -136,241 +153,98 @@ gst_tcpserversrc_class_init (GstTCPServerSrc * klass)
           GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE,
           G_PARAM_READWRITE));
 
-  gobject_class->set_property = gst_tcpserversrc_set_property;
-  gobject_class->get_property = gst_tcpserversrc_get_property;
-  gobject_class->finalize = gst_tcpserversrc_finalize;
+  gstbasesrc_class->start = gst_tcpserversrc_start;
+  gstbasesrc_class->stop = gst_tcpserversrc_stop;
 
-  gstelement_class->change_state = gst_tcpserversrc_change_state;
-  gstelement_class->set_clock = gst_tcpserversrc_set_clock;
+  gstpushsrc_class->create = gst_tcpserversrc_create;
 
   GST_DEBUG_CATEGORY_INIT (tcpserversrc_debug, "tcpserversrc", 0,
       "TCP Server Source");
 }
 
 static void
-gst_tcpserversrc_set_clock (GstElement * element, GstClock * clock)
+gst_tcpserversrc_init (GstTCPServerSrc * src)
 {
-  GstTCPServerSrc *tcpserversrc;
-
-  tcpserversrc = GST_TCPSERVERSRC (element);
-
-  tcpserversrc->clock = clock;
-}
-
-static void
-gst_tcpserversrc_init (GstTCPServerSrc * this)
-{
-  /* create the src pad */
-  this->srcpad = gst_pad_new ("src", GST_PAD_SRC);
-  gst_element_add_pad (GST_ELEMENT (this), this->srcpad);
-  gst_pad_set_get_function (this->srcpad, gst_tcpserversrc_get);
-
-  this->server_port = TCP_DEFAULT_PORT;
-  this->host = g_strdup (TCP_DEFAULT_HOST);
-  this->clock = NULL;
-  this->server_sock_fd = -1;
-  this->client_sock_fd = -1;
-  this->curoffset = 0;
-  this->protocol = GST_TCP_PROTOCOL_TYPE_NONE;
-
-  GST_FLAG_UNSET (this, GST_TCPSERVERSRC_OPEN);
+  src->server_port = TCP_DEFAULT_PORT;
+  src->host = g_strdup (TCP_DEFAULT_HOST);
+  src->server_sock_fd = -1;
+  src->client_sock_fd = -1;
+  src->curoffset = 0;
+  src->protocol = GST_TCP_PROTOCOL_TYPE_NONE;
+
+  GST_FLAG_UNSET (src, GST_TCPSERVERSRC_OPEN);
 }
 
 static void
 gst_tcpserversrc_finalize (GObject * gobject)
 {
-  GstTCPServerSrc *this = GST_TCPSERVERSRC (gobject);
+  GstTCPServerSrc *src = GST_TCPSERVERSRC (gobject);
 
-  g_free (this->host);
+  g_free (src->host);
 }
 
-/* read the gdp caps packet from the socket */
-static GstCaps *
-gst_tcpserversrc_gdp_read_caps (GstTCPServerSrc * this)
-{
-  size_t header_length = GST_DP_HEADER_LENGTH;
-  size_t readsize;
-  guint8 *header = NULL;
-  guint8 *payload = NULL;
-  ssize_t ret;
-  GstCaps *caps;
-  gchar *string;
-
-  header = g_malloc (header_length);
-
-  readsize = header_length;
-  GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", readsize);
-  ret = read (this->client_sock_fd, header, readsize);
-  if (ret < 0) {
-    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
-    g_free (header);
-    return NULL;
-  }
-  g_assert (ret == readsize);
-
-  if (!gst_dp_validate_header (header_length, header)) {
-    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
-        ("GDP caps packet header does not validate"));
-    g_free (header);
-    return NULL;
-  }
-
-  readsize = gst_dp_header_payload_length (header);
-  payload = g_malloc (readsize);
-  GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize);
-  ret = read (this->client_sock_fd, payload, readsize);
-  if (ret < 0) {
-    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
-    g_free (header);
-    g_free (payload);
-    return NULL;
-  }
-  g_assert (ret == readsize);
-
-  if (!gst_dp_validate_payload (readsize, header, payload)) {
-    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
-        ("GDP caps packet payload does not validate"));
-    g_free (header);
-    g_free (payload);
-    return NULL;
-  }
-
-  caps = gst_dp_caps_from_packet (header_length, header, payload);
-  string = gst_caps_to_string (caps);
-  GST_DEBUG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string);
-  g_free (string);
-
-  g_free (header);
-  g_free (payload);
-
-  return caps;
-}
-
-/* read the gdp buffer header from the socket
- * returns a GstData,
- * representing the new GstBuffer to read data into, or an EOS event
- */
-static GstData *
-gst_tcpserversrc_gdp_read_header (GstTCPServerSrc * this)
-{
-  size_t header_length = GST_DP_HEADER_LENGTH;
-  size_t readsize;
-  guint8 *header = NULL;
-  ssize_t ret;
-  GstBuffer *buffer;
-
-  header = g_malloc (header_length);
-  readsize = header_length;
-
-  GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", readsize);
-  ret = read (this->client_sock_fd, header, readsize);
-  /* if we read 0 bytes, and we're blocking, we hit eos */
-  if (ret == 0) {
-    GST_DEBUG ("blocking read returns 0, EOS");
-    gst_element_set_eos (GST_ELEMENT (this));
-    g_free (header);
-    return GST_DATA (gst_event_new (GST_EVENT_EOS));
-  }
-  if (ret < 0) {
-    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
-    g_free (header);
-    return NULL;
-  }
-  if (ret != readsize) {
-    g_warning ("Wanted %d bytes, got %d bytes", (int) readsize, (int) ret);
-  }
-  g_assert (ret == readsize);
-
-  if (!gst_dp_validate_header (header_length, header)) {
-    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
-        ("GDP buffer packet header does not validate"));
-    g_free (header);
-    return NULL;
-  }
-  GST_LOG_OBJECT (this, "validated buffer packet header");
-
-  buffer = gst_dp_buffer_from_header (header_length, header);
-
-  GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer);
-  return GST_DATA (buffer);
-}
-
-static GstData *
-gst_tcpserversrc_get (GstPad * pad)
+static GstFlowReturn
+gst_tcpserversrc_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 {
   GstTCPServerSrc *src;
   size_t readsize;
   int ret;
-
-  GstData *data = NULL;
   GstBuffer *buf = NULL;
   GstCaps *caps;
 
-  g_return_val_if_fail (pad != NULL, NULL);
-  g_return_val_if_fail (GST_IS_PAD (pad), NULL);
-  src = GST_TCPSERVERSRC (GST_OBJECT_PARENT (pad));
-  g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN), NULL);
+  src = GST_TCPSERVERSRC (psrc);
+
+  g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN),
+      GST_FLOW_ERROR);
 
   /* read the buffer header if we're using a protocol */
   switch (src->protocol) {
+    case GST_TCP_PROTOCOL_TYPE_NONE:
+    {
       fd_set testfds;
 
-    case GST_TCP_PROTOCOL_TYPE_NONE:
       /* do a blocking select on the socket */
       FD_ZERO (&testfds);
       FD_SET (src->client_sock_fd, &testfds);
-      ret =
-          select (src->client_sock_fd + 1, &testfds, (fd_set *) 0, (fd_set *) 0,
-          0);
+
       /* no action (0) is an error too in our case */
-      if (ret <= 0) {
-        GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-            ("select failed: %s", g_strerror (errno)));
-        return GST_DATA (gst_event_new (GST_EVENT_EOS));
-      }
+      if ((ret =
+              select (src->client_sock_fd + 1, &testfds, (fd_set *) 0,
+                  (fd_set *) 0, 0)) <= 0)
+        goto select_error;
+
       /* ask how much is available for reading on the socket */
-      ret = ioctl (src->client_sock_fd, FIONREAD, &readsize);
-      if (ret < 0) {
-        GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-            ("ioctl failed: %s", g_strerror (errno)));
-        return GST_DATA (gst_event_new (GST_EVENT_EOS));
-      }
+      if ((ret = ioctl (src->client_sock_fd, FIONREAD, &readsize)) < 0)
+        goto ioctl_error;
 
       buf = gst_buffer_new_and_alloc (readsize);
       break;
+    }
     case GST_TCP_PROTOCOL_TYPE_GDP:
       /* if we haven't received caps yet, we should get them first */
       if (!src->caps_received) {
         gchar *string;
 
-        if (!(caps = gst_tcpserversrc_gdp_read_caps (src))) {
-          GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-              ("Could not read caps through GDP"));
-          return GST_DATA (gst_event_new (GST_EVENT_EOS));
-        }
+        if (!(caps =
+                gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd)))
+          goto gdp_caps_read_error;
+
         src->caps_received = TRUE;
         string = gst_caps_to_string (caps);
         GST_DEBUG_OBJECT (src, "Received caps through GDP: %s", string);
         g_free (string);
 
-        if (!gst_pad_try_set_caps (pad, caps)) {
-          g_warning ("Could not set caps");
-          return GST_DATA (gst_event_new (GST_EVENT_EOS));
-        }
+        gst_pad_set_caps (GST_BASESRC_PAD (psrc), caps);
       }
 
       /* now receive the buffer header */
-      if (!(data = gst_tcpserversrc_gdp_read_header (src))) {
-        GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-            ("Could not read data header through GDP"));
-        return GST_DATA (gst_event_new (GST_EVENT_EOS));
-      }
-      if (GST_IS_EVENT (data))
-        return data;
-      buf = GST_BUFFER (data);
+      if (!(buf =
+              gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd)))
+        goto gdp_buffer_read_error;
 
       GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
           buf);
+
       /* use this new buffer to read data into */
       readsize = GST_BUFFER_SIZE (buf);
       break;
@@ -380,31 +254,63 @@ gst_tcpserversrc_get (GstPad * pad)
   }
 
   GST_LOG_OBJECT (src, "Reading %d bytes", readsize);
-  ret =
-      gst_tcp_socket_read (src->client_sock_fd, GST_BUFFER_DATA (buf),
-      readsize);
-  if (ret < 0) {
-    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
-    gst_buffer_unref (buf);
-    return GST_DATA (gst_event_new (GST_EVENT_EOS));
-  }
+  if ((ret =
+          gst_tcp_socket_read (src->client_sock_fd, GST_BUFFER_DATA (buf),
+              readsize)) < 0)
+    goto read_error;
 
   /* if we read 0 bytes, and we're blocking, we hit eos */
-  if (ret == 0) {
-    GST_DEBUG ("blocking read returns 0, EOS");
-    gst_buffer_unref (buf);
-    gst_element_set_eos (GST_ELEMENT (src));
-    return GST_DATA (gst_event_new (GST_EVENT_EOS));
-  }
+  if (ret == 0)
+    goto hit_eos;
 
   readsize = ret;
   GST_LOG_OBJECT (src, "Read %d bytes", readsize);
   GST_BUFFER_SIZE (buf) = readsize;
-  GST_BUFFER_MAXSIZE (buf) = readsize;
   GST_BUFFER_OFFSET (buf) = src->curoffset;
   GST_BUFFER_OFFSET_END (buf) = src->curoffset + readsize;
   src->curoffset += readsize;
-  return GST_DATA (buf);
+
+  *outbuf = buf;
+
+  return GST_FLOW_OK;
+
+  /* ERROR */
+select_error:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+        ("select failed: %s", g_strerror (errno)));
+    return GST_FLOW_ERROR;
+  }
+ioctl_error:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+        ("ioctl failed: %s", g_strerror (errno)));
+    return GST_FLOW_ERROR;
+  }
+gdp_caps_read_error:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+        ("Could not read caps through GDP"));
+    return GST_FLOW_ERROR;
+  }
+gdp_buffer_read_error:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+        ("Could not read buffer header through GDP"));
+    return GST_FLOW_ERROR;
+  }
+read_error:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+    gst_buffer_unref (buf);
+    return GST_FLOW_ERROR;
+  }
+hit_eos:
+  {
+    GST_DEBUG ("blocking read returns 0, EOS");
+    gst_buffer_unref (buf);
+    return GST_FLOW_WRONG_STATE;
+  }
 }
 
 
@@ -467,121 +373,125 @@ gst_tcpserversrc_get_property (GObject * object, guint prop_id, GValue * value,
 
 /* set up server */
 static gboolean
-gst_tcpserversrc_init_receive (GstTCPServerSrc * this)
+gst_tcpserversrc_start (GstBaseSrc * bsrc)
 {
   int ret;
+  GstTCPServerSrc *src = GST_TCPSERVERSRC (bsrc);
 
   /* reset caps_received flag */
-  this->caps_received = FALSE;
+  src->caps_received = FALSE;
 
   /* create the server listener socket */
-  if ((this->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
-    GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
-    return FALSE;
-  }
-  GST_DEBUG_OBJECT (this, "opened receiving server socket with fd %d",
-      this->server_sock_fd);
+  if ((src->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1)
+    goto socket_error;
+
+  GST_DEBUG_OBJECT (src, "opened receiving server socket with fd %d",
+      src->server_sock_fd);
 
   /* make address reusable */
   ret = 1;
-  if (setsockopt (this->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret,
-          sizeof (int)) < 0) {
-    GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
-        ("Could not setsockopt: %s", g_strerror (errno)));
-    return FALSE;
-  }
+  if (setsockopt (src->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret,
+          sizeof (int)) < 0)
+    goto sock_opt;
 
   /* name the socket */
-  memset (&this->server_sin, 0, sizeof (this->server_sin));
-  this->server_sin.sin_family = AF_INET;        /* network socket */
-  this->server_sin.sin_port = htons (this->server_port);        /* on port */
-  if (this->host) {
-    gchar *host = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
-
-    if (!host) {
-      gst_tcp_socket_close (&this->server_sock_fd);
-      return FALSE;
-    }
-
-    this->server_sin.sin_addr.s_addr = inet_addr (host);
+  memset (&src->server_sin, 0, sizeof (src->server_sin));
+  src->server_sin.sin_family = AF_INET; /* network socket */
+  src->server_sin.sin_port = htons (src->server_port);  /* on port */
+  if (src->host) {
+    gchar *host;
+
+    if (!(host = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host)))
+      goto host_error;
+    src->server_sin.sin_addr.s_addr = inet_addr (host);
     g_free (host);
   } else
-    this->server_sin.sin_addr.s_addr = htonl (INADDR_ANY);
+    src->server_sin.sin_addr.s_addr = htonl (INADDR_ANY);
 
   /* bind it */
-  GST_DEBUG_OBJECT (this, "binding server socket to address");
-  ret = bind (this->server_sock_fd, (struct sockaddr *) &this->server_sin,
-      sizeof (this->server_sin));
+  GST_DEBUG_OBJECT (src, "binding server socket to address");
+  if ((ret = bind (src->server_sock_fd, (struct sockaddr *) &src->server_sin,
+              sizeof (src->server_sin))) < 0)
+    goto bind_error;
+
+  GST_DEBUG_OBJECT (src, "listening on server socket %d with queue of %d",
+      src->server_sock_fd, TCP_BACKLOG);
+
+  if (listen (src->server_sock_fd, TCP_BACKLOG) == -1)
+    goto listen_error;
+
+  /* FIXME: maybe we should think about moving actual client accepting
+     somewhere else */
+  GST_DEBUG_OBJECT (src, "waiting for client");
+  if ((src->client_sock_fd =
+          accept (src->server_sock_fd, (struct sockaddr *) &src->client_sin,
+              &src->client_sin_len)) == -1)
+    goto accept_error;
+
+  GST_DEBUG_OBJECT (src, "received client");
+
+  GST_FLAG_SET (src, GST_TCPSERVERSRC_OPEN);
 
-  if (ret) {
-    gst_tcp_socket_close (&this->server_sock_fd);
+  return TRUE;
+
+  /* ERRORS */
+socket_error:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
+    return FALSE;
+  }
+sock_opt:
+  {
+    GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
+        ("Could not setsockopt: %s", g_strerror (errno)));
+    return FALSE;
+  }
+host_error:
+  {
+    gst_tcp_socket_close (&src->server_sock_fd);
+    return FALSE;
+  }
+bind_error:
+  {
+    gst_tcp_socket_close (&src->server_sock_fd);
     switch (errno) {
       default:
-        GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
+        GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
             ("bind failed: %s", g_strerror (errno)));
-        return FALSE;
         break;
     }
+    return FALSE;
   }
-
-  GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d",
-      this->server_sock_fd, TCP_BACKLOG);
-  if (listen (this->server_sock_fd, TCP_BACKLOG) == -1) {
-    gst_tcp_socket_close (&this->server_sock_fd);
-    GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
+listen_error:
+  {
+    gst_tcp_socket_close (&src->server_sock_fd);
+    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
         ("Could not listen on server socket: %s", g_strerror (errno)));
     return FALSE;
   }
-
-  /* FIXME: maybe we should think about moving actual client accepting
-     somewhere else */
-  GST_DEBUG_OBJECT (this, "waiting for client");
-  this->client_sock_fd =
-      accept (this->server_sock_fd, (struct sockaddr *) &this->client_sin,
-      &this->client_sin_len);
-  if (this->client_sock_fd == -1) {
-    gst_tcp_socket_close (&this->server_sock_fd);
-    GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
+accept_error:
+  {
+    gst_tcp_socket_close (&src->server_sock_fd);
+    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
         ("Could not accept client on server socket: %s", g_strerror (errno)));
     return FALSE;
   }
-  GST_DEBUG_OBJECT (this, "received client");
-
-  GST_FLAG_SET (this, GST_TCPSERVERSRC_OPEN);
-  return TRUE;
 }
 
-static void
-gst_tcpserversrc_close (GstTCPServerSrc * this)
+static gboolean
+gst_tcpserversrc_stop (GstBaseSrc * bsrc)
 {
-  if (this->server_sock_fd != -1) {
-    close (this->server_sock_fd);
-    this->server_sock_fd = -1;
-  }
-  if (this->client_sock_fd != -1) {
-    close (this->client_sock_fd);
-    this->client_sock_fd = -1;
-  }
-  GST_FLAG_UNSET (this, GST_TCPSERVERSRC_OPEN);
-}
+  GstTCPServerSrc *src = GST_TCPSERVERSRC (bsrc);
 
-static GstElementStateReturn
-gst_tcpserversrc_change_state (GstElement * element)
-{
-  g_return_val_if_fail (GST_IS_TCPSERVERSRC (element), GST_STATE_FAILURE);
-
-  if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
-    if (GST_FLAG_IS_SET (element, GST_TCPSERVERSRC_OPEN))
-      gst_tcpserversrc_close (GST_TCPSERVERSRC (element));
-  } else {
-    if (!GST_FLAG_IS_SET (element, GST_TCPSERVERSRC_OPEN)) {
-      if (!gst_tcpserversrc_init_receive (GST_TCPSERVERSRC (element)))
-        return GST_STATE_FAILURE;
-    }
+  if (src->server_sock_fd != -1) {
+    close (src->server_sock_fd);
+    src->server_sock_fd = -1;
   }
+  if (src->client_sock_fd != -1) {
+    close (src->client_sock_fd);
+    src->client_sock_fd = -1;
+  }
+  GST_FLAG_UNSET (src, GST_TCPSERVERSRC_OPEN);
 
-  if (GST_ELEMENT_CLASS (parent_class)->change_state)
-    return GST_ELEMENT_CLASS (parent_class)->change_state (element);
-
-  return GST_STATE_SUCCESS;
+  return TRUE;
 }
index fbc9e36..a0f50be 100644 (file)
 #define __GST_TCPSERVERSRC_H__
 
 #include <gst/gst.h>
+#include <gst/base/gstpushsrc.h>
 
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
+G_END_DECLS
 
 #include <errno.h>
 #include <string.h>
@@ -60,10 +59,7 @@ typedef enum {
 } GstTCPServerSrcFlags;
 
 struct _GstTCPServerSrc {
-  GstElement element;
-
-  /* pad */
-  GstPad *srcpad;
+  GstPushSrc element;
 
   /* server information */
   int server_port;
@@ -81,18 +77,14 @@ struct _GstTCPServerSrc {
 
   GstTCPProtocolType protocol; /* protocol used for reading data */
   gboolean caps_received;      /* if we have received caps yet */
-  GstClock *clock;
 };
 
 struct _GstTCPServerSrcClass {
-  GstElementClass parent_class;
+  GstPushSrcClass parent_class;
 };
 
 GType gst_tcpserversrc_get_type (void);
 
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
+G_BEGIN_DECLS
 
 #endif /* __GST_TCPSERVERSRC_H__ */
diff --git a/gst/tcp/gsttcpsink.c b/gst/tcp/gsttcpsink.c
deleted file mode 100644 (file)
index a9211d4..0000000
+++ /dev/null
@@ -1,425 +0,0 @@
-/* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-#include "gsttcpsink.h"
-
-#define TCP_DEFAULT_HOST       "localhost"
-#define TCP_DEFAULT_PORT       4953
-
-/* elementfactory information */
-static GstElementDetails gst_tcpsink_details =
-GST_ELEMENT_DETAILS ("TCP packet sender",
-    "Sink/Network",
-    "Send data over the network via TCP",
-    "Zeeshan Ali <zak147@yahoo.com>");
-
-/* TCPSink signals and args */
-enum
-{
-  FRAME_ENCODED,
-  /* FILL ME */
-  LAST_SIGNAL
-};
-
-enum
-{
-  ARG_0,
-  ARG_HOST,
-  ARG_PORT,
-  ARG_CONTROL,
-  ARG_MTU
-      /* FILL ME */
-};
-
-#define GST_TYPE_TCPSINK_CONTROL       (gst_tcpsink_control_get_type())
-static GType
-gst_tcpsink_control_get_type (void)
-{
-  static GType tcpsink_control_type = 0;
-  static GEnumValue tcpsink_control[] = {
-    {CONTROL_NONE, "1", "none"},
-    {CONTROL_TCP, "2", "tcp"},
-    {CONTROL_ZERO, NULL, NULL}
-  };
-
-  if (!tcpsink_control_type) {
-    tcpsink_control_type =
-        g_enum_register_static ("GstTCPSinkControl", tcpsink_control);
-  }
-  return tcpsink_control_type;
-}
-
-static void gst_tcpsink_base_init (gpointer g_class);
-static void gst_tcpsink_class_init (GstTCPSink * klass);
-static void gst_tcpsink_init (GstTCPSink * tcpsink);
-
-static void gst_tcpsink_set_clock (GstElement * element, GstClock * clock);
-
-static void gst_tcpsink_chain (GstPad * pad, GstData * _data);
-static GstElementStateReturn gst_tcpsink_change_state (GstElement * element);
-
-static void gst_tcpsink_set_property (GObject * object, guint prop_id,
-    const GValue * value, GParamSpec * pspec);
-static void gst_tcpsink_get_property (GObject * object, guint prop_id,
-    GValue * value, GParamSpec * pspec);
-
-
-static GstElementClass *parent_class = NULL;
-
-/*static guint gst_tcpsink_signals[LAST_SIGNAL] = { 0 }; */
-
-GType
-gst_tcpsink_get_type (void)
-{
-  static GType tcpsink_type = 0;
-
-
-  if (!tcpsink_type) {
-    static const GTypeInfo tcpsink_info = {
-      sizeof (GstTCPSinkClass),
-      gst_tcpsink_base_init,
-      NULL,
-      (GClassInitFunc) gst_tcpsink_class_init,
-      NULL,
-      NULL,
-      sizeof (GstTCPSink),
-      0,
-      (GInstanceInitFunc) gst_tcpsink_init,
-      NULL
-    };
-
-    tcpsink_type =
-        g_type_register_static (GST_TYPE_ELEMENT, "GstTCPSink", &tcpsink_info,
-        0);
-  }
-  return tcpsink_type;
-}
-
-static void
-gst_tcpsink_base_init (gpointer g_class)
-{
-  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
-
-  gst_element_class_set_details (element_class, &gst_tcpsink_details);
-}
-
-static void
-gst_tcpsink_class_init (GstTCPSink * klass)
-{
-  GObjectClass *gobject_class;
-  GstElementClass *gstelement_class;
-
-  gobject_class = (GObjectClass *) klass;
-  gstelement_class = (GstElementClass *) klass;
-
-  parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
-
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
-      g_param_spec_string ("host", "host", "The host/IP to send the packets to",
-          TCP_DEFAULT_HOST, G_PARAM_READWRITE));
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
-      g_param_spec_int ("port", "port", "The port to send the packets to",
-          0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
-  g_object_class_install_property (gobject_class, ARG_CONTROL,
-      g_param_spec_enum ("control", "control", "The type of control",
-          GST_TYPE_TCPSINK_CONTROL, CONTROL_TCP, G_PARAM_READWRITE));
-  g_object_class_install_property (gobject_class, ARG_MTU, g_param_spec_int ("mtu", "mtu", "mtu", G_MININT, G_MAXINT, 0, G_PARAM_READWRITE));   /* CHECKME */
-  gobject_class->set_property = gst_tcpsink_set_property;
-  gobject_class->get_property = gst_tcpsink_get_property;
-
-  gstelement_class->change_state = gst_tcpsink_change_state;
-  gstelement_class->set_clock = gst_tcpsink_set_clock;
-}
-
-
-static GstPadLinkReturn
-gst_tcpsink_sink_link (GstPad * pad, const GstCaps * caps)
-{
-  GstTCPSink *tcpsink;
-
-#ifndef GST_DISABLE_LOADSAVE
-  struct sockaddr_in serv_addr;
-  struct in_addr addr;
-  struct hostent *he;
-  int fd;
-  FILE *f;
-  xmlDocPtr doc;
-#endif
-
-  tcpsink = GST_TCPSINK (gst_pad_get_parent (pad));
-
-  switch (tcpsink->control) {
-#ifndef GST_DISABLE_LOADSAVE
-    case CONTROL_TCP:
-      memset (&serv_addr, 0, sizeof (serv_addr));
-
-      /* if its an IP address */
-      if (inet_aton (tcpsink->host, &addr)) {
-        memmove (&(serv_addr.sin_addr), &addr, sizeof (struct in_addr));
-      }
-
-      /* we dont need to lookup for localhost */
-      else if (strcmp (tcpsink->host, TCP_DEFAULT_HOST) == 0) {
-        if (inet_aton ("127.0.0.1", &addr)) {
-          memmove (&(serv_addr.sin_addr), &addr, sizeof (struct in_addr));
-        }
-      }
-
-      /* if its a hostname */
-      else if ((he = gethostbyname (tcpsink->host))) {
-        memmove (&(serv_addr.sin_addr), he->h_addr, he->h_length);
-      }
-
-      else {
-        perror ("hostname lookup error?");
-        return GST_PAD_LINK_REFUSED;
-      }
-
-      serv_addr.sin_family = AF_INET;
-      serv_addr.sin_port = htons (tcpsink->port + 1);
-
-      doc = xmlNewDoc ("1.0");
-      doc->xmlRootNode = xmlNewDocNode (doc, NULL, "NewCaps", NULL);
-
-      gst_caps_save_thyself (caps, doc->xmlRootNode);
-
-      if ((fd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
-        perror ("socket");
-        return GST_PAD_LINK_REFUSED;
-      }
-
-      if (connect (fd, (struct sockaddr *) &serv_addr, sizeof (serv_addr)) != 0) {
-        g_printerr ("tcpsink: connect to %s port %d failed: %s\n",
-            tcpsink->host, tcpsink->port + 1, g_strerror (errno));
-        return GST_PAD_LINK_REFUSED;
-      }
-
-      f = fdopen (dup (fd), "wb");
-
-      xmlDocDump (f, doc);
-      fclose (f);
-      close (fd);
-      break;
-
-#endif
-    case CONTROL_NONE:
-      return GST_PAD_LINK_OK;
-      break;
-    default:
-      return GST_PAD_LINK_REFUSED;
-      break;
-  }
-
-  return GST_PAD_LINK_OK;
-}
-
-static void
-gst_tcpsink_set_clock (GstElement * element, GstClock * clock)
-{
-  GstTCPSink *tcpsink;
-
-  tcpsink = GST_TCPSINK (element);
-
-  tcpsink->clock = clock;
-}
-
-static void
-gst_tcpsink_init (GstTCPSink * tcpsink)
-{
-  /* create the sink and src pads */
-  tcpsink->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
-  gst_element_add_pad (GST_ELEMENT (tcpsink), tcpsink->sinkpad);
-  gst_pad_set_chain_function (tcpsink->sinkpad, gst_tcpsink_chain);
-  gst_pad_set_link_function (tcpsink->sinkpad, gst_tcpsink_sink_link);
-
-  tcpsink->host = g_strdup (TCP_DEFAULT_HOST);
-  tcpsink->port = TCP_DEFAULT_PORT;
-  tcpsink->control = CONTROL_TCP;
-  /* should support as minimum 576 for IPV4 and 1500 for IPV6 */
-  tcpsink->mtu = 1500;
-
-  tcpsink->clock = NULL;
-}
-
-static void
-gst_tcpsink_chain (GstPad * pad, GstData * _data)
-{
-  GstBuffer *buf = GST_BUFFER (_data);
-  GstTCPSink *tcpsink;
-
-  g_return_if_fail (pad != NULL);
-  g_return_if_fail (GST_IS_PAD (pad));
-  g_return_if_fail (buf != NULL);
-
-  tcpsink = GST_TCPSINK (GST_OBJECT_PARENT (pad));
-
-  if (tcpsink->clock && GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
-    gst_element_wait (GST_ELEMENT (tcpsink), GST_BUFFER_TIMESTAMP (buf));
-  }
-
-  if (write (tcpsink->sock, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf)) <= 0) {
-    perror ("write");
-  }
-
-  gst_buffer_unref (buf);
-}
-
-static void
-gst_tcpsink_set_property (GObject * object, guint prop_id, const GValue * value,
-    GParamSpec * pspec)
-{
-  GstTCPSink *tcpsink;
-
-  /* it's not null if we got it, but it might not be ours */
-  g_return_if_fail (GST_IS_TCPSINK (object));
-  tcpsink = GST_TCPSINK (object);
-
-  switch (prop_id) {
-    case ARG_HOST:
-      if (tcpsink->host != NULL)
-        g_free (tcpsink->host);
-      if (g_value_get_string (value) == NULL)
-        tcpsink->host = NULL;
-      else
-        tcpsink->host = g_strdup (g_value_get_string (value));
-      break;
-    case ARG_PORT:
-      tcpsink->port = g_value_get_int (value);
-      break;
-    case ARG_CONTROL:
-      tcpsink->control = g_value_get_enum (value);
-      break;
-    case ARG_MTU:
-      tcpsink->mtu = g_value_get_int (value);
-      break;
-    default:
-      break;
-  }
-}
-
-static void
-gst_tcpsink_get_property (GObject * object, guint prop_id, GValue * value,
-    GParamSpec * pspec)
-{
-  GstTCPSink *tcpsink;
-
-  /* it's not null if we got it, but it might not be ours */
-  g_return_if_fail (GST_IS_TCPSINK (object));
-  tcpsink = GST_TCPSINK (object);
-
-  switch (prop_id) {
-    case ARG_HOST:
-      g_value_set_string (value, tcpsink->host);
-      break;
-    case ARG_PORT:
-      g_value_set_int (value, tcpsink->port);
-      break;
-    case ARG_CONTROL:
-      g_value_set_enum (value, tcpsink->control);
-      break;
-    case ARG_MTU:
-      g_value_set_int (value, tcpsink->mtu);
-      break;
-    default:
-      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
-      break;
-  }
-}
-
-
-/* create a socket for sending to remote machine */
-static gboolean
-gst_tcpsink_init_send (GstTCPSink * sink)
-{
-  struct hostent *he;
-  struct in_addr addr;
-
-  memset (&sink->theiraddr, 0, sizeof (sink->theiraddr));
-  sink->theiraddr.sin_family = AF_INET; /* host byte order */
-  sink->theiraddr.sin_port = htons (sink->port);        /* short, network byte order */
-
-  /* if its an IP address */
-  if (inet_aton (sink->host, &addr)) {
-    memmove (&(sink->theiraddr.sin_addr), &addr, sizeof (struct in_addr));
-  }
-
-  /* we dont need to lookup for localhost */
-  else if (strcmp (sink->host, TCP_DEFAULT_HOST) == 0) {
-    if (inet_aton ("127.0.0.1", &addr)) {
-      memmove (&(sink->theiraddr.sin_addr), &addr, sizeof (struct in_addr));
-    }
-  }
-
-  /* if its a hostname */
-  else if ((he = gethostbyname (sink->host))) {
-    memmove (&(sink->theiraddr.sin_addr), he->h_addr, he->h_length);
-  }
-
-  else {
-    perror ("hostname lookup error?");
-    return FALSE;
-  }
-
-  if ((sink->sock = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) {
-    perror ("socket");
-    return FALSE;
-  }
-
-  if (connect (sink->sock, (struct sockaddr *) &(sink->theiraddr),
-          sizeof (sink->theiraddr)) != 0) {
-    perror ("stream connect");
-    return FALSE;
-  }
-
-  GST_FLAG_SET (sink, GST_TCPSINK_OPEN);
-
-  return TRUE;
-}
-
-static void
-gst_tcpsink_close (GstTCPSink * sink)
-{
-  close (sink->sock);
-
-  GST_FLAG_UNSET (sink, GST_TCPSINK_OPEN);
-}
-
-static GstElementStateReturn
-gst_tcpsink_change_state (GstElement * element)
-{
-  g_return_val_if_fail (GST_IS_TCPSINK (element), GST_STATE_FAILURE);
-
-  if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
-    if (GST_FLAG_IS_SET (element, GST_TCPSINK_OPEN))
-      gst_tcpsink_close (GST_TCPSINK (element));
-  } else {
-    if (!GST_FLAG_IS_SET (element, GST_TCPSINK_OPEN)) {
-      if (!gst_tcpsink_init_send (GST_TCPSINK (element)))
-        return GST_STATE_FAILURE;
-    }
-  }
-
-  if (GST_ELEMENT_CLASS (parent_class)->change_state)
-    return GST_ELEMENT_CLASS (parent_class)->change_state (element);
-
-  return GST_STATE_SUCCESS;
-}
diff --git a/gst/tcp/gsttcpsink.h b/gst/tcp/gsttcpsink.h
deleted file mode 100644 (file)
index 98c82f6..0000000
+++ /dev/null
@@ -1,96 +0,0 @@
-/* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-
-#ifndef __GST_TCPSINK_H__
-#define __GST_TCPSINK_H__
-
-
-#include <gst/gst.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <errno.h>
-#include <string.h>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <netinet/in.h>
-#include <netdb.h>
-#include <sys/socket.h>
-#include <sys/wait.h>
-#include <fcntl.h>
-#include <arpa/inet.h>
-#include "gsttcpplugin.h"
-
-#define GST_TYPE_TCPSINK \
-  (gst_tcpsink_get_type())
-#define GST_TCPSINK(obj) \
-  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPSINK,GstTCPSink))
-#define GST_TCPSINK_CLASS(klass) \
-  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPSINK,GstTCPSink))
-#define GST_IS_TCPSINK(obj) \
-  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPSINK))
-#define GST_IS_TCPSINK_CLASS(obj) \
-  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPSINK))
-
-typedef struct _GstTCPSink GstTCPSink;
-typedef struct _GstTCPSinkClass GstTCPSinkClass;
-
-typedef enum {
-  GST_TCPSINK_OPEN             = GST_ELEMENT_FLAG_LAST,
-
-  GST_TCPSINK_FLAG_LAST        = GST_ELEMENT_FLAG_LAST + 2,
-} GstTCPSinkFlags;
-
-struct _GstTCPSink {
-  GstElement element;
-
-  /* pads */
-  GstPad *sinkpad,*srcpad;
-
-  int sock;
-  struct sockaddr_in theiraddr;
-  Gst_TCP_Control control;
-
-  gint port;
-  gchar *host;
-    
-  guint mtu;
-    
-  GstClock *clock;
-};
-
-struct _GstTCPSinkClass {
-  GstElementClass parent_class;
-};
-
-GType gst_tcpsink_get_type(void);
-
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
-
-
-#endif /* __GST_TCPSINK_H__ */
diff --git a/gst/tcp/gsttcpsrc.c b/gst/tcp/gsttcpsrc.c
deleted file mode 100644 (file)
index 6594580..0000000
+++ /dev/null
@@ -1,504 +0,0 @@
-/* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include "gsttcpsrc.h"
-#include <unistd.h>
-
-#define TCP_DEFAULT_PORT               4953
-
-/* elementfactory information */
-static GstElementDetails gst_tcpsrc_details =
-GST_ELEMENT_DETAILS ("TCP packet receiver",
-    "Source/Network",
-    "Receive data over the network via TCP",
-    "Zeeshan Ali <zak147@yahoo.com>");
-
-/* TCPSrc signals and args */
-enum
-{
-  /* FILL ME */
-  LAST_SIGNAL
-};
-
-enum
-{
-  ARG_0,
-  ARG_PORT,
-  ARG_CONTROL
-/*  ARG_SOCKET_OPTIONS,*/
-      /* FILL ME */
-};
-
-#define GST_TYPE_TCPSRC_CONTROL        (gst_tcpsrc_control_get_type())
-static GType
-gst_tcpsrc_control_get_type (void)
-{
-  static GType tcpsrc_control_type = 0;
-  static GEnumValue tcpsrc_control[] = {
-    {CONTROL_NONE, "1", "none"},
-    {CONTROL_TCP, "2", "tcp"},
-    {CONTROL_ZERO, NULL, NULL}
-  };
-
-  if (!tcpsrc_control_type) {
-    tcpsrc_control_type =
-        g_enum_register_static ("GstTCPSrcControl", tcpsrc_control);
-  }
-  return tcpsrc_control_type;
-}
-
-static void gst_tcpsrc_base_init (gpointer g_class);
-static void gst_tcpsrc_class_init (GstTCPSrc * klass);
-static void gst_tcpsrc_init (GstTCPSrc * tcpsrc);
-
-static GstData *gst_tcpsrc_get (GstPad * pad);
-static GstElementStateReturn gst_tcpsrc_change_state (GstElement * element);
-
-static void gst_tcpsrc_set_property (GObject * object, guint prop_id,
-    const GValue * value, GParamSpec * pspec);
-static void gst_tcpsrc_get_property (GObject * object, guint prop_id,
-    GValue * value, GParamSpec * pspec);
-static void gst_tcpsrc_set_clock (GstElement * element, GstClock * clock);
-
-static GstElementClass *parent_class = NULL;
-
-/*static guint gst_tcpsrc_signals[LAST_SIGNAL] = { 0 }; */
-
-GType
-gst_tcpsrc_get_type (void)
-{
-  static GType tcpsrc_type = 0;
-
-
-  if (!tcpsrc_type) {
-    static const GTypeInfo tcpsrc_info = {
-      sizeof (GstTCPSrcClass),
-      gst_tcpsrc_base_init,
-      NULL,
-      (GClassInitFunc) gst_tcpsrc_class_init,
-      NULL,
-      NULL,
-      sizeof (GstTCPSrc),
-      0,
-      (GInstanceInitFunc) gst_tcpsrc_init,
-      NULL
-    };
-
-    tcpsrc_type =
-        g_type_register_static (GST_TYPE_ELEMENT, "GstTCPSrc", &tcpsrc_info, 0);
-  }
-  return tcpsrc_type;
-}
-
-static void
-gst_tcpsrc_base_init (gpointer g_class)
-{
-  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
-
-  gst_element_class_set_details (element_class, &gst_tcpsrc_details);
-}
-
-static void
-gst_tcpsrc_class_init (GstTCPSrc * klass)
-{
-  GObjectClass *gobject_class;
-  GstElementClass *gstelement_class;
-
-  gobject_class = (GObjectClass *) klass;
-  gstelement_class = (GstElementClass *) klass;
-
-  parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
-
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
-      g_param_spec_int ("port", "port", "The port to receive the packets from",
-          0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
-  g_object_class_install_property (gobject_class, ARG_CONTROL,
-      g_param_spec_enum ("control", "control", "The type of control",
-          GST_TYPE_TCPSRC_CONTROL, CONTROL_TCP, G_PARAM_READWRITE));
-/*
-  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_SOCKET_OPTIONS,
-    g_param_spec_boolean ("socketop", "socketop", "Enable or disable socket options REUSEADDR and KEEPALIVE",
-                        FALSE, G_PARAM_READWRITE));
-*/
-  gobject_class->set_property = gst_tcpsrc_set_property;
-  gobject_class->get_property = gst_tcpsrc_get_property;
-
-  gstelement_class->change_state = gst_tcpsrc_change_state;
-  gstelement_class->set_clock = gst_tcpsrc_set_clock;
-}
-
-static void
-gst_tcpsrc_set_clock (GstElement * element, GstClock * clock)
-{
-  GstTCPSrc *tcpsrc;
-
-  tcpsrc = GST_TCPSRC (element);
-
-  tcpsrc->clock = clock;
-}
-
-static void
-gst_tcpsrc_init (GstTCPSrc * tcpsrc)
-{
-  /* create the src and src pads */
-  tcpsrc->srcpad = gst_pad_new ("src", GST_PAD_SRC);
-  gst_element_add_pad (GST_ELEMENT (tcpsrc), tcpsrc->srcpad);
-  gst_pad_set_get_function (tcpsrc->srcpad, gst_tcpsrc_get);
-
-  tcpsrc->port = TCP_DEFAULT_PORT;
-  tcpsrc->control = CONTROL_TCP;
-  tcpsrc->clock = NULL;
-  tcpsrc->sock = -1;
-  tcpsrc->control_sock = -1;
-  tcpsrc->client_sock = -1;
-  /*tcpsrc->socket_options = FALSE; */
-
-  GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_OPEN);
-  GST_FLAG_SET (tcpsrc, GST_TCPSRC_1ST_BUF);
-  GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_CONNECTED);
-}
-
-static GstData *
-gst_tcpsrc_get (GstPad * pad)
-{
-  GstTCPSrc *tcpsrc;
-  GstBuffer *outbuf;
-  socklen_t len;
-  gint numbytes;
-  fd_set read_fds;
-  guint max_sock;
-
-#ifndef GST_DISABLE_LOADSAVE
-  int ret, client_sock;
-#endif
-  struct sockaddr client_addr;
-
-  g_return_val_if_fail (pad != NULL, NULL);
-  g_return_val_if_fail (GST_IS_PAD (pad), NULL);
-
-  tcpsrc = GST_TCPSRC (GST_OBJECT_PARENT (pad));
-
-  FD_ZERO (&read_fds);
-  FD_SET (tcpsrc->sock, &read_fds);
-
-  max_sock = tcpsrc->sock;
-
-  if (tcpsrc->control_sock >= 0) {
-    FD_SET (tcpsrc->control_sock, &read_fds);
-    max_sock = MAX (tcpsrc->sock, tcpsrc->control_sock);
-  }
-
-  /* Add to FD_SET client socket, when connection has been established */
-  if (tcpsrc->client_sock >= 0) {
-    FD_SET (tcpsrc->client_sock, &read_fds);
-    max_sock = MAX (tcpsrc->client_sock, max_sock);
-  }
-
-
-  if (select (max_sock + 1, &read_fds, NULL, NULL, NULL) > 0) {
-    if ((tcpsrc->control_sock != -1)
-        && FD_ISSET (tcpsrc->control_sock, &read_fds)) {
-      guchar *buf = NULL;
-
-#ifndef GST_DISABLE_LOADSAVE
-      xmlDocPtr doc;
-      GstCaps *caps;
-#endif
-
-
-      switch (tcpsrc->control) {
-        case CONTROL_TCP:
-
-#ifndef GST_DISABLE_LOADSAVE
-          buf = g_malloc (1024 * 10);
-
-          len = sizeof (struct sockaddr);
-          client_sock = accept (tcpsrc->control_sock, &client_addr, &len);
-
-          if (client_sock <= 0) {
-            perror ("control_sock accept");
-          }
-
-          else if ((ret = read (client_sock, buf, 1024 * 10)) <= 0) {
-            perror ("control_sock read");
-          }
-
-          else {
-            buf[ret] = '\0';
-            doc = xmlParseMemory (buf, ret);
-            caps = gst_caps_load_thyself (doc->xmlRootNode);
-
-            /* foward the connect, we don't signal back the result here... */
-            gst_pad_try_set_caps (tcpsrc->srcpad, caps);
-          }
-
-          g_free (buf);
-#endif
-          break;
-        case CONTROL_NONE:
-        default:
-          g_free (buf);
-          return NULL;
-          break;
-      }
-
-      outbuf = NULL;
-    } else {
-      outbuf = gst_buffer_new ();
-      GST_BUFFER_DATA (outbuf) = g_malloc (24000);
-      GST_BUFFER_SIZE (outbuf) = 24000;
-
-      if (GST_FLAG_IS_SET (tcpsrc, GST_TCPSRC_1ST_BUF)) {
-        if (tcpsrc->clock) {
-          GstClockTime current_time;
-          GstEvent *discont;
-
-          current_time = gst_clock_get_time (tcpsrc->clock);
-
-          GST_BUFFER_TIMESTAMP (outbuf) = current_time;
-
-          discont = gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME,
-              current_time, NULL);
-
-          gst_pad_push (tcpsrc->srcpad, GST_DATA (discont));
-        }
-
-        GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_1ST_BUF);
-      }
-
-      else {
-        GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE;
-      }
-
-      if (!GST_FLAG_IS_SET (tcpsrc, GST_TCPSRC_CONNECTED)) {
-        tcpsrc->client_sock = accept (tcpsrc->sock, &client_addr, &len);
-
-        if (tcpsrc->client_sock <= 0) {
-          perror ("accept");
-        }
-
-        else {
-          GST_FLAG_SET (tcpsrc, GST_TCPSRC_CONNECTED);
-        }
-      }
-
-      numbytes =
-          read (tcpsrc->client_sock, GST_BUFFER_DATA (outbuf),
-          GST_BUFFER_SIZE (outbuf));
-
-      if (numbytes > 0) {
-        GST_BUFFER_SIZE (outbuf) = numbytes;
-      }
-
-      else {
-        if (numbytes == -1) {
-          perror ("read");
-        } else
-          g_print ("End of Stream reached\n");
-        gst_buffer_unref (outbuf);
-        outbuf = NULL;
-        close (tcpsrc->client_sock);
-        tcpsrc->client_sock = -1;
-        GST_FLAG_UNSET (tcpsrc, GST_TCPSRC_CONNECTED);
-      }
-    }
-  }
-
-  else {
-    perror ("select");
-    outbuf = NULL;
-  }
-
-  return GST_DATA (outbuf);
-}
-
-
-static void
-gst_tcpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
-    GParamSpec * pspec)
-{
-  GstTCPSrc *tcpsrc;
-
-  /* it's not null if we got it, but it might not be ours */
-  g_return_if_fail (GST_IS_TCPSRC (object));
-  tcpsrc = GST_TCPSRC (object);
-
-  switch (prop_id) {
-    case ARG_PORT:
-      tcpsrc->port = g_value_get_int (value);
-      break;
-    case ARG_CONTROL:
-      tcpsrc->control = g_value_get_enum (value);
-      break;
-/*    case ARG_SOCKET_OPTIONS:
-       tcpsrc->socket_options = g_value_get_boolean(value);    
-      break;   */
-    default:
-      break;
-  }
-}
-
-static void
-gst_tcpsrc_get_property (GObject * object, guint prop_id, GValue * value,
-    GParamSpec * pspec)
-{
-  GstTCPSrc *tcpsrc;
-
-  /* it's not null if we got it, but it might not be ours */
-  g_return_if_fail (GST_IS_TCPSRC (object));
-  tcpsrc = GST_TCPSRC (object);
-
-  switch (prop_id) {
-    case ARG_PORT:
-      g_value_set_int (value, tcpsrc->port);
-      break;
-    case ARG_CONTROL:
-      g_value_set_enum (value, tcpsrc->control);
-      break;
-/*    case ARG_SOCKET_OPTIONS:
-      g_value_set_boolean(value,tcpsrc->socket_options);
-      break;*/
-    default:
-      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
-      break;
-  }
-}
-
-/* create a socket for sending to remote machine */
-static gboolean
-gst_tcpsrc_init_receive (GstTCPSrc * src)
-{
-  guint val = 0;
-
-  memset (&src->myaddr, 0, sizeof (src->myaddr));
-  src->myaddr.sin_family = AF_INET;     /* host byte order */
-  src->myaddr.sin_port = htons (src->port);     /* short, network byte order */
-  src->myaddr.sin_addr.s_addr = INADDR_ANY;
-
-  if ((src->sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
-    perror ("stream_socket");
-    return FALSE;
-  }
-
-/*  if (src->socket_options)
-  {*/
-  g_print ("Socket Options SO_REUSEADDR, SO_KEEPALIVE\n");
-  /* Sock Options */
-  val = 1;
-  /* allow local address reuse */
-  if (setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof (int)) < 0)
-    perror ("setsockopt()");
-  val = 1;
-  /* periodically test if connection still alive */
-  if (setsockopt (src->sock, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof (int)) < 0)
-    perror ("setsockopt()");
-  /* Sock Options */
-/*  } */
-
-  if (bind (src->sock, (struct sockaddr *) &src->myaddr,
-          sizeof (src->myaddr)) == -1) {
-    perror ("stream_sock bind");
-    return FALSE;
-  }
-
-  if (listen (src->sock, 5) == -1) {
-    perror ("stream_sock listen");
-    return FALSE;
-  }
-
-  fcntl (src->sock, F_SETFL, O_NONBLOCK);
-
-  switch (src->control) {
-    case CONTROL_TCP:
-      if ((src->control_sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
-        perror ("control_socket");
-        return FALSE;
-      }
-
-      src->myaddr.sin_port = htons (src->port + 1);
-      if (bind (src->control_sock, (struct sockaddr *) &src->myaddr,
-              sizeof (src->myaddr)) == -1) {
-        perror ("control bind");
-        return FALSE;
-      }
-
-      if (listen (src->control_sock, 5) == -1) {
-        perror ("control listen");
-        return FALSE;
-      }
-
-      fcntl (src->control_sock, F_SETFL, O_NONBLOCK);
-    case CONTROL_NONE:
-      GST_FLAG_SET (src, GST_TCPSRC_OPEN);
-      return TRUE;
-      break;
-    default:
-      return FALSE;
-      break;
-  }
-
-  GST_FLAG_SET (src, GST_TCPSRC_OPEN);
-
-  return TRUE;
-}
-
-static void
-gst_tcpsrc_close (GstTCPSrc * src)
-{
-  if (src->sock != -1) {
-    close (src->sock);
-    src->sock = -1;
-  }
-  if (src->control_sock != -1) {
-    close (src->control_sock);
-    src->control_sock = -1;
-  }
-  if (src->client_sock != -1) {
-    close (src->client_sock);
-    src->client_sock = -1;
-  }
-
-  GST_FLAG_UNSET (src, GST_TCPSRC_OPEN);
-}
-
-static GstElementStateReturn
-gst_tcpsrc_change_state (GstElement * element)
-{
-  g_return_val_if_fail (GST_IS_TCPSRC (element), GST_STATE_FAILURE);
-
-  if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
-    if (GST_FLAG_IS_SET (element, GST_TCPSRC_OPEN))
-      gst_tcpsrc_close (GST_TCPSRC (element));
-  } else {
-    if (!GST_FLAG_IS_SET (element, GST_TCPSRC_OPEN)) {
-      if (!gst_tcpsrc_init_receive (GST_TCPSRC (element)))
-        return GST_STATE_FAILURE;
-    }
-  }
-
-  if (GST_ELEMENT_CLASS (parent_class)->change_state)
-    return GST_ELEMENT_CLASS (parent_class)->change_state (element);
-
-  return GST_STATE_SUCCESS;
-}
diff --git a/gst/tcp/gsttcpsrc.h b/gst/tcp/gsttcpsrc.h
deleted file mode 100644 (file)
index 8905abd..0000000
+++ /dev/null
@@ -1,92 +0,0 @@
-/* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Library General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Library General Public License for more details.
- *
- * You should have received a copy of the GNU Library General Public
- * License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-
-#ifndef __GST_TCPSRC_H__
-#define __GST_TCPSRC_H__
-
-#include <gst/gst.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
-
-#include <errno.h>
-#include <string.h>
-#include <sys/types.h>
-#include <netdb.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include "gsttcpplugin.h"
-
-#include <fcntl.h>
-
-#define GST_TYPE_TCPSRC \
-  (gst_tcpsrc_get_type())
-#define GST_TCPSRC(obj) \
-  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPSRC,GstTCPSrc))
-#define GST_TCPSRC_CLASS(klass) \
-  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPSRC,GstTCPSrc))
-#define GST_IS_TCPSRC(obj) \
-  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPSRC))
-#define GST_IS_TCPSRC_CLASS(obj) \
-  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPSRC))
-
-typedef struct _GstTCPSrc GstTCPSrc;
-typedef struct _GstTCPSrcClass GstTCPSrcClass;
-
-typedef enum {
-  GST_TCPSRC_OPEN       = GST_ELEMENT_FLAG_LAST,
-  GST_TCPSRC_1ST_BUF,
-  GST_TCPSRC_CONNECTED,
-
-  GST_TCPSRC_FLAG_LAST,
-} GstTCPSrcFlags;
-
-struct _GstTCPSrc {
-  GstElement element;
-
-  /* pads */
-  GstPad *sinkpad,*srcpad;
-
-  int port;
-  int sock;
-  int client_sock;
-  int control_sock;
-/*  gboolean socket_options;*/
-  Gst_TCP_Control control;
-
-  struct sockaddr_in myaddr;
-  GstClock *clock;
-};
-
-struct _GstTCPSrcClass {
-  GstElementClass parent_class;
-};
-
-GType gst_tcpsrc_get_type(void);
-
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
-
-
-#endif /* __GST_TCPSRC_H__ */