net: port to use gio's networking API
authorTim-Philipp Müller <tim.muller@collabora.co.uk>
Wed, 5 May 2010 15:33:51 +0000 (16:33 +0100)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Wed, 18 Jan 2012 01:22:43 +0000 (01:22 +0000)
Some warts still, but it's a start.

configure.ac
libs/gst/net/Makefile.am
libs/gst/net/gstnetclientclock.c
libs/gst/net/gstnetclientclock.h
libs/gst/net/gstnettimepacket.c
libs/gst/net/gstnettimepacket.h
libs/gst/net/gstnettimeprovider.c
libs/gst/net/gstnettimeprovider.h
tests/check/Makefile.am
tests/check/libs/gstnettimeprovider.c

index 2983e96..ce97f0e 100644 (file)
@@ -534,21 +534,7 @@ AG_GST_GLIB_CHECK([$GLIB_REQ])
 dnl Check for glib2 without extra fat, useful for the unversioned tool frontends
 PKG_CHECK_MODULES(GLIB_ONLY, glib-2.0 >= $GLIB_REQ)
 
-dnl Check for GIO
-translit(dnm, m, l) AM_CONDITIONAL(USE_GIO, true)
-AG_GST_CHECK_FEATURE(GIO, [GIO library], gio, [
-  PKG_CHECK_MODULES(GIO, gio-2.0 >= 2.31.10)
-  GIO_MODULE_DIR="`$PKG_CONFIG --variable=giomoduledir gio-2.0`"
-  AC_DEFINE_UNQUOTED(GIO_MODULE_DIR, "$GIO_MODULE_DIR",
-      [The GIO modules directory.])
-  GIO_LIBDIR="`$PKG_CONFIG --variable=libdir gio-2.0`"
-      AC_DEFINE_UNQUOTED(GIO_LIBDIR, "$GIO_LIBDIR",
-          [The GIO library directory.])
-
-  AC_SUBST(GIO_CFLAGS)
-  AC_SUBST(GIO_LIBS)
-  AC_SUBST(GIO_LDFLAGS)
-])
+PKG_CHECK_MODULES(GIO, gio-2.0 >= $GLIB_REQ)
 
 dnl Check for documentation xrefs
 GLIB_PREFIX="`$PKG_CONFIG --variable=prefix glib-2.0`"
index 814cb32..de63c40 100644 (file)
@@ -15,7 +15,7 @@ libgstnet_@GST_MAJORMINOR@_la_SOURCES = \
     gstnettimeprovider.c
 
 libgstnet_@GST_MAJORMINOR@_la_CFLAGS = $(GST_OBJ_CFLAGS) $(GIO_CFLAGS)
-libgstnet_@GST_MAJORMINOR@_la_LIBADD = $(GST_OBJ_LIBS) $(INET_ATON_LIBS) $(WIN32_LIBS) $(GIO_LIBS)
+libgstnet_@GST_MAJORMINOR@_la_LIBADD = $(GST_OBJ_LIBS) $(GIO_LIBS) $(INET_ATON_LIBS) $(WIN32_LIBS)
 libgstnet_@GST_MAJORMINOR@_la_LDFLAGS = $(GST_LIB_LDFLAGS) $(GST_ALL_LDFLAGS) $(GST_LT_LDFLAGS)
 
 CLEANFILES = *.gcno *.gcda *.gcov
@@ -63,6 +63,7 @@ GstNet-@GST_MAJORMINOR@.gir: $(INTROSPECTION_SCANNER) libgstnet-@GST_MAJORMINOR@
                --library=$(top_builddir)/gst/libgstreamer-@GST_MAJORMINOR@.la \
                --library=libgstnet-@GST_MAJORMINOR@.la \
                --include=Gst-@GST_MAJORMINOR@ \
+               --include=Gio-2.0 \
                --libtool="$(top_builddir)/libtool" \
                --pkg gstreamer-@GST_MAJORMINOR@ \
                --pkg gio-2.0 \
index 8326604..1121051 100644 (file)
@@ -2,6 +2,7 @@
  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
  *                    2005 Wim Taymans <wim@fluendo.com>
  *                    2005 Andy Wingo <wingo@pobox.com>
+ * Copyright (C) 2012 Collabora Ltd. <tim.muller@collabora.co.uk>
  *
  * gstnetclientclock.h: clock that synchronizes itself to a time provider over
  * the network
 #include "gstnettimepacket.h"
 #include "gstnetclientclock.h"
 
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-
-#if defined (_MSC_VER) && _MSC_VER >= 1400
-#include <io.h>
-#endif
+#include <gio/gio.h>
 
 GST_DEBUG_CATEGORY_STATIC (ncc_debug);
 #define GST_CAT_DEFAULT (ncc_debug)
@@ -68,15 +63,11 @@ GST_DEBUG_CATEGORY_STATIC (ncc_debug);
 #define DEFAULT_PORT            5637
 #define DEFAULT_TIMEOUT         GST_SECOND
 
-#ifdef G_OS_WIN32
-#define getsockname(sock,addr,len) getsockname(sock,addr,(int *)len)
-#endif
-
 enum
 {
   PROP_0,
   PROP_ADDRESS,
-  PROP_PORT,
+  PROP_PORT
 };
 
 #define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj)  \
@@ -84,8 +75,16 @@ enum
 
 struct _GstNetClientClockPrivate
 {
-  GstPollFD sock;
-  GstPoll *fdset;
+  GThread *thread;
+
+  GSocket *socket;
+  GSocketAddress *servaddr;
+  GCancellable *cancel;
+
+  GstClockTime timeout_expiration;
+
+  gchar *address;
+  gint port;
 };
 
 #define _do_init \
