add new tcp elements
authorThomas Vander Stichele <thomas@apestaart.org>
Thu, 20 May 2004 10:15:31 +0000 (10:15 +0000)
committerThomas Vander Stichele <thomas@apestaart.org>
Thu, 20 May 2004 10:15:31 +0000 (10:15 +0000)
Original commit message from CVS:
add new tcp elements

15 files changed:
ChangeLog
gst/tcp/.gitignore [new file with mode: 0644]
gst/tcp/Makefile.am
gst/tcp/README
gst/tcp/gsttcp.c [new file with mode: 0644]
gst/tcp/gsttcp.h [new file with mode: 0644]
gst/tcp/gsttcpclientsink.c [new file with mode: 0644]
gst/tcp/gsttcpclientsink.h [new file with mode: 0644]
gst/tcp/gsttcpclientsrc.c [new file with mode: 0644]
gst/tcp/gsttcpclientsrc.h [new file with mode: 0644]
gst/tcp/gsttcpplugin.c
gst/tcp/gsttcpserversink.c [new file with mode: 0644]
gst/tcp/gsttcpserversink.h [new file with mode: 0644]
gst/tcp/gsttcpserversrc.c [new file with mode: 0644]
gst/tcp/gsttcpserversrc.h [new file with mode: 0644]

index ed229c4..ef28f58 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,22 @@
+2004-05-20  Thomas Vander Stichele  <thomas at apestaart dot org>
+
+       * gst/tcp/.cvsignore:
+         ignore enums
+       * gst/tcp/Makefile.am:
+       * gst/tcp/README:
+       * gst/tcp/gsttcp.c:
+       * gst/tcp/gsttcp.h:
+       * gst/tcp/gsttcpclientsink.c:
+       * gst/tcp/gsttcpclientsink.h:
+       * gst/tcp/gsttcpclientsrc.c:
+       * gst/tcp/gsttcpclientsrc.h:
+       * gst/tcp/gsttcpplugin.c:
+       * gst/tcp/gsttcpserversink.c:
+       * gst/tcp/gsttcpserversink.h:
+       * gst/tcp/gsttcpserversrc.c:
+       * gst/tcp/gsttcpserversrc.h:
+          add new tcp elements
+
 2004-05-19  Wim Taymans  <wim@fluendo.com>
 
        * gst/law/mulaw-conversion.c: (mulaw_encode):
diff --git a/gst/tcp/.gitignore b/gst/tcp/.gitignore
new file mode 100644 (file)
index 0000000..799a57f
--- /dev/null
@@ -0,0 +1,2 @@
+gsttcp-enumtypes.c
+gsttcp-enumtypes.h
index 7bf0454..a5b45b8 100644 (file)
@@ -1,11 +1,35 @@
-
 plugin_LTLIBRARIES = libgsttcp.la
 
-libgsttcp_la_SOURCES = gsttcpplugin.c gsttcpsrc.c gsttcpsink.c
+# variables used for enum/marshal generation
+glib_enum_headers = gsttcp.h
+glib_enum_define = GST_TCP_PROTOCOL
+glib_enum_prefix = gst_tcp_protocol
+
+include $(top_srcdir)/common/glib-gen.mak
+
+built_sources = gsttcp-enumtypes.c
+built_headers = gsttcp-enumtypes.h
+
+BUILT_SOURCES = $(built_sources) $(built_headers)
+
+libgsttcp_la_SOURCES = \
+       gsttcpplugin.c \
+       gsttcpsrc.c gsttcpsink.c \
+       $(built_sources) \
+       gsttcp.c \
+       gsttcpclientsrc.c gsttcpclientsink.c \
+       gsttcpserversrc.c gsttcpserversink.c
+
 libgsttcp_la_CFLAGS = $(GST_CFLAGS)
 libgsttcp_la_LIBADD =
 libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS)
 
-noinst_HEADERS = gsttcpsink.h gsttcpsrc.h gsttcpplugin.h
+noinst_HEADERS = \
+  $(built_headers) \
+  gsttcpplugin.h \
+  gsttcpsrc.h gsttcpsink.h \
+  gsttcp.h \
+  gsttcpclientsrc.h gsttcpclientsink.h \
+  gsttcpserversrc.h gsttcpserversink.h
 
-EXTRA_DIST = README
+CLEANFILES = $(BUILT_SOURCES)
index 72972ab..29e26da 100644 (file)
@@ -1,6 +1,49 @@
+This part of the documentation is for the new tcp elements:
+- tcpclientsrc
+- tcpclientsink
+- tcpserversrc
+- tcpserversink
+                                                                                
+which are created to replace the old tcpsrc/tcpsink
+                                                                                
+TESTS
+-----
+Use these tests to test functionality of the various tcp plugins
+
+* server: nc -l -p 3000
+  client: nc localhost 3000
+  everything you type in the server is shown on the client
+  everything you type in the client is shown on the server
+
+* server: nc -l -p 3000
+  client: gst-launch tcpclientsrc protocol=none port=3000 ! fdsink fd=2
+  everything you type in the server is shown on the client
+
+* server: nc -l -p 3000
+  client: gst-launch fdsrc fd=1 ! tcpclientsink protocol=none port=3000
+  everything you type in the client is shown on the server
+
+* server: gst-launch tcpserversrc protocol=none port=3000 ! fdsink fd=2
+  client: gst-launch fdsrc fd=1 ! tcpclientsink protocol=none port=3000
+
+
+TODO
+----
+- implement DNS resolution
+
+--------
+
+This is the old documentation for the original tcpsrc/tcpsink elements.
+
 * What is TCP src/sink?
 
-solution, like icecast or realaudio or whatever. But the future RTP plugins shall not do the actual transmission/reception of packets on the network themselve but the Application developer would be encouraged to use either the TCP or the UDP plugins for that. UDP would be used mostly but there could be situations where TCP would be the only available choice. For example streaming accross firewalls that do not allow UDP.
+solution, like icecast or realaudio or whatever.
+But the future RTP plugins shall not do the actual transmission/reception
+of packets on the network themselve but the Application developer would be
+encouraged to use either the TCP or the UDP plugins for that. UDP would be
+used mostly but there could be situations where TCP would be the only
+available choice. For example streaming accross firewalls that do not
+allow UDP.
 
 * Shortcomings
 