@@ -102,21 +101,6 @@ static void gst_net_client_clock_get_property (GObject * object, guint prop_id,
 
 static void gst_net_client_clock_stop (GstNetClientClock * self);
 
-#ifdef G_OS_WIN32
-static int
-inet_aton (const char *c, struct in_addr *paddr)
-{
-  /* note that inet_addr is deprecated on unix because
-   * inet_addr returns -1 (INADDR_NONE) for the valid 255.255.255.255
-   * address. */
-  paddr->s_addr = inet_addr (c);
-  if (paddr->s_addr == INADDR_NONE)
-    return 0;
-
-  return 1;
-}
-#endif
-
 static void
 gst_net_client_clock_class_init (GstNetClientClockClass * klass)
 {
@@ -132,9 +116,8 @@ gst_net_client_clock_class_init (GstNetClientClockClass * klass)
 
   g_object_class_install_property (gobject_class, PROP_ADDRESS,
       g_param_spec_string ("address", "address",
-          "The address of the machine providing a time server, "
-          "as a dotted quad (x.x.x.x)", DEFAULT_ADDRESS,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          "The IP address of the machine providing a time server",
+          DEFAULT_ADDRESS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_PORT,
       g_param_spec_int ("port", "port",
           "The port on which the remote server is listening", 0, G_MAXUINT16,
@@ -146,28 +129,16 @@ gst_net_client_clock_init (GstNetClientClock * self)
 {
   GstClock *clock = GST_CLOCK_CAST (self);
 
-#ifdef G_OS_WIN32
-  WSADATA w;
-  int error = WSAStartup (0x0202, &w);
-
-  if (error) {
-    GST_DEBUG_OBJECT (self, "Error on WSAStartup");
-  }
-  if (w.wVersion != 0x0202) {
-    WSACleanup ();
-  }
-#endif
   self->priv = GST_NET_CLIENT_CLOCK_GET_PRIVATE (self);
 
-  self->port = DEFAULT_PORT;
-  self->address = g_strdup (DEFAULT_ADDRESS);
+  self->priv->port = DEFAULT_PORT;
+  self->priv->address = g_strdup (DEFAULT_ADDRESS);
 
   clock->timeout = DEFAULT_TIMEOUT;
 
-  self->priv->sock.fd = -1;
-  self->thread = NULL;
+  self->priv->thread = NULL;
 
-  self->servaddr = NULL;
+  self->priv->servaddr = NULL;
 }
 
 static void
@@ -175,25 +146,23 @@ gst_net_client_clock_finalize (GObject * object)
 {
   GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object);
 
-  if (self->thread) {
+  if (self->priv->thread) {
     gst_net_client_clock_stop (self);
-    g_assert (self->thread == NULL);
-  }
-
-  if (self->priv->fdset) {
-    gst_poll_free (self->priv->fdset);
-    self->priv->fdset = NULL;
   }
 
-  g_free (self->address);
-  self->address = NULL;
+  g_free (self->priv->address);
+  self->priv->address = NULL;
 
-  g_free (self->servaddr);
-  self->servaddr = NULL;
+  if (self->priv->servaddr != NULL) {
+    g_object_unref (self->priv->servaddr);
+    self->priv->servaddr = NULL;
+  }
 
-#ifdef G_OS_WIN32
-  WSACleanup ();
-#endif
+  if (self->priv->socket != NULL) {
+    g_socket_close (self->priv->socket, NULL);
+    g_object_unref (self->priv->socket);
+    self->priv->socket = NULL;
+  }
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -206,14 +175,13 @@ gst_net_client_clock_set_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_ADDRESS:
-      g_free (self->address);
-      if (g_value_get_string (value) == NULL)
-        self->address = g_strdup (DEFAULT_ADDRESS);
-      else
-        self->address = g_strdup (g_value_get_string (value));
+      g_free (self->priv->address);
+      self->priv->address = g_value_dup_string (value);
+      if (self->priv->address == NULL)
+        self->priv->address = g_strdup (DEFAULT_ADDRESS);
       break;
     case PROP_PORT:
-      self->port = g_value_get_int (value);
+      self->priv->port = g_value_get_int (value);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -229,10 +197,10 @@ gst_net_client_clock_get_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_ADDRESS:
-      g_value_set_string (value, self->address);
+      g_value_set_string (value, self->priv->address);
       break;
     case PROP_PORT:
-      g_value_set_int (value, self->port);
+      g_value_set_int (value, self->priv->port);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -244,6 +212,7 @@ static void
 gst_net_client_clock_observe_times (GstNetClientClock * self,
     GstClockTime local_1, GstClockTime remote, GstClockTime local_2)
 {
+  GstClockTime current_timeout;
   GstClockTime local_avg;
   gdouble r_squared;
   GstClock *clock;
@@ -259,13 +228,14 @@ gst_net_client_clock_observe_times (GstNetClientClock * self,
 
   GST_CLOCK_SLAVE_LOCK (self);
   if (clock->filling) {
-    self->current_timeout = 0;
+    current_timeout = 0;
   } else {
     /* geto formula */
-    self->current_timeout =
-        (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND;
-    self->current_timeout = MIN (self->current_timeout, clock->timeout);
+    current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND;
+    current_timeout = MIN (current_timeout, clock->timeout);
   }
+  GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout));
+  self->priv->timeout_expiration = gst_util_get_timestamp () + current_timeout;
   GST_CLOCK_SLAVE_UNLOCK (clock);
 
   return;
@@ -279,67 +249,111 @@ bogus_observation:
   }
 }
 
-static gint
-gst_net_client_clock_do_select (GstNetClientClock * self)
+typedef struct
 {
-  while (TRUE) {
-    GstClockTime diff;
-    gint ret;
+  GSource source;
+  GstNetClientClock *clock;
+  gboolean *p_timeout;
+} GstNetClientClockTimeoutSource;
 
-    GST_LOG_OBJECT (self, "doing select");
+static gboolean
+gst_net_client_clock_timeout_source_prepare (GSource * s, gint * p_timeout)
+{
+  GstNetClientClockTimeoutSource *source = (GstNetClientClockTimeoutSource *) s;
+  GstClockTime expiration_time = source->clock->priv->timeout_expiration;
+  GstClockTime now = gst_util_get_timestamp ();
 
-    diff = gst_clock_get_internal_time (GST_CLOCK (self));
-    ret = gst_poll_wait (self->priv->fdset, self->current_timeout);
-    diff = gst_clock_get_internal_time (GST_CLOCK (self)) - diff;
+  if (now >= expiration_time || (expiration_time - now) <= GST_MSECOND) {
+    *p_timeout = 0;
+    return TRUE;
+  }
 
-    if (diff > self->current_timeout)
-      self->current_timeout = 0;
-    else
-      self->current_timeout -= diff;
+  *p_timeout = (expiration_time - now) / GST_MSECOND;
+  GST_TRACE_OBJECT (source->clock, "time out in %d ms please", *p_timeout);
+  return FALSE;
+}
 
-    GST_LOG_OBJECT (self, "select returned %d", ret);
+static gboolean
+gst_net_client_clock_timeout_source_check (GSource * s)
+{
+  GstNetClientClockTimeoutSource *source = (GstNetClientClockTimeoutSource *) s;
 
-    if (ret < 0 && errno != EBUSY) {
-      if (errno != EAGAIN && errno != EINTR)
-        goto select_error;
-      else
-        continue;
-    } else {
-      return ret;
-    }
+  return (gst_util_get_timestamp () >= source->clock->priv->timeout_expiration);
+}
 
-    g_assert_not_reached ();
+static gboolean
+gst_net_client_clock_timeout_source_dispatch (GSource * s, GSourceFunc cb,
+    gpointer data)
+{
+  GstNetClientClockTimeoutSource *source = (GstNetClientClockTimeoutSource *) s;
 
-    /* log errors and keep going */
-  select_error:
-    {
-      GST_WARNING_OBJECT (self, "select error %d: %s (%d)", ret,
-          g_strerror (errno), errno);
-      continue;
-    }
-  }
+  GST_TRACE_OBJECT (source->clock, "timed out");
+  *source->p_timeout = TRUE;
+  return TRUE;
+}
 
-  g_assert_not_reached ();
-  return -1;
+static gboolean
+gst_net_client_clock_socket_cb (GSocket * socket, GIOCondition condition,
+    gpointer user_data)
+{
+  GIOCondition *p_cond = user_data;
+
+  GST_TRACE ("socket %p I/O condition: 0x%02x", socket, condition);
+  *p_cond = condition;
+  return TRUE;
 }
 
 static gpointer
 gst_net_client_clock_thread (gpointer data)
 {
   GstNetClientClock *self = data;
-  struct sockaddr_in tmpaddr;
-  socklen_t len;
   GstNetTimePacket *packet;
-  gint ret;
+  GMainContext *ctx;
+  GSourceFuncs funcs = { NULL, };
+  GSource *source;
+  GIOCondition cond;
+  gboolean timeout;
+  GSocket *socket = self->priv->socket;
+  GError *err = NULL;
   GstClock *clock = data;
 
-  while (TRUE) {
-    ret = gst_net_client_clock_do_select (self);
+  GST_INFO_OBJECT (self, "net client clock thread running, socket=%p", socket);
+
+  g_socket_set_blocking (socket, TRUE);
+  g_socket_set_timeout (socket, 0);
+
+  ctx = g_main_context_new ();
+
+  source = g_socket_create_source (socket, G_IO_IN, self->priv->cancel);
+  g_source_set_name (source, "GStreamer net client clock thread socket");
+  g_source_set_callback (source, (GSourceFunc) gst_net_client_clock_socket_cb,
+      &cond, NULL);
+  g_source_attach (source, ctx);
+  g_source_unref (source);
+
+  /* GSocket only support second granularity for timeouts, so roll our own
+   * timeout source (so we don't have to create a new source whenever the
+   * timeout changes, as we would have to do with the default timeout source) */
+  funcs.prepare = gst_net_client_clock_timeout_source_prepare;
+  funcs.check = gst_net_client_clock_timeout_source_check;
+  funcs.dispatch = gst_net_client_clock_timeout_source_dispatch;
+  funcs.finalize = NULL;
+  source = g_source_new (&funcs, sizeof (GstNetClientClockTimeoutSource));
+  ((GstNetClientClockTimeoutSource *) source)->clock = self;
+  ((GstNetClientClockTimeoutSource *) source)->p_timeout = &timeout;
+  g_source_set_name (source, "GStreamer net client clock timeout");
+  g_source_attach (source, ctx);
+  g_source_unref (source);
+
+  while (!g_cancellable_is_cancelled (self->priv->cancel)) {
+    cond = 0;
+    timeout = FALSE;
+    g_main_context_iteration (ctx, TRUE);
+
+    if (g_cancellable_is_cancelled (self->priv->cancel))
+      break;
 
-    if (ret < 0 && errno == EBUSY) {
-      GST_LOG_OBJECT (self, "stop");
-      goto stopped;
-    } else if (ret == 0) {
+    if (timeout) {
       /* timed out, let's send another packet */
       GST_DEBUG_OBJECT (self, "timed out");
 
@@ -349,24 +363,32 @@ gst_net_client_clock_thread (gpointer data)
 
       GST_DEBUG_OBJECT (self, "sending packet, local time = %" GST_TIME_FORMAT,
           GST_TIME_ARGS (packet->local_time));
-      gst_net_time_packet_send (packet, self->priv->sock.fd,
-          (struct sockaddr *) self->servaddr, sizeof (struct sockaddr_in));
+
+      gst_net_time_packet_send (packet, self->priv->socket,
+          self->priv->servaddr, NULL);
 
       g_free (packet);
 
-      /* reset timeout */
-      self->current_timeout = clock->timeout;
+      /* reset timeout (but are expecting a response sooner anyway) */
+      self->priv->timeout_expiration =
+          gst_util_get_timestamp () + clock->timeout;
       continue;
-    } else if (gst_poll_fd_can_read (self->priv->fdset, &self->priv->sock)) {
-      /* got data in */
-      GstClockTime new_local = gst_clock_get_internal_time (GST_CLOCK (self));
+    }
+
+    /* got data to read? */
+    if ((cond & G_IO_IN)) {
+      GstClockTime new_local;
+
+      new_local = gst_clock_get_internal_time (GST_CLOCK (self));
 
-      len = sizeof (struct sockaddr);
-      packet = gst_net_time_packet_receive (self->priv->sock.fd,
-          (struct sockaddr *) &tmpaddr, &len);
+      packet = gst_net_time_packet_receive (socket, NULL, &err);
 
-      if (!packet)
-        goto receive_error;
+      if (err != NULL) {
+        GST_WARNING_OBJECT (self, "receive error: %s", err->message);
+        g_error_free (err);
+        err = NULL;
+        continue;
+      }
 
       GST_LOG_OBJECT (self, "got packet back");
       GST_LOG_OBJECT (self, "local_1 = %" GST_TIME_FORMAT,
@@ -382,79 +404,72 @@ gst_net_client_clock_thread (gpointer data)
 
       g_free (packet);
       continue;
-    } else {
-      GST_WARNING_OBJECT (self, "unhandled select return state?");
-      continue;
     }
 
-    g_assert_not_reached ();
-
-  stopped:
-    {
-      GST_DEBUG_OBJECT (self, "shutting down");
-      /* socket gets closed in _stop() */
-      return NULL;
-    }
-  receive_error:
-    {
-      GST_WARNING_OBJECT (self, "receive error");
+    if ((cond & (G_IO_ERR | G_IO_HUP))) {
+      GST_DEBUG_OBJECT (self, "socket error?! %s", g_strerror (errno));
+      g_usleep (G_USEC_PER_SEC / 10);
       continue;
     }
-
-    g_assert_not_reached ();
-
   }
 
-  g_assert_not_reached ();
-
+  GST_INFO_OBJECT (self, "shutting down net client clock thread");
+  g_main_context_unref (ctx);
   return NULL;
 }
 
 static gboolean
 gst_net_client_clock_start (GstNetClientClock * self)
 {
-  struct sockaddr_in servaddr, myaddr;
-  socklen_t len;
-  gint ret;
+  GSocketAddress *servaddr;
+  GSocketAddress *myaddr;
+  GInetAddress *inetaddr;
+  GSocket *socket;
   GError *error = NULL;
 
-  g_return_val_if_fail (self->address != NULL, FALSE);
-  g_return_val_if_fail (self->servaddr == NULL, FALSE);
+  g_return_val_if_fail (self->priv->address != NULL, FALSE);
+  g_return_val_if_fail (self->priv->servaddr == NULL, FALSE);
 
-  if ((ret = socket (AF_INET, SOCK_DGRAM, 0)) < 0)
+  socket = g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_DATAGRAM,
+      G_SOCKET_PROTOCOL_UDP, &error);
+
+  if (socket == NULL)
     goto no_socket;
 
-  self->priv->sock.fd = ret;
+  /* check address we're bound to, mostly for debugging purposes */
+  myaddr = g_socket_get_local_address (socket, &error);
 
-  len = sizeof (myaddr);
-  ret = getsockname (self->priv->sock.fd, (struct sockaddr *) &myaddr, &len);
-  if (ret < 0)
+  if (myaddr == NULL)
     goto getsockname_error;
 
-  memset (&servaddr, 0, sizeof (servaddr));
-  servaddr.sin_family = AF_INET;        /* host byte order */
-  servaddr.sin_port = htons (self->port);       /* short, network byte order */
-
   GST_DEBUG_OBJECT (self, "socket opened on UDP port %hd",
-      ntohs (servaddr.sin_port));
+      g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (myaddr)));
 
-  if (!inet_aton (self->address, &servaddr.sin_addr))
+  g_object_unref (myaddr);
+
+  /* create target address */
+  inetaddr = g_inet_address_new_from_string (self->priv->address);
+
+  if (inetaddr == NULL)
     goto bad_address;
 
-  self->servaddr = g_malloc (sizeof (struct sockaddr_in));
-  memcpy (self->servaddr, &servaddr, sizeof (servaddr));
+  servaddr = g_inet_socket_address_new (inetaddr, self->priv->port);
+  g_object_unref (inetaddr);
+
+  g_assert (servaddr != NULL);
 
-  GST_DEBUG_OBJECT (self, "will communicate with %s:%d", self->address,
-      self->port);
+  GST_DEBUG_OBJECT (self, "will communicate with %s:%d", self->priv->address,
+      self->priv->port);
 
-  gst_poll_add_fd (self->priv->fdset, &self->priv->sock);
-  gst_poll_fd_ctl_read (self->priv->fdset, &self->priv->sock, TRUE);
+  self->priv->cancel = g_cancellable_new ();
+  self->priv->socket = socket;
+  self->priv->servaddr = G_SOCKET_ADDRESS (servaddr);
 
 #if !GLIB_CHECK_VERSION (2, 31, 0)
-  self->thread = g_thread_create (gst_net_client_clock_thread, self, TRUE,
+  self->priv->thread = g_thread_create (gst_net_client_clock_thread, self, TRUE,
       &error);
 #else
-  self->thread = g_thread_try_new ("GstNetClientClock",
+  self->priv->thread = g_thread_try_new ("GstNetClientClock",
       gst_net_client_clock_thread, self, &error);
 #endif
 
@@ -466,34 +481,31 @@ gst_net_client_clock_start (GstNetClientClock * self)
   /* ERRORS */
 no_socket:
   {
-    GST_ERROR_OBJECT (self, "socket failed %d: %s (%d)", ret,
-        g_strerror (errno), errno);
+    GST_ERROR_OBJECT (self, "socket_new() failed: %s", error->message);
+    g_error_free (error);
     return FALSE;
   }
 getsockname_error:
   {
-    GST_ERROR_OBJECT (self, "getsockname failed %d: %s (%d)", ret,
-        g_strerror (errno), errno);
-    close (self->priv->sock.fd);
-    self->priv->sock.fd = -1;
+    GST_ERROR_OBJECT (self, "get_local_address() failed: %s", error->message);
+    g_error_free (error);
+    g_object_unref (socket);
     return FALSE;
   }
 bad_address:
   {
-    GST_ERROR_OBJECT (self, "inet_aton failed %d: %s (%d)", ret,
-        g_strerror (errno), errno);
-    close (self->priv->sock.fd);
-    self->priv->sock.fd = -1;
+    GST_ERROR_OBJECT (self, "inet_address_new_from_string('%s') failed",
+        self->priv->address);
+    g_object_unref (socket);
     return FALSE;
   }
 no_thread:
   {
     GST_ERROR_OBJECT (self, "could not create thread: %s", error->message);
-    gst_poll_remove_fd (self->priv->fdset, &self->priv->sock);
-    close (self->priv->sock.fd);
-    self->priv->sock.fd = -1;
-    g_free (self->servaddr);
-    self->servaddr = NULL;
+    g_object_unref (self->priv->servaddr);
+    self->priv->servaddr = NULL;
+    g_object_unref (self->priv->socket);
+    self->priv->socket = NULL;
     g_error_free (error);
     return FALSE;
   }
@@ -502,15 +514,25 @@ no_thread:
 static void
 gst_net_client_clock_stop (GstNetClientClock * self)
 {
-  gst_poll_set_flushing (self->priv->fdset, TRUE);
-  g_thread_join (self->thread);
-  self->thread = NULL;
-
-  if (self->priv->sock.fd != -1) {
-    gst_poll_remove_fd (self->priv->fdset, &self->priv->sock);
-    close (self->priv->sock.fd);
-    self->priv->sock.fd = -1;
-  }
+  if (self->priv->thread == NULL)
+    return;
+
+  GST_INFO_OBJECT (self, "stopping...");
+  g_cancellable_cancel (self->priv->cancel);
+
+  g_thread_join (self->priv->thread);
+  self->priv->thread = NULL;
+
+  g_object_unref (self->priv->cancel);
+  self->priv->cancel = NULL;
+
+  g_object_unref (self->priv->servaddr);
+  self->priv->servaddr = NULL;
+
+  g_object_unref (self->priv->socket);
+  self->priv->socket = NULL;
+
+  GST_INFO_OBJECT (self, "stopped");
 }
 
 /**
@@ -531,6 +553,7 @@ GstClock *
 gst_net_client_clock_new (gchar * name, const gchar * remote_address,
     gint remote_port, GstClockTime base_time)
 {
+  /* FIXME: gst_net_client_clock_new() should be a thin wrapper for g_object_new() */
   GstNetClientClock *ret;
   GstClockTime internal;
 
@@ -562,22 +585,12 @@ gst_net_client_clock_new (gchar * name, const gchar * remote_address,
     }
   }
 
-  if ((ret->priv->fdset = gst_poll_new (TRUE)) == NULL)
-    goto no_fdset;
-
   if (!gst_net_client_clock_start (ret))
     goto failed_start;
 
   /* all systems go, cap'n */
   return (GstClock *) ret;
 
-no_fdset:
-  {
-    GST_ERROR_OBJECT (ret, "could not create an fdset: %s (%d)",
-        g_strerror (errno), errno);
-    gst_object_unref (ret);
-    return NULL;
-  }
 failed_start:
   {
     /* already printed a nice error */
index d0a35e8..a0fcf1f 100644 (file)
@@ -2,6 +2,7 @@
  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
  *                    2005 Wim Taymans <wim@fluendo.com>
  *                    2005 Andy Wingo <wingo@pobox.com>
+ * Copyright (C) 2012 Collabora Ltd. <tim.muller@collabora.co.uk>
  *
  * gstnetclientclock.h: clock that synchronizes itself to a time provider over
  * the network
 
 G_BEGIN_DECLS
 
-#include <errno.h>
-#include <string.h>
-#include <sys/types.h>
-
-#ifdef G_OS_WIN32
-# include <winsock2.h>
-#else
-# include <netdb.h>
-# include <sys/socket.h>
-# include <netinet/in.h>
-# include <arpa/inet.h>
-#endif /*G_OS_WIN32 */
-
-#include <fcntl.h>
-
 #define GST_TYPE_NET_CLIENT_CLOCK \
   (gst_net_client_clock_get_type())
 #define GST_NET_CLIENT_CLOCK(obj) \
@@ -69,20 +55,6 @@ typedef struct _GstNetClientClockPrivate GstNetClientClockPrivate;
 struct _GstNetClientClock {
   GstSystemClock clock;
 
-  /*< protected >*/
-  gchar *address;
-  gint port;
-
-  /*< private >*/
-  int sock;
-  int control_sock[2];
-
-  GstClockTime current_timeout;
-
-  struct sockaddr_in *servaddr;
-
-  GThread *thread;
-
   /*< private >*/
   GstNetClientClockPrivate *priv;
 
@@ -97,6 +69,7 @@ struct _GstNetClientClockClass {
 };
 
 GType           gst_net_client_clock_get_type  (void);
+
 GstClock*      gst_net_client_clock_new        (gchar *name, const gchar *remote_address,
                                                  gint remote_port, GstClockTime base_time);
 
index bf8a027..597199e 100644 (file)
@@ -1,5 +1,7 @@
 /* GStreamer
  * Copyright (C) 2005 Andy Wingo <wingo@pobox.com>
+ * Copyright (C) 2010 Tim-Philipp Müller <tim centricular net>
+ * Copyright (C) 2012 Collabora Ltd. <tim.muller@collabora.co.uk>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
@@ -107,112 +109,106 @@ gst_net_time_packet_serialize (const GstNetTimePacket * packet)
 
 /**
  * gst_net_time_packet_receive:
- * @fd: a file descriptor created by socket(2)
- * @addr: a pointer to a sockaddr to hold the address of the sender
- * @len: a pointer to the size of the data pointed to by @addr
+ * @socket: socket to receive the time packet on
+ * @src_addr: (out): address of variable to return sender address
+ * @err: return address for a #GError, or NULL
  *
- * Receives a #GstNetTimePacket over a socket. Handles interrupted system calls,
- * but otherwise returns NULL on error. See recvfrom(2) for more information on
- * how to interpret @sockaddr.
+ * Receives a #GstNetTimePacket over a socket. Handles interrupted system
+ * calls, but otherwise returns NULL on error.
  *
- * MT safe. Caller owns return value (g_free to free).
- *
- * Returns: The new #GstNetTimePacket.
+ * Returns: (transfer full): a new #GstNetTimePacket, or NULL on error. Free
+ *    with g_free() when done.
  */
 GstNetTimePacket *
-gst_net_time_packet_receive (gint fd, struct sockaddr * addr, socklen_t * len)
+gst_net_time_packet_receive (GSocket * socket,
+    GSocketAddress ** src_address, GError ** error)
 {
-  guint8 buffer[GST_NET_TIME_PACKET_SIZE];
-  gint ret;
+  gchar buffer[GST_NET_TIME_PACKET_SIZE];
+  GError *err = NULL;
+  gssize ret;
+
+  g_return_val_if_fail (G_IS_SOCKET (socket), FALSE);
+  g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
 
   while (TRUE) {
-#ifdef G_OS_WIN32
-    ret = recvfrom (fd, (char *) buffer, GST_NET_TIME_PACKET_SIZE,
-#else
-    ret = recvfrom (fd, buffer, GST_NET_TIME_PACKET_SIZE,
-#endif
-        0, (struct sockaddr *) addr, len);
+    ret = g_socket_receive_from (socket, src_address, buffer,
+        GST_NET_TIME_PACKET_SIZE, NULL, &err);
+
     if (ret < 0) {
-      if (errno != EAGAIN && errno != EINTR)
-        goto receive_error;
-      else
+      if (err->code == G_IO_ERROR_WOULD_BLOCK) {
+        g_error_free (err);
+        err = NULL;
         continue;
+      } else {
+        goto receive_error;
+      }
     } else if (ret < GST_NET_TIME_PACKET_SIZE) {
       goto short_packet;
     } else {
-      return gst_net_time_packet_new (buffer);
+      return gst_net_time_packet_new ((const guint8 *) buffer);
     }
   }
 
 receive_error:
   {
-    GST_DEBUG ("receive error %d: %s (%d)", ret, g_strerror (errno), errno);
+    GST_DEBUG ("receive error: %s", err->message);
+    g_propagate_error (error, err);
     return NULL;
   }
 short_packet:
   {
     GST_DEBUG ("someone sent us a short packet (%d < %d)",
         ret, GST_NET_TIME_PACKET_SIZE);
+    g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_DATA,
+        "short time packet (%d < %d)", (int) ret, GST_NET_TIME_PACKET_SIZE);
     return NULL;
   }
 }
 
 /**
  * gst_net_time_packet_send:
- * @packet: the #GstNetTimePacket
- * @fd: a file descriptor created by socket(2)
- * @addr: a pointer to a sockaddr to hold the address of the sender
- * @len: the size of the data pointed to by @addr
+ * @packet: the #GstNetTimePacket to send
+ * @socket: socket to send the time packet on
+ * @dest_addr: address to send the time packet to
+ * @err: return address for a #GError, or NULL
  *
- * Sends a #GstNetTimePacket over a socket. Essentially a thin wrapper around
- * sendto(2) and gst_net_time_packet_serialize(). 
+ * Sends a #GstNetTimePacket over a socket.
  *
  * MT safe.
  *
- * Returns: The return value of sendto(2).
+ * Returns: TRUE if successful, FALSE in case an error occured.
  */
-gint
-gst_net_time_packet_send (const GstNetTimePacket * packet, gint fd,
-    struct sockaddr * addr, socklen_t len)
+gboolean
+gst_net_time_packet_send (const GstNetTimePacket * packet,
+    GSocket * socket, GSocketAddress * dest_address, GError ** error)
 {
-#if defined __CYGWIN__
-  gint fdflags;
-#elif defined G_OS_WIN32
-  gulong flags;
-#endif
-
+  gboolean was_blocking;
   guint8 *buffer;
-  gint ret, send_flags;
+  gssize res;
 
-  g_return_val_if_fail (packet != NULL, -EINVAL);
+  g_return_val_if_fail (packet != NULL, FALSE);
+  g_return_val_if_fail (G_IS_SOCKET (socket), FALSE);
+  g_return_val_if_fail (G_IS_SOCKET_ADDRESS (dest_address), FALSE);
+  g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
 
-#ifdef __CYGWIN__
-  send_flags = 0;
-  fdflags = fcntl (fd, F_GETFL);
-  fcntl (fd, F_SETFL, fdflags | O_NONBLOCK);
-#elif defined G_OS_WIN32
-  flags = 1;
-  send_flags = 0;
-#else
-  send_flags = MSG_DONTWAIT;
-#endif
+  was_blocking = g_socket_get_blocking (socket);
 
+  if (was_blocking)
+    g_socket_set_blocking (socket, FALSE);
+
+  /* FIXME: avoid pointless alloc/free, serialise into stack-allocated buffer */
   buffer = gst_net_time_packet_serialize (packet);
 
-#ifdef G_OS_WIN32
-  ioctlsocket (fd, FIONBIO, &flags);    /* Set nonblocking mode */
-  ret =
-      sendto (fd, (char *) buffer, GST_NET_TIME_PACKET_SIZE, send_flags, addr,
-      len);
-#else
-  ret = sendto (fd, buffer, GST_NET_TIME_PACKET_SIZE, send_flags, addr, len);
-#endif
+  res = g_socket_send_to (socket, dest_address, (const gchar *) buffer,
+      GST_NET_TIME_PACKET_SIZE, NULL, error);
 
-#ifdef __CYGWIN__
-  fcntl (fd, F_SETFL, fdflags);
-#endif
+  /* datagram packets should be sent as a whole or not at all */
+  g_assert (res < 0 || res == GST_NET_TIME_PACKET_SIZE);
 
   g_free (buffer);
 
-  return ret;
+  if (was_blocking)
+    g_socket_set_blocking (socket, TRUE);
+
+  return (res == GST_NET_TIME_PACKET_SIZE);
 }
index 90d6136..e37342c 100644 (file)
 #define __GST_NET_TIME_PACKET_H__
 
 #include <gst/gst.h>
+#include <gio/gio.h>
 
 G_BEGIN_DECLS
 
-#include <errno.h>
-#include <string.h>
-#include <sys/types.h>
-
-#ifdef G_OS_WIN32
-#include <winsock2.h>
-#include <ws2tcpip.h>
-#ifndef socklen_t
-#define socklen_t int
-#endif
-#else
-#include <netdb.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#endif
-
 /**
  * GST_NET_TIME_PACKET_SIZE:
  *
@@ -63,14 +47,18 @@ struct _GstNetTimePacket {
   GstClockTime remote_time;
 };
 
+/* FIXME 0.11: get rid of the packet stuff? add an unref/free function? */
 GstNetTimePacket*       gst_net_time_packet_new         (const guint8 *buffer);
 guint8*                 gst_net_time_packet_serialize   (const GstNetTimePacket *packet);
 
-GstNetTimePacket*       gst_net_time_packet_receive     (gint fd, struct sockaddr *addr,
-                                                         socklen_t *len);
-gint                    gst_net_time_packet_send        (const GstNetTimePacket *packet,
-                                                         gint fd, struct sockaddr *addr,
-                                                         socklen_t len);
+GstNetTimePacket*      gst_net_time_packet_receive     (GSocket         * socket,
+                                                         GSocketAddress ** src_address,
+                                                         GError         ** error);
+
+gboolean                gst_net_time_packet_send        (const GstNetTimePacket * packet,
+                                                         GSocket                * socket,
+                                                         GSocketAddress         * dest_address,
+                                                         GError                ** error);
 
 G_END_DECLS
 
index 8acca09..3683b24 100644 (file)
 #include "gstnettimeprovider.h"
 #include "gstnettimepacket.h"
 
-#include <glib.h>
-
-#ifdef HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-
-#if defined (_MSC_VER) && _MSC_VER >= 1400
-#include <io.h>
-#endif
-
-#ifndef G_OS_WIN32
-#include <sys/ioctl.h>
-#endif
-
-#ifdef HAVE_FIONREAD_IN_SYS_FILIO
-#include <sys/filio.h>
-#endif
-
 GST_DEBUG_CATEGORY_STATIC (ntp_debug);
 #define GST_CAT_DEFAULT (ntp_debug)
 
-#ifdef G_OS_WIN32
-#define close(sock) closesocket(sock)
-#endif
-
 #define DEFAULT_ADDRESS         "0.0.0.0"
 #define DEFAULT_PORT            5637
 
-#define IS_ACTIVE(self) (g_atomic_int_get (&((self)->active)))
+#define IS_ACTIVE(self) (g_atomic_int_get (&((self)->priv->active)))
 
-#ifdef G_OS_WIN32
-#define setsockopt(sock, sol_flags, reuse_flags, ru, sizeofru) setsockopt (sock, sol_flags, reuse_flags, (char *)ru, sizeofru)
-#endif
 enum
 {
   PROP_0,
@@ -82,7 +57,6 @@ enum
   PROP_ADDRESS,
   PROP_CLOCK,
   PROP_ACTIVE
-      /* FILL ME */
 };
 
 #define GST_NET_TIME_PROVIDER_GET_PRIVATE(obj)  \
@@ -90,8 +64,17 @@ enum
 
 struct _GstNetTimeProviderPrivate
 {
-  GstPollFD sock;
-  GstPoll *fdset;
+  gchar *address;
+  int port;
+
+  GThread *thread;
+
+  GstClock *clock;
+
+  gboolean active;              /* ATOMIC */
+
+  GSocket *socket;
+  GCancellable *cancel;
 };
 
 static gboolean gst_net_time_provider_start (GstNetTimeProvider * bself);
@@ -112,18 +95,6 @@ static void gst_net_time_provider_get_property (GObject * object, guint prop_id,
 G_DEFINE_TYPE_WITH_CODE (GstNetTimeProvider, gst_net_time_provider,
     GST_TYPE_OBJECT, _do_init);
 
-#ifdef G_OS_WIN32
-static int
-inet_aton (const char *c, struct in_addr *paddr)
-{
-  paddr->s_addr = inet_addr (c);
-  if (paddr->s_addr == INADDR_NONE)
-    return 0;
-
-  return 1;
-}
-#endif
-
 static void
 gst_net_time_provider_class_init (GstNetTimeProviderClass * klass)
 {
@@ -160,24 +131,12 @@ gst_net_time_provider_class_init (GstNetTimeProviderClass * klass)
 static void
 gst_net_time_provider_init (GstNetTimeProvider * self)
 {
-#ifdef G_OS_WIN32
-  WSADATA w;
-  int error = WSAStartup (0x0202, &w);
-
-  if (error) {
-    GST_DEBUG_OBJECT (self, "Error on WSAStartup");
-  }
-  if (w.wVersion != 0x0202) {
-    WSACleanup ();
-  }
-#endif
   self->priv = GST_NET_TIME_PROVIDER_GET_PRIVATE (self);
 
-  self->port = DEFAULT_PORT;
-  self->priv->sock.fd = -1;
-  self->address = g_strdup (DEFAULT_ADDRESS);
-  self->thread = NULL;
-  self->active = TRUE;
+  self->priv->port = DEFAULT_PORT;
+  self->priv->address = g_strdup (DEFAULT_ADDRESS);
+  self->priv->thread = NULL;
+  self->priv->active = TRUE;
 }
 
 static void
@@ -185,26 +144,17 @@ gst_net_time_provider_finalize (GObject * object)
 {
   GstNetTimeProvider *self = GST_NET_TIME_PROVIDER (object);
 
-  if (self->thread) {
+  if (self->priv->thread) {
     gst_net_time_provider_stop (self);
-    g_assert (self->thread == NULL);
-  }
-
-  if (self->priv->fdset) {
-    gst_poll_free (self->priv->fdset);
-    self->priv->fdset = NULL;
+    g_assert (self->priv->thread == NULL);
   }
 
-  g_free (self->address);
-  self->address = NULL;
-
-  if (self->clock)
-    gst_object_unref (self->clock);
-  self->clock = NULL;
+  g_free (self->priv->address);
+  self->priv->address = NULL;
 
-#ifdef G_OS_WIN32
-  WSACleanup ();
-#endif
+  if (self->priv->clock)
+    gst_object_unref (self->priv->clock);
+  self->priv->clock = NULL;
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -213,75 +163,56 @@ static gpointer
 gst_net_time_provider_thread (gpointer data)
 {
   GstNetTimeProvider *self = data;
-  struct sockaddr_in tmpaddr;
-  socklen_t len;
+  GCancellable *cancel = self->priv->cancel;
+  GSocket *socket = self->priv->socket;
   GstNetTimePacket *packet;
-  gint ret;
-
-  while (TRUE) {
-    GST_LOG_OBJECT (self, "doing select");
-    ret = gst_poll_wait (self->priv->fdset, GST_CLOCK_TIME_NONE);
-    GST_LOG_OBJECT (self, "select returned %d", ret);
-
-    if (ret <= 0) {
-      if (errno == EBUSY) {
-        GST_LOG_OBJECT (self, "stop");
-        goto stopped;
-      } else if (errno != EAGAIN && errno != EINTR)
-        goto select_error;
-      else
-        continue;
-    } else {
-      /* got data in */
-      len = sizeof (struct sockaddr);
+  GError *err = NULL;
 
-      packet = gst_net_time_packet_receive (self->priv->sock.fd,
-          (struct sockaddr *) &tmpaddr, &len);
+  GST_INFO_OBJECT (self, "time provider thread is running");
 
-      if (!packet)
-        goto receive_error;
-
-      if (IS_ACTIVE (self)) {
-        /* do what we were asked to and send the packet back */
-        packet->remote_time = gst_clock_get_time (self->clock);
+  while (TRUE) {
+    GSocketAddress *sender_addr = NULL;
 
-        /* ignore errors */
-        gst_net_time_packet_send (packet, self->priv->sock.fd,
-            (struct sockaddr *) &tmpaddr, len);
-      }
+    GST_LOG_OBJECT (self, "waiting on socket");
+    if (!g_socket_condition_wait (socket, G_IO_IN, cancel, &err)) {
+      GST_INFO_OBJECT (self, "socket error: %s", err->message);
 
-      g_free (packet);
+      if (err->code == G_IO_ERROR_CANCELLED)
+        break;
 
+      /* try again */
+      g_usleep (G_USEC_PER_SEC / 10);
+      g_error_free (err);
+      err = NULL;
       continue;
     }
 
-    g_assert_not_reached ();
+    /* got data in */
+    packet = gst_net_time_packet_receive (socket, &sender_addr, &err);
 
-    /* log errors and keep going */
-  select_error:
-    {
-      GST_DEBUG_OBJECT (self, "select error %d: %s (%d)", ret,
-          g_strerror (errno), errno);
-      continue;
-    }
-  stopped:
-    {
-      GST_DEBUG_OBJECT (self, "shutting down");
-      /* close socket */
-      return NULL;
-    }
-  receive_error:
-    {
-      GST_DEBUG_OBJECT (self, "receive error");
+    if (err != NULL) {
+      GST_DEBUG_OBJECT (self, "receive error: %s", err->message);
+      g_usleep (G_USEC_PER_SEC / 10);
+      g_error_free (err);
+      err = NULL;
       continue;
     }
 
-    g_assert_not_reached ();
+    if (IS_ACTIVE (self)) {
+      /* do what we were asked to and send the packet back */
+      packet->remote_time = gst_clock_get_time (self->priv->clock);
 
+      /* ignore errors */
+      gst_net_time_packet_send (packet, socket, sender_addr, NULL);
+      g_object_unref (sender_addr);
+      g_free (packet);
+    }
   }
 
-  g_assert_not_reached ();
+  if (err != NULL)
+    g_error_free (err);
 
+  GST_INFO_OBJECT (self, "time provider thread is stopping");
   return NULL;
 }
 
@@ -290,25 +221,25 @@ gst_net_time_provider_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec)
 {
   GstNetTimeProvider *self = GST_NET_TIME_PROVIDER (object);
-  GstClock **clock_p = &self->clock;
+  GstClock **clock_p = &self->priv->clock;
 
   switch (prop_id) {
     case PROP_PORT:
-      self->port = g_value_get_int (value);
+      self->priv->port = g_value_get_int (value);
       break;
     case PROP_ADDRESS:
-      g_free (self->address);
+      g_free (self->priv->address);
       if (g_value_get_string (value) == NULL)
-        self->address = g_strdup (DEFAULT_ADDRESS);
+        self->priv->address = g_strdup (DEFAULT_ADDRESS);
       else
-        self->address = g_strdup (g_value_get_string (value));
+        self->priv->address = g_strdup (g_value_get_string (value));
       break;
     case PROP_CLOCK:
       gst_object_replace ((GstObject **) clock_p,
           (GstObject *) g_value_get_object (value));
       break;
     case PROP_ACTIVE:
-      g_atomic_int_set (&self->active, g_value_get_boolean (value));
+      g_atomic_int_set (&self->priv->active, g_value_get_boolean (value));
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -324,13 +255,13 @@ gst_net_time_provider_get_property (GObject * object, guint prop_id,
 
   switch (prop_id) {
     case PROP_PORT:
-      g_value_set_int (value, self->port);
+      g_value_set_int (value, self->priv->port);
       break;
     case PROP_ADDRESS:
-      g_value_set_string (value, self->address);
+      g_value_set_string (value, self->priv->address);
       break;
     case PROP_CLOCK:
-      g_value_set_object (value, self->clock);
+      g_value_set_object (value, self->priv->clock);
       break;
     case PROP_ACTIVE:
       g_value_set_boolean (value, IS_ACTIVE (self));
@@ -344,118 +275,91 @@ gst_net_time_provider_get_property (GObject * object, guint prop_id,
 static gboolean
 gst_net_time_provider_start (GstNetTimeProvider * self)
 {
-  gint ru;
-  struct sockaddr_in my_addr;
-  socklen_t len;
+  GSocketAddress *socket_addr, *bound_addr;
+  GInetAddress *inet_addr;
+  GSocket *socket;
+  GError *err = NULL;
   int port;
-  gint ret;
-  GError *error = NULL;
 
-  if ((ret = socket (AF_INET, SOCK_DGRAM, 0)) < 0)
+  if (self->priv->address) {
+    inet_addr = g_inet_address_new_from_string (self->priv->address);
+    if (inet_addr == NULL)
+      goto invalid_address;
+  } else {
+    inet_addr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
+  }
+
+  GST_LOG_OBJECT (self, "creating socket");
+  socket = g_socket_new (g_inet_address_get_family (inet_addr),
+      G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err);
+
+  if (err != NULL)
     goto no_socket;
 
-  self->priv->sock.fd = ret;
-
-  ru = 1;
-  ret =
-      setsockopt (self->priv->sock.fd, SOL_SOCKET, SO_REUSEADDR, &ru,
-      sizeof (ru));
-  if (ret < 0)
-    goto setsockopt_error;
-
-  memset (&my_addr, 0, sizeof (my_addr));
-  my_addr.sin_family = AF_INET; /* host byte order */
-  my_addr.sin_port = htons ((gint16) self->port);       /* short, network byte order */
-  my_addr.sin_addr.s_addr = INADDR_ANY;
-  if (self->address) {
-    ret = inet_aton (self->address, &my_addr.sin_addr);
-    if (ret == 0)
-      goto invalid_address_error;
-  }
+  GST_DEBUG_OBJECT (self, "binding on port %d", self->priv->port);
+  socket_addr = g_inet_socket_address_new (inet_addr, self->priv->port);
+  g_socket_bind (socket, socket_addr, TRUE, &err);
+  g_object_unref (socket_addr);
+  g_object_unref (inet_addr);
 
-  GST_DEBUG_OBJECT (self, "binding on port %d", self->port);
-  ret =
-      bind (self->priv->sock.fd, (struct sockaddr *) &my_addr,
-      sizeof (my_addr));
-  if (ret < 0)
+  if (err != NULL)
     goto bind_error;
 
-  len = sizeof (my_addr);
-  ret = getsockname (self->priv->sock.fd, (struct sockaddr *) &my_addr, &len);
-  if (ret < 0)
-    goto getsockname_error;
+  bound_addr = g_socket_get_local_address (socket, NULL);
+  port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (bound_addr));
+  GST_DEBUG_OBJECT (self, "bound on UDP port %d", port);
+  g_object_unref (bound_addr);
 
-  port = ntohs (my_addr.sin_port);
-  GST_DEBUG_OBJECT (self, "bound, on port %d", port);
-
-  if (port != self->port) {
-    self->port = port;
-    GST_DEBUG_OBJECT (self, "notifying %d", port);
+  if (port != self->priv->port) {
+    self->priv->port = port;
+    GST_DEBUG_OBJECT (self, "notifying port %d", port);
     g_object_notify (G_OBJECT (self), "port");
   }
 
-  gst_poll_add_fd (self->priv->fdset, &self->priv->sock);
-  gst_poll_fd_ctl_read (self->priv->fdset, &self->priv->sock, TRUE);
+  self->priv->socket = socket;
+  self->priv->cancel = g_cancellable_new ();
 
 #if !GLIB_CHECK_VERSION (2, 31, 0)
-  self->thread = g_thread_create (gst_net_time_provider_thread, self, TRUE,
-      &error);
+  self->priv->thread =
+      g_thread_create (gst_net_time_provider_thread, self, TRUE, &err);
 #else
-  self->thread = g_thread_try_new ("GstNetTimeProvider",
-      gst_net_time_provider_thread, self, &error);
+  self->priv->thread = g_thread_try_new ("GstNetTimeProvider",
+      gst_net_time_provider_thread, self, &err);
 #endif
 
-  if (error != NULL)
+  if (err != NULL)
     goto no_thread;
 
   return TRUE;
 
   /* ERRORS */
-no_socket:
-  {
-    GST_ERROR_OBJECT (self, "socket failed %d: %s (%d)", ret,
-        g_strerror (errno), errno);
-    return FALSE;
-  }
-setsockopt_error:
+invalid_address:
   {
-    close (self->priv->sock.fd);
-    self->priv->sock.fd = -1;
-    GST_ERROR_OBJECT (self, "setsockopt failed %d: %s (%d)", ret,
-        g_strerror (errno), errno);
+    GST_ERROR_OBJECT (self, "invalid address: %s", self->priv->address);
     return FALSE;
   }
-invalid_address_error:
+no_socket:
   {
-    close (self->priv->sock.fd);
-    self->priv->sock.fd = -1;
-    GST_ERROR_OBJECT (self, "invalid network address %s: %s (%d)",
-        self->address, g_strerror (errno), errno);
+    GST_ERROR_OBJECT (self, "could not create socket: %s", err->message);
+    g_error_free (err);
+    g_object_unref (inet_addr);
     return FALSE;
   }
 bind_error:
   {
-    close (self->priv->sock.fd);
-    self->priv->sock.fd = -1;
-    GST_ERROR_OBJECT (self, "bind failed %d: %s (%d)", ret,
-        g_strerror (errno), errno);
-    return FALSE;
-  }
-getsockname_error:
-  {
-    close (self->priv->sock.fd);
-    self->priv->sock.fd = -1;
-    GST_ERROR_OBJECT (self, "getsockname failed %d: %s (%d)", ret,
-        g_strerror (errno), errno);
+    GST_ERROR_OBJECT (self, "bind failed: %s", err->message);
+    g_error_free (err);
+    g_object_unref (socket);
     return FALSE;
   }
 no_thread:
   {
-    gst_poll_remove_fd (self->priv->fdset, &self->priv->sock);
-    close (self->priv->sock.fd);
-    self->priv->sock.fd = -1;
-    GST_ERROR_OBJECT (self, "could not create thread: %s", error->message);
-    g_error_free (error);
+    GST_ERROR_OBJECT (self, "could not create thread: %s", err->message);
+    g_error_free (err);
+    g_object_unref (self->priv->socket);
+    self->priv->socket = NULL;
+    g_object_unref (self->priv->cancel);
+    self->priv->cancel = NULL;
     return FALSE;
   }
 }
@@ -463,15 +367,21 @@ no_thread:
 static void
 gst_net_time_provider_stop (GstNetTimeProvider * self)
 {
-  gst_poll_set_flushing (self->priv->fdset, TRUE);
-  g_thread_join (self->thread);
-  self->thread = NULL;
-
-  if (self->priv->sock.fd != -1) {
-    gst_poll_remove_fd (self->priv->fdset, &self->priv->sock);
-    close (self->priv->sock.fd);
-    self->priv->sock.fd = -1;
-  }
+  g_return_if_fail (self->priv->thread != NULL);
+
+  GST_INFO_OBJECT (self, "stopping..");
+  g_cancellable_cancel (self->priv->cancel);
+
+  g_thread_join (self->priv->thread);
+  self->priv->thread = NULL;
+
+  g_object_unref (self->priv->cancel);
+  self->priv->cancel = NULL;
+
+  g_object_unref (self->priv->socket);
+  self->priv->socket = NULL;
+
+  GST_INFO_OBJECT (self, "stopped");
 }
 
 /**
@@ -496,22 +406,12 @@ gst_net_time_provider_new (GstClock * clock, const gchar * address, gint port)
   ret = g_object_new (GST_TYPE_NET_TIME_PROVIDER, "clock", clock, "address",
       address, "port", port, NULL);
 
-  if ((ret->priv->fdset = gst_poll_new (TRUE)) == NULL)
-    goto no_fdset;
-
   if (!gst_net_time_provider_start (ret))
     goto failed_start;
 
   /* all systems go, cap'n */
   return ret;
 
-no_fdset:
-  {
-    GST_ERROR_OBJECT (ret, "could not create an fdset: %s (%d)",
-        g_strerror (errno), errno);
-    gst_object_unref (ret);
-    return NULL;
-  }
 failed_start:
   {
     /* already printed a nice error */
index c8cde3f..81c0fbe 100644 (file)
@@ -1,6 +1,7 @@
 /* GStreamer
  * Copyright (C) 2005 Andy Wingo <wingo@pobox.com>
  *               2006 Joni Valtanen <joni.valtanen@movial.fi>
+ * Copyright (C) 2012 Collabora Ltd. <tim.muller@collabora.co.uk>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public
 #ifndef __GST_NET_TIME_PROVIDER_H__
 #define __GST_NET_TIME_PROVIDER_H__
 
-/* to determinate os */
-#include <glib.h>
-
 #include <gst/gst.h>
 
 G_BEGIN_DECLS
 
-#include <errno.h>
-#include <string.h>
-#include <sys/types.h>
-
-#ifdef G_OS_WIN32
-#include <winsock2.h>
-#include <ws2tcpip.h>
-#ifndef socklen_t
-#define socklen_t int
-#endif
-#else
-#include <netdb.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#endif
-
-#include <fcntl.h>
-
 #define GST_TYPE_NET_TIME_PROVIDER \
   (gst_net_time_provider_get_type())
 #define GST_NET_TIME_PROVIDER(obj) \
@@ -72,20 +51,6 @@ struct _GstNetTimeProvider {
   GstObject parent;
 
   /*< private >*/
-  gchar *address;
-  int port;
-
-  int sock;
-  int control_sock[2];
-
-  GThread *thread;
-
-  GstClock *clock;
-
-  /* has to be a gint, we use atomic ops here */
-  gint active;
-
-  /*< private >*/
   GstNetTimeProviderPrivate *priv;
 
   gpointer _gst_reserved[GST_PADDING];
@@ -98,6 +63,7 @@ struct _GstNetTimeProviderClass {
 };
 
 GType                   gst_net_time_provider_get_type  (void);
+
 GstNetTimeProvider*     gst_net_time_provider_new       (GstClock *clock,
                                                          const gchar *address,
                                                          gint port);
index 1892d98..f254fae 100644 (file)
@@ -193,7 +193,7 @@ libs_gstnetclientclock_LDADD = \
        $(LDADD)
 libs_gstnettimeprovider_LDADD = \
        $(top_builddir)/libs/gst/net/libgstnet-@GST_MAJORMINOR@.la \
-       $(LDADD)
+       $(GIO_LIBS) $(LDADD)
 
 # valgrind testing
 # these just need valgrind fixing, period
index 1e63270..49bfee0 100644 (file)
@@ -57,9 +57,10 @@ GST_START_TEST (test_functioning)
   GstNetTimePacket *packet;
   GstClock *clock;
   GstClockTime local;
-  struct sockaddr_in servaddr;
-  gint port = -1, sockfd, ret;
-  socklen_t len;
+  GSocketAddress *server_addr;
+  GInetAddress *addr;
+  GSocket *socket;
+  gint port = -1;
 
   clock = gst_system_clock_obtain ();
   fail_unless (clock != NULL, "failed to get system clock");
@@ -69,33 +70,24 @@ GST_START_TEST (test_functioning)
   g_object_get (ntp, "port", &port, NULL);
   fail_unless (port > 0);
 
-  sockfd = socket (AF_INET, SOCK_DGRAM, 0);
-  fail_if (sockfd < 0, "socket failed");
+  socket = g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_DATAGRAM,
+      G_SOCKET_PROTOCOL_UDP, NULL);
+  fail_unless (socket != NULL, "could not create socket");
 
-  memset (&servaddr, 0, sizeof (servaddr));
-  servaddr.sin_family = AF_INET;
-  servaddr.sin_port = htons (port);
-#ifndef G_OS_WIN32
-  inet_aton ("127.0.0.1", &servaddr.sin_addr);
-#else
-  servaddr.sin_addr.s_addr = inet_addr ("127.0.0.1");
-#endif
+  addr = g_inet_address_new_from_string ("127.0.0.1");
+  server_addr = g_inet_socket_address_new (addr, port);
+  g_object_unref (addr);
 
   packet = gst_net_time_packet_new (NULL);
   fail_unless (packet != NULL, "failed to create packet");
 
   packet->local_time = local = gst_clock_get_time (clock);
 
-  len = sizeof (servaddr);
-  ret = gst_net_time_packet_send (packet, sockfd,
-      (struct sockaddr *) &servaddr, len);
-
-  fail_unless (ret == GST_NET_TIME_PACKET_SIZE, "failed to send packet");
+  fail_unless (gst_net_time_packet_send (packet, socket, server_addr, NULL));
 
   g_free (packet);
 
-  packet = gst_net_time_packet_receive (sockfd, (struct sockaddr *) &servaddr,
-      &len);
+  packet = gst_net_time_packet_receive (socket, NULL, NULL);
 
   fail_unless (packet != NULL, "failed to receive packet");
   fail_unless (packet->local_time == local, "local time is not the same");
@@ -105,7 +97,8 @@ GST_START_TEST (test_functioning)
 
   g_free (packet);
 
-  close (sockfd);
+  g_object_unref (socket);
+  g_object_unref (server_addr);
 
   gst_object_unref (ntp);
   gst_object_unref (clock);