diff --git a/gst/tcp/gsttcp.c b/gst/tcp/gsttcp.c
new file mode 100644 (file)
index 0000000..ef4c299
--- /dev/null
@@ -0,0 +1,314 @@
+/* GStreamer
+ * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ *
+ * gsttcp.c: TCP functions
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <unistd.h>
+
+#include <glib.h>
+#include <gst/gst.h>
+#include <gst/gst-i18n-plugin.h>
+#include <gst/dataprotocol/dataprotocol.h>
+
+/* resolve host to IP address, throwing errors if it fails */
+/* host can already be an IP address */
+/* returns a newly allocated gchar * with the dotted ip address */
+gchar *
+gst_tcp_host_to_ip (GstElement * element, const gchar * host)
+{
+  struct hostent *hostinfo;
+  char **addrs;
+  gchar *ip;
+  struct in_addr addr;
+
+  /* first check if it already is an IP address */
+  if (inet_aton (host, &addr)) {
+    return g_strdup (host);
+  }
+
+  /* FIXME: could do a localhost check here */
+
+  /* perform a name lookup */
+  hostinfo = gethostbyname (host);
+  if (!hostinfo) {
+    GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
+        ("Could not find IP address for host \"%s\".", host));
+    return NULL;
+  }
+
+  if (hostinfo->h_addrtype != AF_INET) {
+    GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL),
+        ("host \"%s\" is not an IP host", host));
+    return NULL;
+  }
+
+  addrs = hostinfo->h_addr_list;
+  /* There could be more than one IP address, but we just return the first */
+  ip = g_strdup (inet_ntoa (*(struct in_addr *) *addrs));
+
+  return ip;
+}
+
+/* write buffer to given socket incrementally.
+ * Returns number of bytes written.
+ */
+gint
+gst_tcp_socket_write (int socket, const void *buf, size_t count)
+{
+  size_t bytes_written = 0;
+
+  while (bytes_written < count) {
+    size_t wrote = write (socket, buf + bytes_written,
+        count - bytes_written);
+
+    if (wrote <= 0) {
+      return bytes_written;
+    }
+    bytes_written += wrote;
+  }
+
+  if (bytes_written < 0)
+    GST_DEBUG ("error while writing");
+  else
+    GST_DEBUG ("wrote %d bytes succesfully", bytes_written);
+  return bytes_written;
+}
+
+/* read number of bytes from a socket into a given buffer incrementally.
+ * Returns number of bytes read.
+ */
+gint
+gst_tcp_socket_read (int socket, void *buf, size_t count)
+{
+  size_t bytes_read = 0;
+
+  while (bytes_read < count) {
+    size_t ret = read (socket, buf + bytes_read,
+        count - bytes_read);
+
+    if (ret <= 0) {
+      return bytes_read;
+    }
+    bytes_read += ret;
+  }
+
+  if (bytes_read < 0)
+    GST_DEBUG ("error while reading");
+  else
+    GST_DEBUG ("read %d bytes succesfully", bytes_read);
+  return bytes_read;
+}
+
+/* read the gdp buffer header from the given socket
+ * returns a GstData,
+ * representing the new GstBuffer to read data into, or an EOS event
+ */
+GstData *
+gst_tcp_gdp_read_header (GstElement * this, int socket)
+{
+  size_t header_length = GST_DP_HEADER_LENGTH;
+  size_t readsize;
+  guint8 *header = NULL;
+  size_t ret;
+  GstBuffer *buffer;
+
+  header = g_malloc (header_length);
+
+  readsize = header_length;
+
+  GST_DEBUG_OBJECT (this, "Reading %d bytes for buffer packet header",
+      readsize);
+  ret = read (socket, header, readsize);
+  /* if we read 0 bytes, and we're blocking, we hit eos */
+  if (ret == 0) {
+    GST_DEBUG ("blocking read returns 0, EOS");
+    gst_element_set_eos (GST_ELEMENT (this));
+    return GST_DATA (gst_event_new (GST_EVENT_EOS));
+  }
+  if (ret < 0) {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+    return NULL;
+  }
+  if (ret != readsize) {
+    g_warning ("Wanted %d bytes, got %d bytes", readsize, ret);
+  }
+  g_assert (ret == readsize);
+
+  if (!gst_dp_validate_header (header_length, header)) {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("GDP buffer packet header does not validate"));
+    g_free (header);
+    return NULL;
+  }
+  GST_DEBUG_OBJECT (this, "validated buffer packet header");
+
+  buffer = gst_dp_buffer_from_header (header_length, header);
+
+  GST_DEBUG_OBJECT (this, "created new buffer %p from packet header", buffer);
+  return GST_DATA (buffer);
+}
+
+/* read the GDP caps packet from the given socket
+ * returns the caps, or NULL in case of an error */
+GstCaps *
+gst_tcp_gdp_read_caps (GstElement * this, int socket)
+{
+  size_t header_length = GST_DP_HEADER_LENGTH;
+  size_t readsize;
+  guint8 *header = NULL;
+  guint8 *payload = NULL;
+  size_t ret;
+  GstCaps *caps;
+  gchar *string;
+
+  header = g_malloc (header_length);
+
+  readsize = header_length;
+  GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", readsize);
+  ret = read (socket, header, readsize);
+  if (ret < 0) {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+    return NULL;
+  }
+  g_assert (ret == readsize);
+
+  if (!gst_dp_validate_header (header_length, header)) {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("GDP caps packet header does not validate"));
+    g_free (header);
+    return NULL;
+  }
+
+  readsize = gst_dp_header_payload_length (header);
+  payload = g_malloc (readsize);
+  GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize);
+  ret = read (socket, payload, readsize);
+
+  if (ret < 0) {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+    g_free (header);
+    g_free (payload);
+    return NULL;
+  }
+  if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS) {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("Header read doesn't describe CAPS payload"));
+    g_free (header);
+    g_free (payload);
+    return NULL;
+  }
+  g_assert (ret == readsize);
+
+  if (!gst_dp_validate_payload (readsize, header, payload)) {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("GDP caps packet payload does not validate"));
+    g_free (header);
+    g_free (payload);
+    return NULL;
+  }
+
+  caps = gst_dp_caps_from_packet (header_length, header, payload);
+  string = gst_caps_to_string (caps);
+  GST_DEBUG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string);
+
+  g_free (header);
+  g_free (payload);
+  g_free (string);
+  return caps;
+}
+
+/* write a GDP header to the socket.  Return false if fails. */
+gboolean
+gst_tcp_gdp_write_header (GstElement * this, int socket, GstBuffer * buffer,
+    gboolean fatal, const gchar * host, int port)
+{
+  guint length;
+  guint8 *header;
+  size_t wrote;
+
+  if (!gst_dp_header_from_buffer (buffer, 0, &length, &header)) {
+    if (fatal)
+      GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL),
+          ("Could not create GDP header from buffer"));
+    return FALSE;
+  }
+
+  GST_LOG_OBJECT (this, "writing %d bytes for GDP buffer header", length);
+  wrote = gst_tcp_socket_write (socket, header, length);
+  if (wrote != length) {
+    if (fatal)
+      GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
+          (_("Error while sending data to \"%s:%d\"."), host, port),
+          ("Only %d of %d bytes written: %s",
+              wrote, GST_BUFFER_SIZE (buffer), g_strerror (errno)));
+    return FALSE;
+  }
+
+  return TRUE;
+}
+
+/* write GDP header and payload to the given socket for the given caps.
+ * Return false if fails. */
+gboolean
+gst_tcp_gdp_write_caps (GstElement * this, int socket, const GstCaps * caps,
+    gboolean fatal, const char *host, int port)
+{
+  guint length;
+  guint8 *header;
+  guint8 *payload;
+  size_t wrote;
+
+  if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) {
+    if (fatal)
+      GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL),
+          ("Could not create GDP packet from caps"));
+    return FALSE;
+  }
+  GST_LOG_OBJECT (this, "writing %d bytes for GDP caps header", length);
+  wrote = gst_tcp_socket_write (socket, header, length);
+  if (wrote != length) {
+    if (fatal)
+      GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
+          (_("Error while sending gdp header data to \"%s:%d\"."), host, port),
+          ("Only %d of %d bytes written: %s",
+              wrote, length, g_strerror (errno)));
+    return FALSE;
+  }
+
+  length = gst_dp_header_payload_length (header);
+  GST_LOG_OBJECT (this, "writing %d bytes for GDP caps payload", length);
+  wrote = gst_tcp_socket_write (socket, payload, length);
+  if (wrote != length) {
+    if (fatal)
+      GST_ELEMENT_ERROR (this, RESOURCE, WRITE,
+          (_("Error while sending gdp payload data to \"%s:%d\"."), host, port),
+          ("Only %d of %d bytes written: %s",
+              wrote, length, g_strerror (errno)));
+    return FALSE;
+  }
+  return TRUE;
+}
diff --git a/gst/tcp/gsttcp.h b/gst/tcp/gsttcp.h
new file mode 100644 (file)
index 0000000..67cee58
--- /dev/null
@@ -0,0 +1,51 @@
+/* GStreamer
+ * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ *
+ * gsttcp.h: helper functions
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __GST_TCP_HELP_H__
+#define __GST_TCP_HELP_H__
+
+#include "gsttcp-enumtypes.h"
+#include <gst/gst.h>
+#include <gst/dataprotocol/dataprotocol.h>
+
+G_BEGIN_DECLS
+
+typedef enum
+{
+  GST_TCP_PROTOCOL_TYPE_NONE,
+  GST_TCP_PROTOCOL_TYPE_GDP
+} GstTCPProtocolType;
+
+gchar * gst_tcp_host_to_ip (GstElement *element, const gchar *host);
+
+gint gst_tcp_socket_write (int socket, const void *buf, size_t count);
+gint gst_tcp_socket_read (int socket, void *buf, size_t count);
+
+GstData * gst_tcp_gdp_read_header (GstElement *this, int socket);
+GstCaps * gst_tcp_gdp_read_caps (GstElement *this, int socket);
+
+gboolean gst_tcp_gdp_write_header (GstElement *this, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port);
+gboolean gst_tcp_gdp_write_caps (GstElement *this, int socket, const GstCaps *caps, gboolean fatal, const gchar *host, int port);
+
+G_END_DECLS
+
+#endif /* __GST_TCP_HELP_H__ */
diff --git a/gst/tcp/gsttcpclientsink.c b/gst/tcp/gsttcpclientsink.c
new file mode 100644 (file)
index 0000000..8f42eca
--- /dev/null
@@ -0,0 +1,392 @@
+/* GStreamer
+ * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include <gst/gst-i18n-plugin.h>
+#include <gst/dataprotocol/dataprotocol.h>
+#include "gsttcp.h"
+#include "gsttcpclientsink.h"
+
+#define TCP_DEFAULT_HOST       "localhost"
+#define TCP_DEFAULT_PORT       4953
+
+/* elementfactory information */
+static GstElementDetails gst_tcpclientsink_details =
+GST_ELEMENT_DETAILS ("TCP Client sink",
+    "Sink/Network",
+    "Send data as a client over the network via TCP",
+    "Thomas Vander Stichele <thomas at apestaart dot org>");
+
+/* TCPClientSink signals and args */
+enum
+{
+  FRAME_ENCODED,
+  /* FILL ME */
+  LAST_SIGNAL
+};
+
+GST_DEBUG_CATEGORY (tcpclientsink_debug);
+#define GST_CAT_DEFAULT (tcpclientsink_debug)
+
+enum
+{
+  ARG_0,
+  ARG_HOST,
+  ARG_PORT,
+  ARG_PROTOCOL
+      /* FILL ME */
+};
+
+static void gst_tcpclientsink_base_init (gpointer g_class);
+static void gst_tcpclientsink_class_init (GstTCPClientSink * klass);
+static void gst_tcpclientsink_init (GstTCPClientSink * tcpclientsink);
+
+static void gst_tcpclientsink_set_clock (GstElement * element,
+    GstClock * clock);
+
+static void gst_tcpclientsink_chain (GstPad * pad, GstData * _data);
+static GstElementStateReturn gst_tcpclientsink_change_state (GstElement *
+    element);
+
+static void gst_tcpclientsink_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec);
+static void gst_tcpclientsink_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec);
+
+
+static GstElementClass *parent_class = NULL;
+
+/*static guint gst_tcpclientsink_signals[LAST_SIGNAL] = { 0 }; */
+
+GType
+gst_tcpclientsink_get_type (void)
+{
+  static GType tcpclientsink_type = 0;
+
+
+  if (!tcpclientsink_type) {
+    static const GTypeInfo tcpclientsink_info = {
+      sizeof (GstTCPClientSinkClass),
+      gst_tcpclientsink_base_init,
+      NULL,
+      (GClassInitFunc) gst_tcpclientsink_class_init,
+      NULL,
+      NULL,
+      sizeof (GstTCPClientSink),
+      0,
+      (GInstanceInitFunc) gst_tcpclientsink_init,
+      NULL
+    };
+
+    tcpclientsink_type =
+        g_type_register_static (GST_TYPE_ELEMENT, "GstTCPClientSink",
+        &tcpclientsink_info, 0);
+  }
+  return tcpclientsink_type;
+}
+
+static void
+gst_tcpclientsink_base_init (gpointer g_class)
+{
+  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
+
+  gst_element_class_set_details (element_class, &gst_tcpclientsink_details);
+}
+
+static void
+gst_tcpclientsink_class_init (GstTCPClientSink * klass)
+{
+  GObjectClass *gobject_class;
+  GstElementClass *gstelement_class;
+
+  gobject_class = (GObjectClass *) klass;
+  gstelement_class = (GstElementClass *) klass;
+
+  parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
+
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
+      g_param_spec_string ("host", "Host", "The host/IP to send the packets to",
+          TCP_DEFAULT_HOST, G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
+      g_param_spec_int ("port", "Port", "The port to send the packets to",
+          0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
+  g_object_class_install_property (gobject_class, ARG_PROTOCOL,
+      g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
+          GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_GDP,
+          G_PARAM_READWRITE));
+  gobject_class->set_property = gst_tcpclientsink_set_property;
+  gobject_class->get_property = gst_tcpclientsink_get_property;
+
+  gstelement_class->change_state = gst_tcpclientsink_change_state;
+  gstelement_class->set_clock = gst_tcpclientsink_set_clock;
+
+  GST_DEBUG_CATEGORY_INIT (tcpclientsink_debug, "tcpclientsink", 0, "TCP sink");
+}
+
+static void
+gst_tcpclientsink_set_clock (GstElement * element, GstClock * clock)
+{
+  GstTCPClientSink *tcpclientsink;
+
+  tcpclientsink = GST_TCPCLIENTSINK (element);
+
+  tcpclientsink->clock = clock;
+}
+
+static void
+gst_tcpclientsink_init (GstTCPClientSink * this)
+{
+  /* create the sink pad */
+  this->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
+  gst_element_add_pad (GST_ELEMENT (this), this->sinkpad);
+  gst_pad_set_chain_function (this->sinkpad, gst_tcpclientsink_chain);
+
+  this->host = g_strdup (TCP_DEFAULT_HOST);
+  this->port = TCP_DEFAULT_PORT;
+  /* should support as minimum 576 for IPV4 and 1500 for IPV6 */
+  /* this->mtu = 1500; */
+
+  this->sock_fd = -1;
+  this->protocol = GST_TCP_PROTOCOL_TYPE_GDP;
+  GST_FLAG_UNSET (this, GST_TCPCLIENTSINK_OPEN);
+
+  this->clock = NULL;
+}
+
+static void
+gst_tcpclientsink_chain (GstPad * pad, GstData * _data)
+{
+  size_t wrote = 0;
+
+  GstBuffer *buf = GST_BUFFER (_data);
+  GstTCPClientSink *sink;
+
+  g_return_if_fail (pad != NULL);
+  g_return_if_fail (GST_IS_PAD (pad));
+  g_return_if_fail (buf != NULL);
+  sink = GST_TCPCLIENTSINK (GST_OBJECT_PARENT (pad));
+  g_return_if_fail (GST_FLAG_IS_SET (sink, GST_TCPCLIENTSINK_OPEN));
+
+  if (GST_IS_EVENT (buf)) {
+    g_warning ("FIXME: handl events");
+    return;
+  }
+
+  /* write the buffer header if we have one */
+  switch (sink->protocol) {
+    case GST_TCP_PROTOCOL_TYPE_NONE:
+      break;
+
+    case GST_TCP_PROTOCOL_TYPE_GDP:
+      /* if we haven't send caps yet, send them first */
+      if (!sink->caps_sent) {
+        const GstCaps *caps;
+        gchar *string;
+
+        caps = GST_PAD_CAPS (GST_PAD_PEER (pad));
+        string = gst_caps_to_string (caps);
+        GST_DEBUG_OBJECT (sink, "Sending caps %s through GDP", string);
+        if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd, caps,
+                TRUE, sink->host, sink->port)) {
+          g_free (string);
+          return;
+        }
+        g_free (string);
+        sink->caps_sent = TRUE;
+      }
+      GST_LOG_OBJECT (sink, "Sending buffer header through GDP");
+      if (!gst_tcp_gdp_write_header (GST_ELEMENT (sink), sink->sock_fd, buf,
+              TRUE, sink->host, sink->port))
+        return;
+      break;
+    default:
+      g_warning ("Unhandled protocol type");
+      break;
+  }
+
+  GST_LOG_OBJECT (sink, "writing %d bytes for buffer data",
+      GST_BUFFER_SIZE (buf));
+  wrote =
+      gst_tcp_socket_write (sink->sock_fd, GST_BUFFER_DATA (buf),
+      GST_BUFFER_SIZE (buf));
+
+  if (wrote < GST_BUFFER_SIZE (buf)) {
+    GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
+        (_("Error while sending data to \"%s:%d\"."), sink->host, sink->port),
+        ("Only %d of %d bytes written: %s",
+            wrote, GST_BUFFER_SIZE (buf), g_strerror (errno)));
+  }
+  sink->data_written += wrote;
+
+  gst_buffer_unref (buf);
+
+  /* FIXME: emit signal ? */
+}
+
+static void
+gst_tcpclientsink_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstTCPClientSink *tcpclientsink;
+
+  /* it's not null if we got it, but it might not be ours */
+  g_return_if_fail (GST_IS_TCPCLIENTSINK (object));
+  tcpclientsink = GST_TCPCLIENTSINK (object);
+
+  switch (prop_id) {
+    case ARG_HOST:
+      if (tcpclientsink->host != NULL)
+        g_free (tcpclientsink->host);
+      if (g_value_get_string (value) == NULL)
+        tcpclientsink->host = NULL;
+      else
+        tcpclientsink->host = g_strdup (g_value_get_string (value));
+      break;
+    case ARG_PORT:
+      tcpclientsink->port = g_value_get_int (value);
+      break;
+    case ARG_PROTOCOL:
+      tcpclientsink->protocol = g_value_get_enum (value);
+      break;
+    default:
+      break;
+  }
+}
+
+static void
+gst_tcpclientsink_get_property (GObject * object, guint prop_id, GValue * value,
+    GParamSpec * pspec)
+{
+  GstTCPClientSink *tcpclientsink;
+
+  /* it's not null if we got it, but it might not be ours */
+  g_return_if_fail (GST_IS_TCPCLIENTSINK (object));
+  tcpclientsink = GST_TCPCLIENTSINK (object);
+
+  switch (prop_id) {
+    case ARG_HOST:
+      g_value_set_string (value, tcpclientsink->host);
+      break;
+    case ARG_PORT:
+      g_value_set_int (value, tcpclientsink->port);
+      break;
+    case ARG_PROTOCOL:
+      g_value_set_enum (value, tcpclientsink->protocol);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+
+/* create a socket for sending to remote machine */
+static gboolean
+gst_tcpclientsink_init_send (GstTCPClientSink * this)
+{
+  int ret;
+  gchar *ip;
+
+  /* reset caps_sent flag */
+  this->caps_sent = FALSE;
+
+  /* create sending client socket */
+  GST_DEBUG_OBJECT (this, "opening sending client socket to %s:%d", this->host,
+      this->port);
+  if ((this->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
+    GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE, (NULL), GST_ERROR_SYSTEM);
+    return FALSE;
+  }
+  GST_DEBUG_OBJECT (this, "opened sending client socket with fd %d",
+      this->sock_fd);
+
+  /* look up name if we need to */
+  ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
+  if (!ip)
+    return FALSE;
+  GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip);
+
+  /* connect to server */
+  memset (&this->server_sin, 0, sizeof (this->server_sin));
+  this->server_sin.sin_family = AF_INET;        /* network socket */
+  this->server_sin.sin_port = htons (this->port);       /* on port */
+  this->server_sin.sin_addr.s_addr = inet_addr (ip);    /* on host ip */
+
+  GST_DEBUG_OBJECT (this, "connecting to server");
+  ret = connect (this->sock_fd, (struct sockaddr *) &this->server_sin,
+      sizeof (this->server_sin));
+
+  if (ret) {
+    switch (errno) {
+      case ECONNREFUSED:
+        GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE,
+            (_("Connection to %s:%d refused."), this->host, this->port),
+            (NULL));
+        return FALSE;
+        break;
+      default:
+        GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
+            ("connect to %s:%d failed: %s", this->host, this->port,
+                g_strerror (errno)));
+        return FALSE;
+        break;
+    }
+  }
+
+  GST_FLAG_SET (this, GST_TCPCLIENTSINK_OPEN);
+
+  this->data_written = 0;
+
+  return TRUE;
+}
+
+static void
+gst_tcpclientsink_close (GstTCPClientSink * this)
+{
+  if (this->sock_fd != -1) {
+    close (this->sock_fd);
+    this->sock_fd = -1;
+  }
+
+  GST_FLAG_UNSET (this, GST_TCPCLIENTSINK_OPEN);
+}
+
+static GstElementStateReturn
+gst_tcpclientsink_change_state (GstElement * element)
+{
+  g_return_val_if_fail (GST_IS_TCPCLIENTSINK (element), GST_STATE_FAILURE);
+
+  if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
+    if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSINK_OPEN))
+      gst_tcpclientsink_close (GST_TCPCLIENTSINK (element));
+  } else {
+    if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSINK_OPEN)) {
+      if (!gst_tcpclientsink_init_send (GST_TCPCLIENTSINK (element)))
+        return GST_STATE_FAILURE;
+    }
+  }
+
+  if (GST_ELEMENT_CLASS (parent_class)->change_state)
+    return GST_ELEMENT_CLASS (parent_class)->change_state (element);
+
+  return GST_STATE_SUCCESS;
+}
diff --git a/gst/tcp/gsttcpclientsink.h b/gst/tcp/gsttcpclientsink.h
new file mode 100644 (file)
index 0000000..9cd657a
--- /dev/null
@@ -0,0 +1,101 @@
+/* GStreamer
+ * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+
+#ifndef __GST_TCPCLIENTSINK_H__
+#define __GST_TCPCLIENTSINK_H__
+
+
+#include <gst/gst.h>
+#include "gsttcp.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif /* __cplusplus */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <fcntl.h>
+#include <arpa/inet.h>
+#include "gsttcp.h"
+
+#define GST_TYPE_TCPCLIENTSINK \
+  (gst_tcpclientsink_get_type())
+#define GST_TCPCLIENTSINK(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPCLIENTSINK,GstTCPClientSink))
+#define GST_TCPCLIENTSINK_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPCLIENTSINK,GstTCPClientSink))
+#define GST_IS_TCPCLIENTSINK(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPCLIENTSINK))
+#define GST_IS_TCPCLIENTSINK_CLASS(obj) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPCLIENTSINK))
+
+typedef struct _GstTCPClientSink GstTCPClientSink;
+typedef struct _GstTCPClientSinkClass GstTCPClientSinkClass;
+
+typedef enum {
+  GST_TCPCLIENTSINK_OPEN             = GST_ELEMENT_FLAG_LAST,
+
+  GST_TCPCLIENTSINK_FLAG_LAST        = GST_ELEMENT_FLAG_LAST + 2,
+} GstTCPClientSinkFlags;
+
+struct _GstTCPClientSink {
+  GstElement element;
+
+  /* pad */
+  GstPad *sinkpad;
+
+  /* server information */
+  int port;
+  gchar *host;
+  struct sockaddr_in server_sin;
+
+  /* socket */
+  int sock_fd;
+
+  size_t data_written; /* how much bytes have we written ? */
+  GstTCPProtocolType protocol; /* used with the protocol enum */
+  gboolean caps_sent; /* whether or not we sent caps already */
+
+  guint mtu;
+  GstClock *clock;
+};
+
+struct _GstTCPClientSinkClass {
+  GstElementClass parent_class;
+};
+
+GType gst_tcpclientsink_get_type(void);
+
+
+#ifdef __cplusplus
+}
+#endif /* __cplusplus */
+
+
+#endif /* __GST_TCPCLIENTSINK_H__ */
diff --git a/gst/tcp/gsttcpclientsrc.c b/gst/tcp/gsttcpclientsrc.c
new file mode 100644 (file)
index 0000000..86a46dd
--- /dev/null
@@ -0,0 +1,472 @@
+/* GStreamer
+ * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/gst-i18n-plugin.h>
+#include "gsttcp.h"
+#include "gsttcpclientsrc.h"
+#include <string.h>             /* memset */
+#include <unistd.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/ioctl.h>
+
+GST_DEBUG_CATEGORY (tcpclientsrc_debug);
+#define GST_CAT_DEFAULT tcpclientsrc_debug
+
+#define TCP_DEFAULT_PORT               4953
+#define TCP_DEFAULT_HOST               "localhost"
+#define MAX_READ_SIZE                  4 * 1024
+
+/* elementfactory information */
+static GstElementDetails gst_tcpclientsrc_details =
+GST_ELEMENT_DETAILS ("TCP Client source",
+    "Source/Network",
+    "Receive data as a client over the network via TCP",
+    "Thomas Vander Stichele <thomas at apestaart dot org>");
+
+/* TCPClientSrc signals and args */
+enum
+{
+  LAST_SIGNAL
+};
+
+enum
+{
+  ARG_0,
+  ARG_PORT,
+  ARG_HOST,
+  ARG_PROTOCOL
+};
+
+static void gst_tcpclientsrc_base_init (gpointer g_class);
+static void gst_tcpclientsrc_class_init (GstTCPClientSrc * klass);
+static void gst_tcpclientsrc_init (GstTCPClientSrc * tcpclientsrc);
+
+static GstData *gst_tcpclientsrc_get (GstPad * pad);
+static GstElementStateReturn gst_tcpclientsrc_change_state (GstElement *
+    element);
+
+static void gst_tcpclientsrc_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec);
+static void gst_tcpclientsrc_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec);
+static void gst_tcpclientsrc_set_clock (GstElement * element, GstClock * clock);
+
+static GstElementClass *parent_class = NULL;
+
+/*static guint gst_tcpclientsrc_signals[LAST_SIGNAL] = { 0 }; */
+
+GType
+gst_tcpclientsrc_get_type (void)
+{
+  static GType tcpclientsrc_type = 0;
+
+
+  if (!tcpclientsrc_type) {
+    static const GTypeInfo tcpclientsrc_info = {
+      sizeof (GstTCPClientSrcClass),
+      gst_tcpclientsrc_base_init,
+      NULL,
+      (GClassInitFunc) gst_tcpclientsrc_class_init,
+      NULL,
+      NULL,
+      sizeof (GstTCPClientSrc),
+      0,
+      (GInstanceInitFunc) gst_tcpclientsrc_init,
+      NULL
+    };
+
+    tcpclientsrc_type =
+        g_type_register_static (GST_TYPE_ELEMENT, "GstTCPClientSrc",
+        &tcpclientsrc_info, 0);
+  }
+  return tcpclientsrc_type;
+}
+
+static void
+gst_tcpclientsrc_base_init (gpointer g_class)
+{
+  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
+
+  gst_element_class_set_details (element_class, &gst_tcpclientsrc_details);
+}
+
+static void
+gst_tcpclientsrc_class_init (GstTCPClientSrc * klass)
+{
+  GObjectClass *gobject_class;
+  GstElementClass *gstelement_class;
+
+  gobject_class = (GObjectClass *) klass;
+  gstelement_class = (GstElementClass *) klass;
+
+  parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
+
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
+      g_param_spec_string ("host", "Host",
+          "The host IP address to receive packets from", TCP_DEFAULT_HOST,
+          G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
+      g_param_spec_int ("port", "Port", "The port to receive packets from", 0,
+          32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
+  g_object_class_install_property (gobject_class, ARG_PROTOCOL,
+      g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
+          GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_GDP,
+          G_PARAM_READWRITE));
+
+  gobject_class->set_property = gst_tcpclientsrc_set_property;
+  gobject_class->get_property = gst_tcpclientsrc_get_property;
+
+  gstelement_class->change_state = gst_tcpclientsrc_change_state;
+  gstelement_class->set_clock = gst_tcpclientsrc_set_clock;
+
+  GST_DEBUG_CATEGORY_INIT (tcpclientsrc_debug, "tcpclientsrc", 0,
+      "TCP Client Source");
+}
+
+static void
+gst_tcpclientsrc_set_clock (GstElement * element, GstClock * clock)
+{
+  GstTCPClientSrc *tcpclientsrc;
+
+  tcpclientsrc = GST_TCPCLIENTSRC (element);
+
+  tcpclientsrc->clock = clock;
+}
+
+static void
+gst_tcpclientsrc_init (GstTCPClientSrc * this)
+{
+  /* create the src pad */
+  this->srcpad = gst_pad_new ("src", GST_PAD_SRC);
+  gst_element_add_pad (GST_ELEMENT (this), this->srcpad);
+  gst_pad_set_get_function (this->srcpad, gst_tcpclientsrc_get);
+
+  this->port = TCP_DEFAULT_PORT;
+  this->host = g_strdup (TCP_DEFAULT_HOST);
+  this->clock = NULL;
+  this->sock_fd = -1;
+  this->protocol = GST_TCP_PROTOCOL_TYPE_GDP;
+  this->curoffset = 0;
+
+  GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN);
+}
+
+static GstData *
+gst_tcpclientsrc_get (GstPad * pad)
+{
+  GstTCPClientSrc *src;
+  size_t readsize;
+  int ret;
+
+  GstData *data = NULL;
+  GstBuffer *buf = NULL;
+  GstCaps *caps;
+
+  g_return_val_if_fail (pad != NULL, NULL);
+  g_return_val_if_fail (GST_IS_PAD (pad), NULL);
+  src = GST_TCPCLIENTSRC (GST_OBJECT_PARENT (pad));
+  g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPCLIENTSRC_OPEN), NULL);
+
+  /* if we have a left over buffer after a discont, return that */
+  if (src->buffer_after_discont) {
+    buf = src->buffer_after_discont;
+    GST_LOG_OBJECT (src,
+        "Returning buffer after discont of size %d with timestamp %"
+        GST_TIME_FORMAT " and duration %" GST_TIME_FORMAT,
+        GST_BUFFER_SIZE (buf), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
+        GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
+    src->buffer_after_discont = NULL;
+    return GST_DATA (buf);
+  }
+
+  /* read the buffer header if we're using a protocol */
+  switch (src->protocol) {
+      fd_set testfds;
+
+    case GST_TCP_PROTOCOL_TYPE_NONE:
+      /* do a blocking select on the socket */
+      FD_ZERO (&testfds);
+      FD_SET (src->sock_fd, &testfds);
+      ret = select (src->sock_fd + 1, &testfds, (fd_set *) 0, (fd_set *) 0, 0);
+      /* no action (0) is an error too in our case */
+      if (ret <= 0) {
+        GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+            ("select failed: %s", g_strerror (errno)));
+        return NULL;
+      }
+
+      /* ask how much is available for reading on the socket */
+      ret = ioctl (src->sock_fd, FIONREAD, &readsize);
+      if (ret < 0) {
+        GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+            ("ioctl failed: %s", g_strerror (errno)));
+        return NULL;
+      }
+      buf = gst_buffer_new_and_alloc (readsize);
+      break;
+    case GST_TCP_PROTOCOL_TYPE_GDP:
+      /* if we haven't received caps yet, we should get them first */
+      if (!src->caps_received) {
+        gchar *string;
+
+        if (!(caps = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd))) {
+          GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+              ("Could not read caps through GDP"));
+          return NULL;
+        }
+        src->caps_received = TRUE;
+        string = gst_caps_to_string (caps);
+        GST_DEBUG_OBJECT (src, "Received caps through GDP: %s", string);
+        g_free (string);
+
+        if (!gst_pad_try_set_caps (pad, caps)) {
+          g_warning ("Could not set caps");
+          return NULL;
+        }
+      }
+
+      /* now receive the buffer header */
+      if (!(data = gst_tcp_gdp_read_header (GST_ELEMENT (src), src->sock_fd))) {
+        GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+            ("Could not read data header through GDP"));
+        return NULL;
+      }
+      if (GST_IS_EVENT (data))
+        return data;
+      buf = GST_BUFFER (data);
+
+      GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
+          buf);
+      /* use this new buffer to read data into */
+      readsize = GST_BUFFER_SIZE (buf);
+      break;
+    default:
+      g_warning ("Unhandled protocol type");
+      break;
+  }
+
+  GST_LOG_OBJECT (src, "Reading %d bytes", readsize);
+  ret = gst_tcp_socket_read (src->sock_fd, GST_BUFFER_DATA (buf), readsize);
+  if (ret < 0) {
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+    return NULL;
+  }
+
+  /* if we read 0 bytes, and we're blocking, we hit eos */
+  if (ret == 0) {
+    GST_DEBUG ("blocking read returns 0, EOS");
+    gst_buffer_unref (buf);
+    gst_element_set_eos (GST_ELEMENT (src));
+    return GST_DATA (gst_event_new (GST_EVENT_EOS));
+  }
+
+  readsize = ret;
+  GST_BUFFER_SIZE (buf) = readsize;
+  GST_BUFFER_MAXSIZE (buf) = readsize;
+  GST_BUFFER_OFFSET (buf) = src->curoffset;
+  GST_BUFFER_OFFSET_END (buf) = src->curoffset + readsize;
+
+  /* if this is our first buffer, we need to send a discont with the
+   * given timestamp or the current offset, and store the buffer for
+   * the next iteration through the get loop */
+  if (src->send_discont) {
+    GstClockTime timestamp;
+    GstEvent *event;
+
+    src->send_discont = FALSE;
+    src->buffer_after_discont = buf;
+    /* if the timestamp is valid, send a timed discont
+     * taking into account the incoming buffer's timestamps */
+    timestamp = GST_BUFFER_TIMESTAMP (buf);
+    if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
+      GST_DEBUG_OBJECT (src,
+          "sending discontinuous with timestamp %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (timestamp));
+      event =
+          gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME, timestamp, NULL);
+      return GST_DATA (event);
+    }
+    /* otherwise, send an offset discont */
+    GST_DEBUG_OBJECT (src, "sending discontinuous with offset %d",
+        src->curoffset);
+    event =
+        gst_event_new_discontinuous (FALSE, GST_FORMAT_BYTES, src->curoffset,
+        NULL);
+    return GST_DATA (event);
+  }
+
+  src->curoffset += readsize;
+  GST_LOG_OBJECT (src,
+      "Returning buffer of size %d with timestamp %" GST_TIME_FORMAT
+      " and duration %" GST_TIME_FORMAT, readsize,
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
+  return GST_DATA (buf);
+}
+
+
+static void
+gst_tcpclientsrc_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstTCPClientSrc *tcpclientsrc;
+
+  /* it's not null if we got it, but it might not be ours */
+  g_return_if_fail (GST_IS_TCPCLIENTSRC (object));
+  tcpclientsrc = GST_TCPCLIENTSRC (object);
+
+  switch (prop_id) {
+    case ARG_PORT:
+      tcpclientsrc->port = g_value_get_int (value);
+      break;
+    case ARG_HOST:
+      /* FIXME: create a setter and handle changes correctly */
+      g_free (tcpclientsrc->host);
+      tcpclientsrc->host = g_strdup (g_value_get_string (value));
+      break;
+    case ARG_PROTOCOL:
+      tcpclientsrc->protocol = g_value_get_enum (value);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+gst_tcpclientsrc_get_property (GObject * object, guint prop_id, GValue * value,
+    GParamSpec * pspec)
+{
+  GstTCPClientSrc *tcpclientsrc;
+
+  g_return_if_fail (GST_IS_TCPCLIENTSRC (object));
+  tcpclientsrc = GST_TCPCLIENTSRC (object);
+
+  switch (prop_id) {
+    case ARG_PORT:
+      g_value_set_int (value, tcpclientsrc->port);
+      break;
+    case ARG_HOST:
+      g_value_set_string (value, tcpclientsrc->host);
+      break;
+    case ARG_PROTOCOL:
+      g_value_set_enum (value, tcpclientsrc->protocol);
+      break;
+
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+/* create a socket for connecting to remote server */
+static gboolean
+gst_tcpclientsrc_init_receive (GstTCPClientSrc * this)
+{
+  int ret;
+  gchar *ip;
+
+  /* create receiving client socket */
+  GST_DEBUG_OBJECT (this, "opening receiving client socket to %s:%d",
+      this->host, this->port);
+  if ((this->sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
+    GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
+    return FALSE;
+  }
+  GST_DEBUG_OBJECT (this, "opened receiving client socket with fd %d",
+      this->sock_fd);
+
+  /* look up name if we need to */
+  ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
+  if (!ip)
+    return FALSE;
+  GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip);
+
+  /* connect to server */
+  memset (&this->server_sin, 0, sizeof (this->server_sin));
+  this->server_sin.sin_family = AF_INET;        /* network socket */
+  this->server_sin.sin_port = htons (this->port);       /* on port */
+  this->server_sin.sin_addr.s_addr = inet_addr (ip);    /* on host ip */
+
+  GST_DEBUG_OBJECT (this, "connecting to server");
+  ret = connect (this->sock_fd, (struct sockaddr *) &this->server_sin,
+      sizeof (this->server_sin));
+
+  if (ret) {
+    switch (errno) {
+      case ECONNREFUSED:
+        GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ,
+            (_("Connection to %s:%d refused."), this->host, this->port),
+            (NULL));
+        return FALSE;
+        break;
+      default:
+        GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
+            ("connect to %s:%d failed: %s", this->host, this->port,
+                g_strerror (errno)));
+        return FALSE;
+        break;
+    }
+  }
+
+  this->send_discont = TRUE;
+  this->buffer_after_discont = NULL;
+  GST_FLAG_SET (this, GST_TCPCLIENTSRC_OPEN);
+
+  return TRUE;
+}
+
+static void
+gst_tcpclientsrc_close (GstTCPClientSrc * this)
+{
+  if (this->sock_fd != -1) {
+    close (this->sock_fd);
+    this->sock_fd = -1;
+  }
+  GST_FLAG_UNSET (this, GST_TCPCLIENTSRC_OPEN);
+}
+
+static GstElementStateReturn
+gst_tcpclientsrc_change_state (GstElement * element)
+{
+  g_return_val_if_fail (GST_IS_TCPCLIENTSRC (element), GST_STATE_FAILURE);
+
+  if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
+    if (GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN))
+      gst_tcpclientsrc_close (GST_TCPCLIENTSRC (element));
+  } else {
+    if (!GST_FLAG_IS_SET (element, GST_TCPCLIENTSRC_OPEN)) {
+      if (!gst_tcpclientsrc_init_receive (GST_TCPCLIENTSRC (element)))
+        return GST_STATE_FAILURE;
+    }
+  }
+
+  if (GST_ELEMENT_CLASS (parent_class)->change_state)
+    return GST_ELEMENT_CLASS (parent_class)->change_state (element);
+
+  return GST_STATE_SUCCESS;
+}
diff --git a/gst/tcp/gsttcpclientsrc.h b/gst/tcp/gsttcpclientsrc.h
new file mode 100644 (file)
index 0000000..9f3221e
--- /dev/null
@@ -0,0 +1,90 @@
+/* GStreamer
+ * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+
+#ifndef __GST_TCPCLIENTSRC_H__
+#define __GST_TCPCLIENTSRC_H__
+
+#include <gst/gst.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif /* __cplusplus */
+
+#include <netdb.h>                        /* sockaddr_in */
+#include "gsttcp.h"
+
+#define GST_TYPE_TCPCLIENTSRC \
+  (gst_tcpclientsrc_get_type())
+#define GST_TCPCLIENTSRC(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPCLIENTSRC,GstTCPClientSrc))
+#define GST_TCPCLIENTSRC_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPCLIENTSRC,GstTCPClientSrc))
+#define GST_IS_TCPCLIENTSRC(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPCLIENTSRC))
+#define GST_IS_TCPCLIENTSRC_CLASS(obj) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPCLIENTSRC))
+
+typedef struct _GstTCPClientSrc GstTCPClientSrc;
+typedef struct _GstTCPClientSrcClass GstTCPClientSrcClass;
+
+typedef enum {
+  GST_TCPCLIENTSRC_OPEN       = GST_ELEMENT_FLAG_LAST,
+
+  GST_TCPCLIENTSRC_FLAG_LAST,
+} GstTCPClientSrcFlags;
+
+struct _GstTCPClientSrc {
+  GstElement element;
+
+  /* pad */
+  GstPad *srcpad;
+
+  /* server information */
+  int port;
+  gchar *host;
+  struct sockaddr_in server_sin;
+
+  /* socket */
+  int sock_fd;
+
+  /* number of bytes we've gotten */
+  off_t curoffset;
+
+  GstTCPProtocolType protocol; /* protocol used for reading data */
+  gboolean caps_received;      /* if we have received caps yet */
+  GstClock *clock;
+
+  gboolean send_discont;       /* TRUE when we need to send a discont */
+  GstBuffer *buffer_after_discont; /* temporary storage for buffer */
+};
+
+struct _GstTCPClientSrcClass {
+  GstElementClass parent_class;
+};
+
+GType gst_tcpclientsrc_get_type (void);
+
+
+#ifdef __cplusplus
+}
+#endif /* __cplusplus */
+
+#endif /* __GST_TCPCLIENTSRC_H__ */
index ace1a34..69ae7a2 100644 (file)
 
 #include "gsttcpsrc.h"
 #include "gsttcpsink.h"
+#include "gsttcpclientsrc.h"
+#include "gsttcpclientsink.h"
+#include "gsttcpserversrc.h"
+#include "gsttcpserversink.h"
 
 static gboolean
 plugin_init (GstPlugin * plugin)
 {
+  if (!gst_library_load ("gstdataprotocol"))
+    return FALSE;
+
   if (!gst_element_register (plugin, "tcpsink", GST_RANK_NONE,
           GST_TYPE_TCPSINK))
     return FALSE;
@@ -34,6 +41,19 @@ plugin_init (GstPlugin * plugin)
   if (!gst_element_register (plugin, "tcpsrc", GST_RANK_NONE, GST_TYPE_TCPSRC))
     return FALSE;
 
+  if (!gst_element_register (plugin, "tcpclientsink", GST_RANK_NONE,
+          GST_TYPE_TCPCLIENTSINK))
+    return FALSE;
+  if (!gst_element_register (plugin, "tcpclientsrc", GST_RANK_NONE,
+          GST_TYPE_TCPCLIENTSRC))
+    return FALSE;
+  if (!gst_element_register (plugin, "tcpserversink", GST_RANK_NONE,
+          GST_TYPE_TCPSERVERSINK))
+    return FALSE;
+  if (!gst_element_register (plugin, "tcpserversrc", GST_RANK_NONE,
+          GST_TYPE_TCPSERVERSRC))
+    return FALSE;
+
   return TRUE;
 }
 
diff --git a/gst/tcp/gsttcpserversink.c b/gst/tcp/gsttcpserversink.c
new file mode 100644 (file)
index 0000000..651d775
--- /dev/null
@@ -0,0 +1,545 @@
+/* GStreamer
+ * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include <gst/gst-i18n-plugin.h>
+
+#include <sys/ioctl.h>
+#include "gsttcpserversink.h"
+
+#define TCP_DEFAULT_HOST       "127.0.0.1"
+#define TCP_DEFAULT_PORT       4953
+#define TCP_BACKLOG            5
+
+/* elementfactory information */
+static GstElementDetails gst_tcpserversink_details =
+GST_ELEMENT_DETAILS ("TCP Server sink",
+    "Sink/Network",
+    "Send data as a server over the network via TCP",
+    "Thomas Vander Stichele <thomas at apestaart dot org>");
+
+/* TCPServerSink signals and args */
+enum
+{
+  FRAME_ENCODED,
+  /* FILL ME */
+  LAST_SIGNAL
+};
+
+GST_DEBUG_CATEGORY (tcpserversink_debug);
+#define GST_CAT_DEFAULT (tcpserversink_debug)
+
+enum
+{
+  ARG_0,
+  ARG_HOST,
+  ARG_PORT,
+  /* FILL ME */
+};
+
+static void gst_tcpserversink_base_init (gpointer g_class);
+static void gst_tcpserversink_class_init (GstTCPServerSink * klass);
+static void gst_tcpserversink_init (GstTCPServerSink * tcpserversink);
+
+static void gst_tcpserversink_set_clock (GstElement * element,
+    GstClock * clock);
+
+static void gst_tcpserversink_chain (GstPad * pad, GstData * _data);
+static GstElementStateReturn gst_tcpserversink_change_state (GstElement *
+    element);
+
+static void gst_tcpserversink_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec);
+static void gst_tcpserversink_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec);
+
+
+static GstElementClass *parent_class = NULL;
+
+/*static guint gst_tcpserversink_signals[LAST_SIGNAL] = { 0 }; */
+
+GType
+gst_tcpserversink_get_type (void)
+{
+  static GType tcpserversink_type = 0;
+
+
+  if (!tcpserversink_type) {
+    static const GTypeInfo tcpserversink_info = {
+      sizeof (GstTCPServerSinkClass),
+      gst_tcpserversink_base_init,
+      NULL,
+      (GClassInitFunc) gst_tcpserversink_class_init,
+      NULL,
+      NULL,
+      sizeof (GstTCPServerSink),
+      0,
+      (GInstanceInitFunc) gst_tcpserversink_init,
+      NULL
+    };
+
+    tcpserversink_type =
+        g_type_register_static (GST_TYPE_ELEMENT, "GstTCPServerSink",
+        &tcpserversink_info, 0);
+  }
+  return tcpserversink_type;
+}
+
+static void
+gst_tcpserversink_base_init (gpointer g_class)
+{
+  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
+
+  gst_element_class_set_details (element_class, &gst_tcpserversink_details);
+}
+
+static void
+gst_tcpserversink_class_init (GstTCPServerSink * klass)
+{
+  GObjectClass *gobject_class;
+  GstElementClass *gstelement_class;
+
+  gobject_class = (GObjectClass *) klass;
+  gstelement_class = (GstElementClass *) klass;
+
+  parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
+
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
+      g_param_spec_string ("host", "host", "The host/IP to send the packets to",
+          TCP_DEFAULT_HOST, G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
+      g_param_spec_int ("port", "port", "The port to send the packets to",
+          0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
+  gobject_class->set_property = gst_tcpserversink_set_property;
+  gobject_class->get_property = gst_tcpserversink_get_property;
+
+  gstelement_class->change_state = gst_tcpserversink_change_state;
+  gstelement_class->set_clock = gst_tcpserversink_set_clock;
+
+  GST_DEBUG_CATEGORY_INIT (tcpserversink_debug, "tcpserversink", 0, "TCP sink");
+}
+
+static void
+gst_tcpserversink_set_clock (GstElement * element, GstClock * clock)
+{
+  GstTCPServerSink *tcpserversink;
+
+  tcpserversink = GST_TCPSERVERSINK (element);
+
+  tcpserversink->clock = clock;
+}
+
+static void
+gst_tcpserversink_init (GstTCPServerSink * this)
+{
+  /* create the sink pad */
+  this->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
+  gst_element_add_pad (GST_ELEMENT (this), this->sinkpad);
+  gst_pad_set_chain_function (this->sinkpad, gst_tcpserversink_chain);
+
+  this->server_port = TCP_DEFAULT_PORT;
+  /* should support as minimum 576 for IPV4 and 1500 for IPV6 */
+  /* this->mtu = 1500; */
+
+  this->server_sock_fd = -1;
+  GST_FLAG_UNSET (this, GST_TCPSERVERSINK_OPEN);
+
+  this->protocol = GST_TCP_PROTOCOL_TYPE_GDP;
+  this->clock = NULL;
+}
+
+static void
+gst_tcpserversink_debug_fdset (GstTCPServerSink * sink, fd_set * testfds)
+{
+  int fd;
+
+  for (fd = 0; fd < FD_SETSIZE; fd++) {
+    if (FD_ISSET (fd, testfds)) {
+      GST_LOG_OBJECT (sink, "fd %d", fd);
+    }
+  }
+}
+
+/* handle a read request on the server,
+ * which indicates a new client connection */
+static gboolean
+gst_tcpserversink_handle_server_read (GstTCPServerSink * sink)
+{
+  /* new client */
+  int client_sock_fd;
+  struct sockaddr_in client_address;
+  int client_address_len;
+
+  client_sock_fd =
+      accept (sink->server_sock_fd, (struct sockaddr *) &client_address,
+      &client_address_len);
+  if (client_sock_fd == -1) {
+    GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, (NULL),
+        ("Could not accept client on server socket: %s", g_strerror (errno)));
+    return FALSE;
+  }
+  FD_SET (client_sock_fd, &(sink->clientfds));
+  GST_DEBUG_OBJECT (sink, "added new client ip %s with fd %d",
+      inet_ntoa (client_address.sin_addr), client_sock_fd);
+
+  return TRUE;
+}
+
+/* handle a read on a client fd,
+ * which either indicates a close or should be ignored */
+static gboolean
+gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd)
+{
+  int nread;
+
+  GST_LOG_OBJECT (sink, "select reports client read on fd %d", fd);
+
+  ioctl (fd, FIONREAD, &nread);
+  if (nread == 0) {
+    /* client sent close, so remove it */
+    GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd);
+    if (close (fd) != 0) {
+      GST_ELEMENT_ERROR (sink, RESOURCE, CLOSE, (NULL),
+          ("error closing fd %d: %s", fd, g_strerror (errno)));
+      return FALSE;
+    }
+    FD_CLR (fd, &sink->clientfds);
+    FD_CLR (fd, &sink->caps_sent);
+  } else {
+    /* FIXME: we should probably just Read 'n' Drop */
+    g_warning ("Don't know what to do with %d bytes to read", nread);
+  }
+  return TRUE;
+}
+
+/* handle a write on a client fd,
+ * which indicates a read request from a client */
+static gboolean
+gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd,
+    GstPad * pad, GstBuffer * buf)
+{
+  /* write the buffer header if we have one */
+  switch (sink->protocol) {
+    case GST_TCP_PROTOCOL_TYPE_NONE:
+      break;
+
+    case GST_TCP_PROTOCOL_TYPE_GDP:
+      /* if we haven't send caps yet, send them first */
+      if (!FD_ISSET (fd, &(sink->caps_sent))) {
+        const GstCaps *caps;
+        gchar *string;
+
+        caps = GST_PAD_CAPS (GST_PAD_PEER (pad));
+        string = gst_caps_to_string (caps);
+        GST_DEBUG_OBJECT (sink, "Sending caps %s for fd %d through GDP", string,
+            fd);
+        /* FIXME: fix this again so that write_caps is non-fatal for multiple clients; also use a fd, host, port struct */
+        if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), fd, caps, FALSE,
+                "unknown", 0)) {
+          g_free (string);
+          return FALSE;
+        }
+        g_free (string);
+        FD_SET (fd, &(sink->caps_sent));
+      }
+      GST_LOG_OBJECT (sink, "Sending buffer header through GDP");
+      if (!gst_tcp_gdp_write_header (GST_ELEMENT (sink), fd, buf, FALSE,
+              "unknown", 0))
+        return FALSE;
+      break;
+    default:
+      g_warning ("Unhandled protocol type");
+      break;
+  }
+
+  /* serve data to client */
+  GST_LOG_OBJECT (sink, "serving data buffer of size %d to client on fd %d",
+      GST_BUFFER_SIZE (buf), fd);
+
+  int wrote = 0;
+
+  wrote =
+      gst_tcp_socket_write (fd, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf));
+
+  if (wrote < GST_BUFFER_SIZE (buf)) {
+/* FIXME: keep track of client ip and port and so on */
+/*
+              GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
+                (_("Error while sending data to \"%s:%d\"."), sink->host, sink->port),
+                ("Only %d of %d bytes written: %s",
+                  bytes_written, GST_BUFFER_SIZE (buf), g_strerror (errno)));
+*/
+    /* FIXME: there should be a better way to report problems, since we
+       want to continue for other clients and just drop this particular one */
+    g_warning ("Write failed: %d of %d written", wrote, GST_BUFFER_SIZE (buf));
+  }
+  return TRUE;
+}
+
+static void
+gst_tcpserversink_chain (GstPad * pad, GstData * _data)
+{
+  int result;
+  int fd;
+  fd_set testreadfds, testwritefds;
+  struct timeval timeout;
+  struct timeval *timeoutp;
+
+  GstBuffer *buf = GST_BUFFER (_data);
+  GstTCPServerSink *sink;
+
+  g_return_if_fail (pad != NULL);
+  g_return_if_fail (GST_IS_PAD (pad));
+  g_return_if_fail (buf != NULL);
+  sink = GST_TCPSERVERSINK (GST_OBJECT_PARENT (pad));
+  g_return_if_fail (GST_FLAG_IS_SET (sink, GST_TCPSERVERSINK_OPEN));
+
+  if (GST_IS_EVENT (buf)) {
+    g_warning ("FIXME: handl events");
+    return;
+  }
+
+  /* if the incoming buffer has a duration, we can use that as the timeout
+   * value; otherwise, we block */
+  timeout.tv_sec = 0;
+  timeout.tv_usec = 0;
+  timeoutp = NULL;
+  GST_LOG_OBJECT (sink, "incoming buffer duration: %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
+  if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DURATION (buf))) {
+    GST_TIME_TO_TIMEVAL (GST_BUFFER_DURATION (buf), timeout);
+    timeoutp = &timeout;
+    GST_LOG_OBJECT (sink, "select will be with timeout %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (GST_BUFFER_DURATION (buf)));
+    GST_LOG_OBJECT (sink, "select will be with timeout %d.%d",
+        timeout.tv_sec, timeout.tv_usec);
+  }
+  /* check for:
+   * - server socket input (ie, new client connections)
+   * - client socket input (ie, clients saying goodbye)
+   * - client socket output (ie, client reads)          */
+  testwritefds = sink->clientfds;
+  testreadfds = sink->clientfds;
+  FD_SET (sink->server_sock_fd, &testreadfds);
+
+  GST_LOG_OBJECT (sink, "doing select on server + client fds for reads");
+  gst_tcpserversink_debug_fdset (sink, &testreadfds);
+  GST_LOG_OBJECT (sink, "doing select on client fds for writes");
+  gst_tcpserversink_debug_fdset (sink, &testwritefds);
+
+  result = select (FD_SETSIZE, &testreadfds, &testwritefds, (fd_set *) 0,
+      timeoutp);
+  /* < 0 is an error, 0 just means a timeout happened */
+  if (result < 0) {
+    GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
+        ("select failed: %s", g_strerror (errno)));
+    return;
+  }
+  GST_LOG_OBJECT (sink, "%d sockets had action", result);
+  GST_LOG_OBJECT (sink, "done select on server/client fds for reads");
+  gst_tcpserversink_debug_fdset (sink, &testreadfds);
+  GST_LOG_OBJECT (sink, "done select on client fds for writes");
+  gst_tcpserversink_debug_fdset (sink, &testwritefds);
+
+  /* Check the reads */
+  for (fd = 0; fd < FD_SETSIZE; fd++) {
+    if (FD_ISSET (fd, &testreadfds)) {
+      if (fd == sink->server_sock_fd) {
+        /* handle new client connection on server socket */
+        if (!gst_tcpserversink_handle_server_read (sink))
+          return;
+      } else {
+        /* handle client read */
+        if (!gst_tcpserversink_handle_client_read (sink, fd))
+          return;
+      }
+    }
+  }
+
+  /* Check the writes */
+  for (fd = 0; fd < FD_SETSIZE; fd++) {
+    if (FD_ISSET (fd, &testwritefds)) {
+      if (!gst_tcpserversink_handle_client_write (sink, fd, pad, buf))
+        return;
+    }
+  }
+  sink->data_written += GST_BUFFER_SIZE (buf);
+
+  gst_buffer_unref (buf);
+
+  /* FIXME: emit signal ? */
+}
+
+static void
+gst_tcpserversink_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstTCPServerSink *tcpserversink;
+
+  g_return_if_fail (GST_IS_TCPSERVERSINK (object));
+  tcpserversink = GST_TCPSERVERSINK (object);
+
+  switch (prop_id) {
+    case ARG_HOST:
+      if (tcpserversink->host != NULL)
+        g_free (tcpserversink->host);
+      if (g_value_get_string (value) == NULL)
+        tcpserversink->host = NULL;
+      else
+        tcpserversink->host = g_strdup (g_value_get_string (value));
+      break;
+    case ARG_PORT:
+      tcpserversink->server_port = g_value_get_int (value);
+      break;
+    default:
+      break;
+  }
+}
+
+static void
+gst_tcpserversink_get_property (GObject * object, guint prop_id, GValue * value,
+    GParamSpec * pspec)
+{
+  GstTCPServerSink *tcpserversink;
+
+  g_return_if_fail (GST_IS_TCPSERVERSINK (object));
+  tcpserversink = GST_TCPSERVERSINK (object);
+
+  switch (prop_id) {
+    case ARG_HOST:
+      g_value_set_string (value, tcpserversink->host);
+      break;
+    case ARG_PORT:
+      g_value_set_int (value, tcpserversink->server_port);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+
+/* create a socket for sending to remote machine */
+static gboolean
+gst_tcpserversink_init_send (GstTCPServerSink * this)
+{
+  int ret;
+
+  /* create sending server socket */
+  if ((this->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
+    GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE, (NULL), GST_ERROR_SYSTEM);
+    return FALSE;
+  }
+  GST_DEBUG_OBJECT (this, "opened sending server socket with fd %d",
+      this->server_sock_fd);
+
+  /* make address reusable */
+  if (setsockopt (this->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret,
+          sizeof (int)) < 0) {
+    GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
+        ("Could not setsockopt: %s", g_strerror (errno)));
+    return FALSE;
+  }
+  /* keep connection alive; avoids SIGPIPE during write */
+  if (setsockopt (this->server_sock_fd, SOL_SOCKET, SO_KEEPALIVE, &ret,
+          sizeof (int)) < 0) {
+    GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
+        ("Could not setsockopt: %s", g_strerror (errno)));
+    return FALSE;
+  }
+
+  /* name the socket */
+  memset (&this->server_sin, 0, sizeof (this->server_sin));
+  this->server_sin.sin_family = AF_INET;        /* network socket */
+  this->server_sin.sin_port = htons (this->server_port);        /* on port */
+  this->server_sin.sin_addr.s_addr = htonl (INADDR_ANY);        /* for hosts */
+
+  /* bind it */
+  GST_DEBUG_OBJECT (this, "binding server socket to address");
+  ret = bind (this->server_sock_fd, (struct sockaddr *) &this->server_sin,
+      sizeof (this->server_sin));
+
+  if (ret) {
+    switch (errno) {
+      default:
+        GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
+            ("bind failed: %s", g_strerror (errno)));
+        return FALSE;
+        break;
+    }
+  }
+
+  /* set the server socket to nonblocking */
+  fcntl (this->server_sock_fd, F_SETFL, O_NONBLOCK);
+
+  GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d",
+      this->server_sock_fd, TCP_BACKLOG);
+  if (listen (this->server_sock_fd, TCP_BACKLOG) == -1) {
+    GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
+        ("Could not listen on server socket: %s", g_strerror (errno)));
+    return FALSE;
+  }
+  GST_DEBUG_OBJECT (this,
+      "listened on server socket %d, returning from connection setup",
+      this->server_sock_fd);
+
+  FD_ZERO (&this->clientfds);
+  FD_ZERO (&this->caps_sent);
+  FD_SET (this->server_sock_fd, &this->clientfds);
+  GST_FLAG_SET (this, GST_TCPSERVERSINK_OPEN);
+
+  this->data_written = 0;
+
+  return TRUE;
+}
+
+static void
+gst_tcpserversink_close (GstTCPServerSink * this)
+{
+  if (this->server_sock_fd != -1) {
+    close (this->server_sock_fd);
+    this->server_sock_fd = -1;
+  }
+
+  GST_FLAG_UNSET (this, GST_TCPSERVERSINK_OPEN);
+}
+
+static GstElementStateReturn
+gst_tcpserversink_change_state (GstElement * element)
+{
+  g_return_val_if_fail (GST_IS_TCPSERVERSINK (element), GST_STATE_FAILURE);
+
+  if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
+    if (GST_FLAG_IS_SET (element, GST_TCPSERVERSINK_OPEN))
+      gst_tcpserversink_close (GST_TCPSERVERSINK (element));
+  } else {
+    if (!GST_FLAG_IS_SET (element, GST_TCPSERVERSINK_OPEN)) {
+      if (!gst_tcpserversink_init_send (GST_TCPSERVERSINK (element)))
+        return GST_STATE_FAILURE;
+    }
+  }
+
+  if (GST_ELEMENT_CLASS (parent_class)->change_state)
+    return GST_ELEMENT_CLASS (parent_class)->change_state (element);
+
+  return GST_STATE_SUCCESS;
+}
diff --git a/gst/tcp/gsttcpserversink.h b/gst/tcp/gsttcpserversink.h
new file mode 100644 (file)
index 0000000..095ea5a
--- /dev/null
@@ -0,0 +1,103 @@
+/* GStreamer
+ * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+
+#ifndef __GST_TCPSERVERSINK_H__
+#define __GST_TCPSERVERSINK_H__
+
+
+#include <gst/gst.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif /* __cplusplus */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <fcntl.h>
+#include <arpa/inet.h>
+#include "gsttcp.h"
+
+#define GST_TYPE_TCPSERVERSINK \
+  (gst_tcpserversink_get_type())
+#define GST_TCPSERVERSINK(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPSERVERSINK,GstTCPServerSink))
+#define GST_TCPSERVERSINK_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPSERVERSINK,GstTCPServerSink))
+#define GST_IS_TCPSERVERSINK(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPSERVERSINK))
+#define GST_IS_TCPSERVERSINK_CLASS(obj) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPSERVERSINK))
+
+typedef struct _GstTCPServerSink GstTCPServerSink;
+typedef struct _GstTCPServerSinkClass GstTCPServerSinkClass;
+
+typedef enum {
+  GST_TCPSERVERSINK_OPEN             = GST_ELEMENT_FLAG_LAST,
+
+  GST_TCPSERVERSINK_FLAG_LAST        = GST_ELEMENT_FLAG_LAST + 2,
+} GstTCPServerSinkFlags;
+
+struct _GstTCPServerSink {
+  GstElement element;
+
+  /* pad */
+  GstPad *sinkpad;
+
+  /* server information */
+  int server_port;
+  gchar *host;
+  struct sockaddr_in server_sin;
+
+  /* socket */
+  int server_sock_fd;
+
+  size_t data_written; /* how much bytes have we written ? */
+
+  fd_set clientfds; /* all the client file descriptors that are open */
+  fd_set caps_sent; /* all the client file descriptors that have had caps sent */
+
+  GstTCPProtocolType protocol;
+  guint mtu;
+  GstClock *clock;
+};
+
+struct _GstTCPServerSinkClass {
+  GstElementClass parent_class;
+};
+
+GType gst_tcpserversink_get_type (void);
+
+
+#ifdef __cplusplus
+}
+#endif /* __cplusplus */
+
+
+#endif /* __GST_TCPSERVERSINK_H__ */
diff --git a/gst/tcp/gsttcpserversrc.c b/gst/tcp/gsttcpserversrc.c
new file mode 100644 (file)
index 0000000..449dcb8
--- /dev/null
@@ -0,0 +1,557 @@
+/* GStreamer
+ * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gst/gst-i18n-plugin.h>
+#include "gsttcp.h"
+#include "gsttcpserversrc.h"
+#include <unistd.h>
+#include <sys/ioctl.h>
+
+GST_DEBUG_CATEGORY (tcpserversrc_debug);
+#define GST_CAT_DEFAULT tcpserversrc_debug
+
+#define TCP_DEFAULT_PORT               4953
+#define TCP_DEFAULT_HOST               NULL    /* listen on all interfaces */
+#define TCP_BACKLOG                    1       /* client connection queue */
+
+/* elementfactory information */
+static GstElementDetails gst_tcpserversrc_details =
+GST_ELEMENT_DETAILS ("TCP Server source",
+    "Source/Network",
+    "Receive data as a server over the network via TCP",
+    "Thomas Vander Stichele <thomas at apestaart dot org>");
+
+/* TCPServerSrc signals and args */
+enum
+{
+  LAST_SIGNAL
+};
+
+enum
+{
+  ARG_0,
+  ARG_PORT,
+  ARG_HOST,
+  ARG_PROTOCOL
+};
+
+static void gst_tcpserversrc_base_init (gpointer g_class);
+static void gst_tcpserversrc_class_init (GstTCPServerSrc * klass);
+static void gst_tcpserversrc_init (GstTCPServerSrc * tcpserversrc);
+
+static GstData *gst_tcpserversrc_get (GstPad * pad);
+static GstElementStateReturn gst_tcpserversrc_change_state (GstElement *
+    element);
+
+static void gst_tcpserversrc_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec);
+static void gst_tcpserversrc_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec);
+static void gst_tcpserversrc_set_clock (GstElement * element, GstClock * clock);
+
+static GstElementClass *parent_class = NULL;
+
+/*static guint gst_tcpserversrc_signals[LAST_SIGNAL] = { 0 }; */
+
+GType
+gst_tcpserversrc_get_type (void)
+{
+  static GType tcpserversrc_type = 0;
+
+
+  if (!tcpserversrc_type) {
+    static const GTypeInfo tcpserversrc_info = {
+      sizeof (GstTCPServerSrcClass),
+      gst_tcpserversrc_base_init,
+      NULL,
+      (GClassInitFunc) gst_tcpserversrc_class_init,
+      NULL,
+      NULL,
+      sizeof (GstTCPServerSrc),
+      0,
+      (GInstanceInitFunc) gst_tcpserversrc_init,
+      NULL
+    };
+
+    tcpserversrc_type =
+        g_type_register_static (GST_TYPE_ELEMENT, "GstTCPServerSrc",
+        &tcpserversrc_info, 0);
+  }
+  return tcpserversrc_type;
+}
+
+static void
+gst_tcpserversrc_base_init (gpointer g_class)
+{
+  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
+
+  gst_element_class_set_details (element_class, &gst_tcpserversrc_details);
+}
+
+static void
+gst_tcpserversrc_class_init (GstTCPServerSrc * klass)
+{
+  GObjectClass *gobject_class;
+  GstElementClass *gstelement_class;
+
+  gobject_class = (GObjectClass *) klass;
+  gstelement_class = (GstElementClass *) klass;
+
+  parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
+
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT,
+      g_param_spec_int ("port", "Port", "The port to listen to",
+          0, 32768, TCP_DEFAULT_PORT, G_PARAM_READWRITE));
+  g_object_class_install_property (gobject_class, ARG_PROTOCOL,
+      g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in",
+          GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_GDP,
+          G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST,
+      g_param_spec_string ("host", "Host", "The hostname to listen",
+          TCP_DEFAULT_HOST, G_PARAM_READWRITE));
+
+  gobject_class->set_property = gst_tcpserversrc_set_property;
+  gobject_class->get_property = gst_tcpserversrc_get_property;
+
+  gstelement_class->change_state = gst_tcpserversrc_change_state;
+  gstelement_class->set_clock = gst_tcpserversrc_set_clock;
+
+  GST_DEBUG_CATEGORY_INIT (tcpserversrc_debug, "tcpserversrc", 0,
+      "TCP Server Source");
+}
+
+static void
+gst_tcpserversrc_set_clock (GstElement * element, GstClock * clock)
+{
+  GstTCPServerSrc *tcpserversrc;
+
+  tcpserversrc = GST_TCPSERVERSRC (element);
+
+  tcpserversrc->clock = clock;
+}
+
+static void
+gst_tcpserversrc_init (GstTCPServerSrc * this)
+{
+  /* create the src pad */
+  this->srcpad = gst_pad_new ("src", GST_PAD_SRC);
+  gst_element_add_pad (GST_ELEMENT (this), this->srcpad);
+  gst_pad_set_get_function (this->srcpad, gst_tcpserversrc_get);
+
+  this->server_port = TCP_DEFAULT_PORT;
+  this->host = TCP_DEFAULT_HOST;
+  this->clock = NULL;
+  this->server_sock_fd = -1;
+  this->client_sock_fd = -1;
+  this->curoffset = 0;
+  this->protocol = GST_TCP_PROTOCOL_TYPE_GDP;
+
+  GST_FLAG_UNSET (this, GST_TCPSERVERSRC_OPEN);
+}
+
+/* read the gdp caps packet from the socket */
+static GstCaps *
+gst_tcpserversrc_gdp_read_caps (GstTCPServerSrc * this)
+{
+  size_t header_length = GST_DP_HEADER_LENGTH;
+  size_t readsize;
+  guint8 *header = NULL;
+  guint8 *payload = NULL;
+  size_t ret;
+  GstCaps *caps;
+  gchar *string;
+
+  header = g_malloc (header_length);
+
+  readsize = header_length;
+  GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", readsize);
+  ret = read (this->client_sock_fd, header, readsize);
+  if (ret < 0) {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+    return NULL;
+  }
+  g_assert (ret == readsize);
+
+  if (!gst_dp_validate_header (header_length, header)) {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("GDP caps packet header does not validate"));
+    g_free (header);
+    return NULL;
+  }
+
+  readsize = gst_dp_header_payload_length (header);
+  payload = g_malloc (readsize);
+  GST_LOG_OBJECT (this, "Reading %d bytes for caps packet payload", readsize);
+  ret = read (this->client_sock_fd, payload, readsize);
+  if (ret < 0) {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+    g_free (header);
+    return NULL;
+  }
+  g_assert (ret == readsize);
+
+  if (!gst_dp_validate_payload (readsize, header, payload)) {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("GDP caps packet payload does not validate"));
+    g_free (header);
+    g_free (payload);
+    return NULL;
+  }
+
+  caps = gst_dp_caps_from_packet (header_length, header, payload);
+  string = gst_caps_to_string (caps);
+  GST_DEBUG_OBJECT (this, "retrieved GDP caps from packet payload: %s", string);
+
+  g_free (header);
+  g_free (payload);
+  g_free (string);
+  return caps;
+}
+
+/* read the gdp buffer header from the socket
+ * returns a GstData,
+ * representing the new GstBuffer to read data into, or an EOS event
+ */
+static GstData *
+gst_tcpserversrc_gdp_read_header (GstTCPServerSrc * this)
+{
+  size_t header_length = GST_DP_HEADER_LENGTH;
+  size_t readsize;
+  guint8 *header = NULL;
+  size_t ret;
+  GstBuffer *buffer;
+
+  header = g_malloc (header_length);
+
+  readsize = header_length;
+  GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", readsize);
+  ret = read (this->client_sock_fd, header, readsize);
+  /* if we read 0 bytes, and we're blocking, we hit eos */
+  if (ret == 0) {
+    GST_DEBUG ("blocking read returns 0, EOS");
+    gst_element_set_eos (GST_ELEMENT (this));
+    return GST_DATA (gst_event_new (GST_EVENT_EOS));
+  }
+  if (ret < 0) {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+    return NULL;
+  }
+  if (ret != readsize) {
+    g_warning ("Wanted %d bytes, got %d bytes", readsize, ret);
+  }
+  g_assert (ret == readsize);
+
+  if (!gst_dp_validate_header (header_length, header)) {
+    GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
+        ("GDP buffer packet header does not validate"));
+    g_free (header);
+    return NULL;
+  }
+  GST_LOG_OBJECT (this, "validated buffer packet header");
+
+  buffer = gst_dp_buffer_from_header (header_length, header);
+
+  GST_LOG_OBJECT (this, "created new buffer %p from packet header", buffer);
+  return GST_DATA (buffer);
+}
+
+static GstData *
+gst_tcpserversrc_get (GstPad * pad)
+{
+  GstTCPServerSrc *src;
+  size_t readsize;
+  int ret;
+
+  GstData *data = NULL;
+  GstBuffer *buf = NULL;
+  GstCaps *caps;
+
+  g_return_val_if_fail (pad != NULL, NULL);
+  g_return_val_if_fail (GST_IS_PAD (pad), NULL);
+  src = GST_TCPSERVERSRC (GST_OBJECT_PARENT (pad));
+  g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_TCPSERVERSRC_OPEN), NULL);
+
+  /* read the buffer header if we're using a protocol */
+  switch (src->protocol) {
+      fd_set testfds;
+
+    case GST_TCP_PROTOCOL_TYPE_NONE:
+      /* do a blocking select on the socket */
+      FD_ZERO (&testfds);
+      FD_SET (src->client_sock_fd, &testfds);
+      ret =
+          select (src->client_sock_fd + 1, &testfds, (fd_set *) 0, (fd_set *) 0,
+          0);
+      /* no action (0) is an error too in our case */
+      if (ret <= 0) {
+        GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+            ("select failed: %s", g_strerror (errno)));
+        return NULL;
+      }
+      /* ask how much is available for reading on the socket */
+      ret = ioctl (src->client_sock_fd, FIONREAD, &readsize);
+      if (ret < 0) {
+        GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+            ("ioctl failed: %s", g_strerror (errno)));
+        return NULL;
+      }
+
+      buf = gst_buffer_new_and_alloc (readsize);
+      break;
+    case GST_TCP_PROTOCOL_TYPE_GDP:
+      /* if we haven't received caps yet, we should get them first */
+      if (!src->caps_received) {
+        gchar *string;
+
+        if (!(caps = gst_tcpserversrc_gdp_read_caps (src))) {
+          GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+              ("Could not read caps through GDP"));
+          return NULL;
+        }
+        src->caps_received = TRUE;
+        string = gst_caps_to_string (caps);
+        GST_DEBUG_OBJECT (src, "Received caps through GDP: %s", string);
+        g_free (string);
+
+        if (!gst_pad_try_set_caps (pad, caps)) {
+          g_warning ("Could not set caps");
+          return NULL;
+        }
+      }
+
+      /* now receive the buffer header */
+      if (!(data = gst_tcpserversrc_gdp_read_header (src))) {
+        GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+            ("Could not read data header through GDP"));
+        return NULL;
+      }
+      if (GST_IS_EVENT (data))
+        return data;
+      buf = GST_BUFFER (data);
+
+      GST_LOG_OBJECT (src, "Going to read data from socket into buffer %p",
+          buf);
+      /* use this new buffer to read data into */
+      readsize = GST_BUFFER_SIZE (buf);
+      break;
+    default:
+      g_warning ("Unhandled protocol type");
+      break;
+  }
+
+  GST_LOG_OBJECT (src, "Reading %d bytes", readsize);
+  ret =
+      gst_tcp_socket_read (src->client_sock_fd, GST_BUFFER_DATA (buf),
+      readsize);
+  if (ret < 0) {
+    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+    return NULL;
+  }
+
+  /* if we read 0 bytes, and we're blocking, we hit eos */
+  if (ret == 0) {
+    GST_DEBUG ("blocking read returns 0, EOS");
+    gst_buffer_unref (buf);
+    gst_element_set_eos (GST_ELEMENT (src));
+    return GST_DATA (gst_event_new (GST_EVENT_EOS));
+  }
+
+  readsize = ret;
+  GST_LOG_OBJECT (src, "Read %d bytes", readsize);
+  GST_BUFFER_SIZE (buf) = readsize;
+  GST_BUFFER_MAXSIZE (buf) = readsize;
+  GST_BUFFER_OFFSET (buf) = src->curoffset;
+  GST_BUFFER_OFFSET_END (buf) = src->curoffset + readsize;
+  src->curoffset += readsize;
+  return GST_DATA (buf);
+}
+
+
+static void
+gst_tcpserversrc_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstTCPServerSrc *tcpserversrc;
+
+  g_return_if_fail (GST_IS_TCPSERVERSRC (object));
+  tcpserversrc = GST_TCPSERVERSRC (object);
+
+  switch (prop_id) {
+    case ARG_PORT:
+      tcpserversrc->server_port = g_value_get_int (value);
+      break;
+    case ARG_PROTOCOL:
+      tcpserversrc->protocol = g_value_get_enum (value);
+      break;
+    case ARG_HOST:
+      if (tcpserversrc->host)
+        g_free (tcpserversrc->host);
+      tcpserversrc->host = g_strdup (g_value_get_string (value));
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+gst_tcpserversrc_get_property (GObject * object, guint prop_id, GValue * value,
+    GParamSpec * pspec)
+{
+  GstTCPServerSrc *tcpserversrc;
+
+  g_return_if_fail (GST_IS_TCPSERVERSRC (object));
+  tcpserversrc = GST_TCPSERVERSRC (object);
+
+  switch (prop_id) {
+    case ARG_PORT:
+      g_value_set_int (value, tcpserversrc->server_port);
+      break;
+    case ARG_PROTOCOL:
+      g_value_set_enum (value, tcpserversrc->protocol);
+      break;
+    case ARG_HOST:
+      g_value_set_string (value, tcpserversrc->host);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+/* set up server */
+static gboolean
+gst_tcpserversrc_init_receive (GstTCPServerSrc * this)
+{
+  int ret;
+
+  /* reset caps_received flag */
+  this->caps_received = FALSE;
+
+  /* create the server listener socket */
+  if ((this->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) {
+    GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM);
+    return FALSE;
+  }
+  GST_DEBUG_OBJECT (this, "opened receiving server socket with fd %d",
+      this->server_sock_fd);
+
+  /* make address reusable */
+  if (setsockopt (this->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret,
+          sizeof (int)) < 0) {
+    GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
+        ("Could not setsockopt: %s", g_strerror (errno)));
+    return FALSE;
+  }
+
+  /* name the socket */
+  memset (&this->server_sin, 0, sizeof (this->server_sin));
+  this->server_sin.sin_family = AF_INET;        /* network socket */
+  this->server_sin.sin_port = htons (this->server_port);        /* on port */
+  if (this->host) {
+    gchar *host = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host);
+
+    if (!host)
+      return FALSE;
+
+    this->server_sin.sin_addr.s_addr = inet_addr (host);
+    g_free (host);
+  } else
+    this->server_sin.sin_addr.s_addr = htonl (INADDR_ANY);
+
+  /* bind it */
+  GST_DEBUG_OBJECT (this, "binding server socket to address");
+  ret = bind (this->server_sock_fd, (struct sockaddr *) &this->server_sin,
+      sizeof (this->server_sin));
+
+  if (ret) {
+    switch (errno) {
+      default:
+        GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
+            ("bind failed: %s", g_strerror (errno)));
+        return FALSE;
+        break;
+    }
+  }
+
+  GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d",
+      this->server_sock_fd, TCP_BACKLOG);
+  if (listen (this->server_sock_fd, TCP_BACKLOG) == -1) {
+    GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
+        ("Could not listen on server socket: %s", g_strerror (errno)));
+    return FALSE;
+  }
+
+  /* FIXME: maybe we should think about moving actual client accepting
+     somewhere else */
+  GST_DEBUG_OBJECT (this, "waiting for client");
+  this->client_sock_fd =
+      accept (this->server_sock_fd, (struct sockaddr *) &this->client_sin,
+      &this->client_sin_len);
+  if (this->client_sock_fd == -1) {
+    GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
+        ("Could not accept client on server socket: %s", g_strerror (errno)));
+    return FALSE;
+  }
+  GST_DEBUG_OBJECT (this, "received client");
+
+  GST_FLAG_SET (this, GST_TCPSERVERSRC_OPEN);
+  return TRUE;
+}
+
+static void
+gst_tcpserversrc_close (GstTCPServerSrc * this)
+{
+  if (this->server_sock_fd != -1) {
+    close (this->server_sock_fd);
+    this->server_sock_fd = -1;
+  }
+  if (this->client_sock_fd != -1) {
+    close (this->client_sock_fd);
+    this->client_sock_fd = -1;
+  }
+  GST_FLAG_UNSET (this, GST_TCPSERVERSRC_OPEN);
+}
+
+static GstElementStateReturn
+gst_tcpserversrc_change_state (GstElement * element)
+{
+  g_return_val_if_fail (GST_IS_TCPSERVERSRC (element), GST_STATE_FAILURE);
+
+  if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
+    if (GST_FLAG_IS_SET (element, GST_TCPSERVERSRC_OPEN))
+      gst_tcpserversrc_close (GST_TCPSERVERSRC (element));
+  } else {
+    if (!GST_FLAG_IS_SET (element, GST_TCPSERVERSRC_OPEN)) {
+      if (!gst_tcpserversrc_init_receive (GST_TCPSERVERSRC (element)))
+        return GST_STATE_FAILURE;
+    }
+  }
+
+  if (GST_ELEMENT_CLASS (parent_class)->change_state)
+    return GST_ELEMENT_CLASS (parent_class)->change_state (element);
+
+  return GST_STATE_SUCCESS;
+}
diff --git a/gst/tcp/gsttcpserversrc.h b/gst/tcp/gsttcpserversrc.h
new file mode 100644 (file)
index 0000000..fbc9e36
--- /dev/null
@@ -0,0 +1,98 @@
+/* GStreamer
+ * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+
+#ifndef __GST_TCPSERVERSRC_H__
+#define __GST_TCPSERVERSRC_H__
+
+#include <gst/gst.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif /* __cplusplus */
+
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include "gsttcp.h"
+
+#include <fcntl.h>
+
+#define GST_TYPE_TCPSERVERSRC \
+  (gst_tcpserversrc_get_type())
+#define GST_TCPSERVERSRC(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCPSERVERSRC,GstTCPServerSrc))
+#define GST_TCPSERVERSRC_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCPSERVERSRC,GstTCPServerSrc))
+#define GST_IS_TCPSERVERSRC(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCPSERVERSRC))
+#define GST_IS_TCPSERVERSRC_CLASS(obj) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCPSERVERSRC))
+
+typedef struct _GstTCPServerSrc GstTCPServerSrc;
+typedef struct _GstTCPServerSrcClass GstTCPServerSrcClass;
+
+typedef enum {
+  GST_TCPSERVERSRC_OPEN       = GST_ELEMENT_FLAG_LAST,
+
+  GST_TCPSERVERSRC_FLAG_LAST,
+} GstTCPServerSrcFlags;
+
+struct _GstTCPServerSrc {
+  GstElement element;
+
+  /* pad */
+  GstPad *srcpad;
+
+  /* server information */
+  int server_port;
+  gchar *host;
+  struct sockaddr_in server_sin;
+  int server_sock_fd;
+
+  /* client information */
+  struct sockaddr_in client_sin;
+  socklen_t client_sin_len;
+  int client_sock_fd;
+
+  /* number of bytes we've gotten */
+  off_t curoffset;
+
+  GstTCPProtocolType protocol; /* protocol used for reading data */
+  gboolean caps_received;      /* if we have received caps yet */
+  GstClock *clock;
+};
+
+struct _GstTCPServerSrcClass {
+  GstElementClass parent_class;
+};
+
+GType gst_tcpserversrc_get_type (void);
+
+
+#ifdef __cplusplus
+}
+#endif /* __cplusplus */
+
+#endif /* __GST_TCPSERVERSRC_H__ */