ptp: Initial implementation of a PTP clock
authorSebastian Dröge <sebastian@centricular.com>
Thu, 14 May 2015 10:18:25 +0000 (12:18 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Wed, 3 Jun 2015 11:55:28 +0000 (13:55 +0200)
GstPtpClock implements a PTP (IEEE1588:2008) ordinary clock in
slave-only mode, that allows a GStreamer pipeline to synchronize
to a PTP network clock in some specific domain.

The PTP subsystem can be initialized with gst_ptp_init(), which then
starts a helper process to do the actual communication via the PTP
ports. This is required as PTP listens on ports < 1024 and thus
requires special privileges. Once this helper process is started, the
main process will synchronize to all PTP domains that are detected on
the selected interfaces.

gst_ptp_clock_new() then allows to create a GstClock that provides the
PTP time from a master clock inside a specific PTP domain. This clock
will only return valid timestamps once the timestamps in the PTP domain
are known. To check this, the GstPtpClock::internal-clock property and
the related notify::clock signal can be used. Once the internal clock
is not NULL, the PTP domain's time is known. Alternatively you can wait
for this with gst_ptp_clock_wait_ready().

To gather statistics about the PTP clock synchronization,
gst_ptp_statistics_callback_add() can be used. This gives the
application the possibility to collect all kinds of statistics
from the clock synchronization.

https://bugzilla.gnome.org/show_bug.cgi?id=749391

15 files changed:
configure.ac
docs/libs/gstreamer-libs-docs.sgml
docs/libs/gstreamer-libs-sections.txt
libs/gst/helpers/Makefile.am
libs/gst/helpers/gst-ptp-helper.c [new file with mode: 0644]
libs/gst/net/Makefile.am
libs/gst/net/gstptp_private.h [new file with mode: 0644]
libs/gst/net/gstptpclock.c [new file with mode: 0644]
libs/gst/net/gstptpclock.h [new file with mode: 0644]
libs/gst/net/net.h
tests/examples/Makefile.am
tests/examples/ptp/.gitignore [new file with mode: 0644]
tests/examples/ptp/Makefile.am [new file with mode: 0644]
tests/examples/ptp/ptp-print-times.c [new file with mode: 0644]
win32/common/libgstnet.def

index 42fe32a..c64855b 100644 (file)
@@ -258,6 +258,105 @@ if test "x$USE_POISONING" = xyes; then
     [Define if we should poison deallocated memory])
 fi
 
+dnl PTP support parts
+AC_MSG_CHECKING([whether PTP support can be enabled])
+case "$host_os" in
+  *android*)
+    dnl Can't run on Android because of permissions
+    HAVE_PTP=no
+    ;;
+  mingw*|pw32*|cygwin*)
+    dnl Not ported to Windows yet
+    HAVE_PTP=no
+    ;;
+  darwin*)
+    dnl Can't run on iOS because of permissions
+    AC_CHECK_HEADER(MobileCoreServices/MobileCoreServices.h, HAVE_PTP="no", HAVE_PTP="yes", [-])
+    ;;
+  linux*|darwin*|solaris*|netbsd*|freebsd*|openbsd*|kfreebsd*|dragonfly*|gnu*)
+    HAVE_PTP=yes
+    ;;
+  *)
+    HAVE_PTP=no
+    ;;
+esac
+AC_MSG_RESULT([$HAVE_PTP])
+
+dnl user/group to change to in gst-ptp-helper
+AC_ARG_WITH([ptp-helper-setuid-user],
+  AS_HELP_STRING([--with-ptp-helper-setuid-user],[User to switch to when installing gst-ptp-helper setuid root]),
+  [
+    if test "x$withval" != "x"
+    then
+      AC_DEFINE_UNQUOTED(HAVE_PTP_HELPER_SETUID_USER, "$withval", [PTP helper setuid user])
+    fi
+  ], []
+)
+
+dnl group/group to change to in gst-ptp-helper
+AC_ARG_WITH([ptp-helper-setuid-group],
+  AS_HELP_STRING([--with-ptp-helper-setuid-group],[Group to switch to when installing gst-ptp-helper setuid root]),
+  [
+    if test "x$withval" != "x"
+    then
+      AC_DEFINE_UNQUOTED(HAVE_PTP_HELPER_SETUID_GROUP, "$withval", [PTP helper setuid group])
+    fi
+  ], []
+)
+
+AC_ARG_WITH(
+  ptp-helper-permissions,
+  AC_HELP_STRING(
+    [--with-ptp-helper-permissions],
+    [how to gain PTP permissions (none, setuid-root, capabilities, auto)]),
+    [],
+    [with_ptp_helper_permissions=auto])
+
+gst_ptp_have_cap=no
+AG_GST_CHECK_LIBHEADER(CAP, cap,
+                       cap_init, ,
+                       sys/capability.h,
+                       CAP_LIBS="-lcap"
+                       AC_SUBST(CAP_LIBS)
+                       gst_ptp_have_cap=yes)
+
+AC_PATH_PROG([SETCAP], [setcap], [no], [$PATH:/usr/bin:/bin:/usr/sbin:/sbin])
+
+if test "x$HAVE_PTP" = "xyes"; then
+AC_DEFINE(HAVE_PTP, 1, [PTP support available])
+
+AC_MSG_CHECKING([how to install gst-ptp-helper])
+if test "x$with_ptp_helper_permissions" = "xauto"; then
+    if test "x$gst_ptp_have_cap" = "xyes" -a "x$SETCAP" != "xno"; then
+        with_ptp_helper_permissions="capabilities"
+    else
+        with_ptp_helper_permissions="setuid-root"
+    fi
+fi
+AC_MSG_RESULT([$with_ptp_helper_permissions])
+
+case "$with_ptp_helper_permissions" in
+  none)
+    ;;
+  setuid-root)
+     AC_DEFINE(HAVE_PTP_HELPER_SETUID, 1,
+        [Use setuid-root for permissions in PTP helper])
+    ;;
+  capabilities)
+     AC_DEFINE(HAVE_PTP_HELPER_CAPABILITIES, 1,
+        [Use capabilities for permissions in PTP helper])
+    ;;
+  *)
+    AC_MSG_ERROR(Invalid parameter [$with_ptp_helper_permissions])
+    ;;
+esac
+
+fi
+
+AM_CONDITIONAL(HAVE_PTP, test "x$HAVE_PTP" = "xyes")
+AM_CONDITIONAL(HAVE_PTP_HELPER_SETUID, test "x$with_ptp_helper_permissions" = "xsetuid-root")
+AM_CONDITIONAL(HAVE_PTP_HELPER_CAPABILITIES, test "x$with_ptp_helper_permissions" = "xcapabilities")
+
 dnl *** checks for platform ***
 
 dnl * hardware/architecture *
@@ -806,6 +905,12 @@ AC_DEFINE_UNQUOTED(GST_PLUGIN_SCANNER_INSTALLED,
     "$GST_PLUGIN_SCANNER_INSTALLED", [location of the installed gst-plugin-scanner])
 AC_SUBST(GST_PLUGIN_SCANNER_INSTALLED)
 
+dnl ptp helper locations
+AS_AC_EXPAND(GST_PTP_HELPER_INSTALLED,${libexecdir}/gstreamer-$GST_API_VERSION/gst-ptp-helper)
+AC_DEFINE_UNQUOTED(GST_PTP_HELPER_INSTALLED,
+    "$GST_PTP_HELPER_INSTALLED", [location of the installed gst-ptp-helper])
+AC_SUBST(GST_PTP_HELPER_INSTALLED)
+
 dnl things for our internal libcheck (must be called even if building
 dnl libcheck is disabled because it defines conditionals)
 AG_GST_CHECK_CHECKS()
@@ -842,6 +947,7 @@ tests/examples/helloworld/Makefile
 tests/examples/manual/Makefile
 tests/examples/memory/Makefile
 tests/examples/netclock/Makefile
+tests/examples/ptp/Makefile
 tests/examples/streamiddemux/Makefile
 tests/examples/streams/Makefile
 tools/Makefile
@@ -945,6 +1051,7 @@ Configuration
        Plugin support             : ${enable_plugin}
        Static plugins             : ${enable_static_plugins}
        Unit testing support       : ${BUILD_CHECK}
+       PTP clock support          : ${HAVE_PTP}
 
        Debug                      : ${USE_DEBUG}
        Profiling                  : ${USE_PROFILING}
index 0070771..406d606 100644 (file)
@@ -76,6 +76,7 @@
       <xi:include href="xml/gstnetclientclock.xml" />
       <xi:include href="xml/gstnettimepacket.xml" />
       <xi:include href="xml/gstnettimeprovider.xml" />
+      <xi:include href="xml/gstptpclock.xml" />
     </chapter>
 
     <chapter id="gstreamer-check">
index f1db3a0..1dfc53b 100644 (file)
@@ -942,6 +942,32 @@ gst_net_time_provider_get_type
 </SECTION>
 
 <SECTION>
+<FILE>gstptpclock</FILE>
+<TITLE>GstPtpClock</TITLE>
+<INCLUDE>gst/net/net.h</INCLUDE>
+gst_ptp_init
+gst_ptp_deinit
+gst_ptp_is_initialized
+gst_ptp_is_supported
+
+GstPtpClock
+gst_ptp_clock_new
+
+gst_ptp_statistics_callback_add
+gst_ptp_statistics_callback_remove
+<SUBSECTION Standard>
+GstPtpClockClass
+GstPtpClockPrivate
+GST_PTP_CLOCK
+GST_IS_PTP_CLOCK
+GST_TYPE_PTP_CLOCK
+GST_PTP_CLOCK_CLASS
+GST_IS_PTP_CLOCK_CLASS
+<SUBSECTION Private>
+gst_ptp_clock_get_type
+</SECTION>
+
+<SECTION>
 <FILE>gstcheck</FILE>
 <TITLE>GstCheck</TITLE>
 <INCLUDE>gst/check/gstcheck.h</INCLUDE>
index 1f1eacc..141c2c1 100644 (file)
@@ -7,8 +7,33 @@ gst_completion_helper_@GST_API_VERSION@_LDADD = $(GST_OBJ_LIBS)
 
 bashhelpersdir = $(BASH_HELPERS_DIR)
 dist_bashhelpers_DATA = gst
+endif
+
+helpers_PROGRAMS = gst-plugin-scanner
+helpersdir=$(libexecdir)/gstreamer-$(GST_API_VERSION)
+
+gst_plugin_scanner_SOURCES = gst-plugin-scanner.c
+gst_plugin_scanner_CFLAGS = $(GST_OBJ_CFLAGS)
+gst_plugin_scanner_LDADD = $(GST_OBJ_LIBS)
+
+if HAVE_PTP
+helpers_PROGRAMS += gst-ptp-helper
+gst_ptp_helper_SOURCES = gst-ptp-helper.c
+gst_ptp_helper_CFLAGS = $(GST_OBJ_CFLAGS) $(GIO_CFLAGS)
+gst_ptp_helper_LDADD = $(GST_OBJ_LIBS) $(GIO_LIBS) $(CAP_LIBS)
+endif
 
 install-exec-hook:
+if HAVE_PTP
+if HAVE_PTP_HELPER_SETUID
+       chown root $(DESTDIR)$(helpersdir)/gst-ptp-helper
+       chmod u+s $(DESTDIR)$(helpersdir)/gst-ptp-helper
+endif
+if HAVE_PTP_HELPER_CAPABILITIES
+       $(SETCAP) cap_net_bind_service,cap_net_admin+ep $(DESTDIR)$(helpersdir)/gst-ptp-helper
+endif
+endif
+if ENABLE_BASH_COMPLETION
        $(MKDIR_P) $(DESTDIR)$(BASH_HELPERS_DIR) && \
        cd $(DESTDIR)$(bindir) && \
        $(INSTALL) `echo "gst-completion-helper-" | sed '$(transform)'`@GST_API_VERSION@$(EXEEXT) \
@@ -19,13 +44,6 @@ uninstall-hook:
        rm -f $(DESTDIR)$(BASH_HELPERS_DIR)/gst-completion-helper-@GST_API_VERSION@$(EXEEXT)
 endif
 
-helpers_PROGRAMS = gst-plugin-scanner
-helpersdir=$(libexecdir)/gstreamer-$(GST_API_VERSION)
-
-gst_plugin_scanner_SOURCES = gst-plugin-scanner.c
-gst_plugin_scanner_CFLAGS = $(GST_OBJ_CFLAGS)
-gst_plugin_scanner_LDADD = $(GST_OBJ_LIBS)
-
 # clean out the old one to make sure everything is udpated correctly
 # remove again after release
 CLEANFILES = plugin-scanner
diff --git a/libs/gst/helpers/gst-ptp-helper.c b/libs/gst/helpers/gst-ptp-helper.c
new file mode 100644 (file)
index 0000000..2f10639
--- /dev/null
@@ -0,0 +1,560 @@
+/* GStreamer
+ * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
+ *
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/* Helper process that runs setuid root or with appropriate privileges to
+ * listen on ports < 1024, do multicast operations and get MAC addresses of
+ * interfaces. Privileges are dropped after these operations are done.
+ *
+ * It listens on the PTP multicast group on port 319 and 320 and forwards
+ * everything received there to stdout, while forwarding everything received
+ * on stdout to those sockets.
+ * Additionally it provides the MAC address of a network interface via stdout
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <errno.h>
+#include <sys/ioctl.h>
+#include <net/if.h>
+#include <netinet/in.h>
+#include <string.h>
+
+#ifdef HAVE_PTP_HELPER_SETUID
+#include <grp.h>
+#include <pwd.h>
+#endif
+
+#ifdef HAVE_PTP_HELPER_CAPABILITIES
+#include <sys/capability.h>
+#endif
+
+#include <glib.h>
+#include <gio/gio.h>
+
+#include <gst/gst.h>
+#include <gst/net/gstptp_private.h>
+
+#define PTP_MULTICAST_GROUP "224.0.1.129"
+#define PTP_EVENT_PORT   319
+#define PTP_GENERAL_PORT 320
+
+static gchar **ifaces = NULL;
+static gboolean verbose = FALSE;
+static guint64 clock_id = (guint64) - 1;
+static guint8 clock_id_array[8];
+
+static GOptionEntry opt_entries[] = {
+  {"interface", 'i', 0, G_OPTION_ARG_STRING_ARRAY, &ifaces,
+      "Interface to listen on", NULL},
+  {"clock-id", 'c', 0, G_OPTION_ARG_INT64, &clock_id,
+      "PTP clock id", NULL},
+  {"verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose,
+      "Be verbose", NULL},
+  {NULL}
+};
+
+static GSocketAddress *event_saddr, *general_saddr;
+static GSocket *socket_event, *socket_general;
+static GIOChannel *stdin_channel, *stdout_channel;
+
+static gboolean
+have_socket_data_cb (GSocket * socket, GIOCondition condition,
+    gpointer user_data)
+{
+  gchar buffer[8192];
+  gssize read;
+  gsize written;
+  GError *err = NULL;
+  GIOStatus status;
+  StdIOHeader header = { 0, };
+
+  read = g_socket_receive (socket, buffer, sizeof (buffer), NULL, &err);
+  if (read == -1)
+    g_error ("Failed to read from socket: %s", err->message);
+
+  if (verbose)
+    g_message ("Received %" G_GSSIZE_FORMAT " bytes from %s socket", read,
+        (socket == socket_event ? "event" : "general"));
+
+  header.size = read;
+  header.type = (socket == socket_event) ? TYPE_EVENT : TYPE_GENERAL;
+
+  status =
+      g_io_channel_write_chars (stdout_channel, (gchar *) & header,
+      sizeof (header), &written, &err);
+  if (status == G_IO_STATUS_ERROR) {
+    g_error ("Failed to write to stdout: %s", err->message);
+  } else if (status == G_IO_STATUS_EOF) {
+    g_message ("EOF on stdout");
+    exit (0);
+  } else if (status != G_IO_STATUS_NORMAL) {
+    g_error ("Unexpected stdout write status: %d", status);
+  } else if (written != sizeof (header)) {
+    g_error ("Unexpected write size: %" G_GSIZE_FORMAT, written);
+  }
+
+  status =
+      g_io_channel_write_chars (stdout_channel, buffer, read, &written, &err);
+  if (status == G_IO_STATUS_ERROR) {
+    g_error ("Failed to write to stdout: %s", err->message);
+  } else if (status == G_IO_STATUS_EOF) {
+    g_message ("EOF on stdout");
+    exit (0);
+  } else if (status != G_IO_STATUS_NORMAL) {
+    g_error ("Unexpected stdout write status: %d", status);
+  } else if (written != read) {
+    g_error ("Unexpected write size: %" G_GSIZE_FORMAT, written);
+  }
+
+  return G_SOURCE_CONTINUE;
+}
+
+static gboolean
+have_stdin_data_cb (GIOChannel * channel, GIOCondition condition,
+    gpointer user_data)
+{
+  GIOStatus status;
+  StdIOHeader header = { 0, };
+  gchar buffer[8192];
+  GError *err = NULL;
+  gsize read;
+  gssize written;
+
+  if ((condition & G_IO_STATUS_EOF)) {
+    g_message ("EOF on stdin");
+    exit (0);
+  }
+
+  status =
+      g_io_channel_read_chars (channel, (gchar *) & header, sizeof (header),
+      &read, &err);
+  if (status == G_IO_STATUS_ERROR) {
+    g_error ("Failed to read from stdin: %s", err->message);
+  } else if (status == G_IO_STATUS_EOF) {
+    g_message ("EOF on stdin");
+    exit (0);
+  } else if (status != G_IO_STATUS_NORMAL) {
+    g_error ("Unexpected stdin read status: %d", status);
+  } else if (read != sizeof (header)) {
+    g_error ("Unexpected read size: %" G_GSIZE_FORMAT, read);
+  } else if (header.size > 8192) {
+    g_error ("Unexpected size: %u", header.size);
+  }
+
+  status = g_io_channel_read_chars (channel, buffer, header.size, &read, &err);
+  if (status == G_IO_STATUS_ERROR) {
+    g_error ("Failed to read from stdin: %s", err->message);
+  } else if (status == G_IO_STATUS_EOF) {
+    g_message ("EOF on stdin");
+    exit (0);
+  } else if (status != G_IO_STATUS_NORMAL) {
+    g_error ("Unexpected stdin read status: %d", status);
+  } else if (read != header.size) {
+    g_error ("Unexpected read size: %" G_GSIZE_FORMAT, read);
+  }
+
+  switch (header.type) {
+    case TYPE_EVENT:
+    case TYPE_GENERAL:
+      written =
+          g_socket_send_to (header.type ==
+          TYPE_EVENT ? socket_event : socket_general,
+          (header.type == TYPE_EVENT ? event_saddr : general_saddr), buffer,
+          header.size, NULL, &err);
+      if (written == -1)
+        g_error ("Failed to write to socket: %s", err->message);
+      else if (written != header.size)
+        g_error ("Unexpected write size: %" G_GSSIZE_FORMAT, written);
+
+      if (verbose)
+        g_message ("Sent %" G_GSSIZE_FORMAT " bytes to %s socket", read,
+            (header.type == TYPE_EVENT ? "event" : "general"));
+      break;
+    default:
+      break;
+  }
+
+  return G_SOURCE_CONTINUE;
+}
+
+static void
+setup_sockets (void)
+{
+  GInetAddress *bind_addr, *mcast_addr;
+  GSocketAddress *bind_saddr;
+  GSource *socket_event_source, *socket_general_source;
+  gchar **probed_ifaces = NULL;
+  GError *err = NULL;
+
+  /* Create sockets */
+  socket_event =
+      g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_DATAGRAM,
+      G_SOCKET_PROTOCOL_UDP, &err);
+  if (!socket_event)
+    g_error ("Couldn't create event socket: %s", err->message);
+
+  socket_general =
+      g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_DATAGRAM,
+      G_SOCKET_PROTOCOL_UDP, &err);
+  if (!socket_general)
+    g_error ("Couldn't create general socket: %s", err->message);
+
+  /* Bind sockets */
+  bind_addr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
+  bind_saddr = g_inet_socket_address_new (bind_addr, PTP_EVENT_PORT);
+  if (!g_socket_bind (socket_event, bind_saddr, TRUE, &err))
+    g_error ("Couldn't bind event socket: %s", err->message);
+  g_object_unref (bind_saddr);
+  bind_saddr = g_inet_socket_address_new (bind_addr, PTP_GENERAL_PORT);
+  if (!g_socket_bind (socket_general, bind_saddr, TRUE, &err))
+    g_error ("Couldn't bind general socket: %s", err->message);
+  g_object_unref (bind_saddr);
+  g_object_unref (bind_addr);
+
+  /* Probe all non-loopback interfaces */
+  if (!ifaces) {
+    struct ifreq ifr;
+    struct ifconf ifc;
+    gchar buf[8192];
+
+    ifc.ifc_len = sizeof (buf);
+    ifc.ifc_buf = buf;
+    if (ioctl (g_socket_get_fd (socket_event), SIOCGIFCONF, &ifc) != -1) {
+      struct ifreq *it = ifc.ifc_req;
+      const struct ifreq *const end =
+          it + (ifc.ifc_len / sizeof (struct ifreq));
+      guint idx = 0;
+
+      probed_ifaces = g_new0 (gchar *, ifc.ifc_len + 1);
+
+      for (; it != end; ++it) {
+        strcpy (ifr.ifr_name, it->ifr_name);
+        if (ioctl (g_socket_get_fd (socket_event), SIOCGIFFLAGS, &ifr) == 0) {
+          if ((ifr.ifr_flags & IFF_LOOPBACK))
+            continue;
+          probed_ifaces[idx] = g_strdup (it->ifr_name);
+          idx++;
+        } else {
+          g_warning ("can't get flags of interface '%s'", it->ifr_name);
+          probed_ifaces[idx] = g_strdup (it->ifr_name);
+          idx++;
+        }
+      }
+
+      if (idx != 0)
+        ifaces = probed_ifaces;
+    }
+  }
+
+  /* Get a clock id from the MAC address if none was given */
+  if (clock_id == (guint64) - 1) {
+    struct ifreq ifr;
+    gboolean success = FALSE;
+
+    if (ifaces) {
+      gchar **ptr = ifaces;
+
+      while (*ptr) {
+        strcpy (ifr.ifr_name, *ptr);
+        if (ioctl (g_socket_get_fd (socket_event), SIOCGIFHWADDR, &ifr) == 0) {
+          clock_id_array[0] = ifr.ifr_hwaddr.sa_data[0];
+          clock_id_array[1] = ifr.ifr_hwaddr.sa_data[1];
+          clock_id_array[2] = ifr.ifr_hwaddr.sa_data[2];
+          clock_id_array[3] = 0xff;
+          clock_id_array[4] = 0xfe;
+          clock_id_array[5] = ifr.ifr_hwaddr.sa_data[3];
+          clock_id_array[6] = ifr.ifr_hwaddr.sa_data[4];
+          clock_id_array[7] = ifr.ifr_hwaddr.sa_data[5];
+          success = TRUE;
+          break;
+        }
+      }
+
+      ptr++;
+    } else {
+      struct ifconf ifc;
+      gchar buf[8192];
+
+      ifc.ifc_len = sizeof (buf);
+      ifc.ifc_buf = buf;
+      if (ioctl (g_socket_get_fd (socket_event), SIOCGIFCONF, &ifc) != -1) {
+        struct ifreq *it = ifc.ifc_req;
+        const struct ifreq *const end =
+            it + (ifc.ifc_len / sizeof (struct ifreq));
+
+        for (; it != end; ++it) {
+          strcpy (ifr.ifr_name, it->ifr_name);
+          if (ioctl (g_socket_get_fd (socket_event), SIOCGIFFLAGS, &ifr) == 0) {
+            if ((ifr.ifr_flags & IFF_LOOPBACK))
+              continue;
+
+            if (ioctl (g_socket_get_fd (socket_event), SIOCGIFHWADDR,
+                    &ifr) == 0) {
+              clock_id_array[0] = ifr.ifr_hwaddr.sa_data[0];
+              clock_id_array[1] = ifr.ifr_hwaddr.sa_data[1];
+              clock_id_array[2] = ifr.ifr_hwaddr.sa_data[2];
+              clock_id_array[3] = 0xff;
+              clock_id_array[4] = 0xfe;
+              clock_id_array[5] = ifr.ifr_hwaddr.sa_data[3];
+              clock_id_array[6] = ifr.ifr_hwaddr.sa_data[4];
+              clock_id_array[7] = ifr.ifr_hwaddr.sa_data[5];
+              success = TRUE;
+              break;
+            }
+          } else {
+            g_warning ("can't get flags of interface '%s'", it->ifr_name);
+          }
+        }
+      }
+    }
+
+    if (!success) {
+      g_warning ("can't get any MAC address, using random clock id");
+      clock_id = (((guint64) g_random_int ()) << 32) | (g_random_int ());
+      GST_WRITE_UINT64_BE (clock_id_array, clock_id);
+      clock_id_array[3] = 0xff;
+      clock_id_array[4] = 0xfe;
+    }
+  } else {
+    GST_WRITE_UINT64_BE (clock_id_array, clock_id);
+  }
+
+  /* Join multicast groups */
+  mcast_addr = g_inet_address_new_from_string (PTP_MULTICAST_GROUP);
+  if (ifaces) {
+    gchar **ptr = ifaces;
+    gboolean success = FALSE;
+
+    while (*ptr) {
+      gint c = 0;
+      if (!g_socket_join_multicast_group (socket_event, mcast_addr, FALSE, *ptr,
+              &err))
+        g_warning ("Couldn't join multicast group on interface '%s': %s",
+            *ptr, err->message);
+      else
+        c++;
+
+      if (!g_socket_join_multicast_group (socket_general, mcast_addr, FALSE,
+              *ptr, &err))
+        g_warning ("Couldn't join multicast group on interface '%s': %s",
+            *ptr, err->message);
+      else
+        c++;
+
+      if (c == 2)
+        success = TRUE;
+      ptr++;
+    }
+
+    if (!success) {
+      /* Join multicast group without any interface */
+      if (!g_socket_join_multicast_group (socket_event, mcast_addr, FALSE, NULL,
+              &err))
+        g_error ("Couldn't join multicast group: %s", err->message);
+      if (!g_socket_join_multicast_group (socket_general, mcast_addr, FALSE,
+              NULL, &err))
+        g_error ("Couldn't join multicast group: %s", err->message);
+    }
+  } else {
+    /* Join multicast group without any interface */
+    if (!g_socket_join_multicast_group (socket_event, mcast_addr, FALSE, NULL,
+            &err))
+      g_error ("Couldn't join multicast group: %s", err->message);
+    if (!g_socket_join_multicast_group (socket_general, mcast_addr, FALSE, NULL,
+            &err))
+      g_error ("Couldn't join multicast group: %s", err->message);
+  }
+
+  event_saddr = g_inet_socket_address_new (mcast_addr, PTP_EVENT_PORT);
+  general_saddr = g_inet_socket_address_new (mcast_addr, PTP_GENERAL_PORT);
+
+  /* Create socket sources */
+  socket_event_source =
+      g_socket_create_source (socket_event, G_IO_IN | G_IO_PRI, NULL);
+  g_source_set_priority (socket_event_source, G_PRIORITY_HIGH);
+  g_source_set_callback (socket_event_source, (GSourceFunc) have_socket_data_cb,
+      NULL, NULL);
+  g_source_attach (socket_event_source, NULL);
+  socket_general_source =
+      g_socket_create_source (socket_general, G_IO_IN | G_IO_PRI, NULL);
+  g_source_set_priority (socket_general_source, G_PRIORITY_DEFAULT);
+  g_source_set_callback (socket_general_source,
+      (GSourceFunc) have_socket_data_cb, NULL, NULL);
+  g_source_attach (socket_general_source, NULL);
+
+  g_strfreev (probed_ifaces);
+}
+
+static void
+drop_privileges (void)
+{
+#ifdef HAVE_PTP_HELPER_SETUID
+  /* Switch to the given user/group */
+#ifdef HAVE_PTP_HELPER_SETUID_GROUP
+  {
+    struct group *grp;
+
+    grp = getgrnam (HAVE_PTP_HELPER_SETUID_GROUP);
+    if (!grp)
+      g_error ("Failed to get group information '%s': %s",
+          HAVE_PTP_HELPER_SETUID_GROUP, g_strerror (errno));
+
+    if (setgid (grp->gr_gid) != 0)
+      g_error ("Failed to change to group '%s': %s",
+          HAVE_PTP_HELPER_SETUID_GROUP, g_strerror (errno));
+  }
+#endif
+
+#ifdef HAVE_PTP_HELPER_SETUID_USER
+  {
+    struct passwd *pwd;
+
+    pwd = getpwnam (HAVE_PTP_HELPER_SETUID_USER);
+    if (!pwd)
+      g_error ("Failed to get user information '%s': %s",
+          HAVE_PTP_HELPER_SETUID_USER, g_strerror (errno));
+
+#ifndef HAVE_PTP_HELPER_SETUID_GROUP
+    if (setgid (pwd->pw_gid) != 0)
+      g_error ("Failed to change to user group '%s': %s",
+          HAVE_PTP_HELPER_SETUID_USER, g_strerror (errno));
+#endif
+
+    if (setuid (pwd->pw_uid) != 0)
+      g_error ("Failed to change to user '%s': %s", HAVE_PTP_HELPER_SETUID_USER,
+          g_strerror (errno));
+  }
+#endif
+#endif
+#ifdef HAVE_PTP_HELPER_CAPABILITIES
+  /* Drop all capabilities */
+  {
+    cap_t caps;
+
+    caps = cap_get_proc ();
+    if (caps == 0)
+      g_error ("Failed to get process caps: %s", g_strerror (errno));
+    if (cap_clear (caps) != 0)
+      g_error ("Failed to clear caps: %s", g_strerror (errno));
+    if (cap_set_proc (caps) != 0)
+      g_error ("Failed to set process caps: %s", g_strerror (errno));
+  }
+#endif
+}
+
+static void
+setup_stdio_channels (void)
+{
+  GSource *stdin_source;
+
+  /* Create stdin source */
+  stdin_channel = g_io_channel_unix_new (STDIN_FILENO);
+  if (g_io_channel_set_encoding (stdin_channel, NULL,
+          NULL) == G_IO_STATUS_ERROR)
+    g_error ("Failed to set stdin to binary encoding");
+  g_io_channel_set_buffered (stdin_channel, FALSE);
+  stdin_source =
+      g_io_create_watch (stdin_channel, G_IO_IN | G_IO_PRI | G_IO_HUP);
+  g_source_set_priority (stdin_source, G_PRIORITY_DEFAULT);
+  g_source_set_callback (stdin_source, (GSourceFunc) have_stdin_data_cb, NULL,
+      NULL);
+  g_source_attach (stdin_source, NULL);
+
+  /* Create stdout channel */
+  stdout_channel = g_io_channel_unix_new (STDOUT_FILENO);
+  if (g_io_channel_set_encoding (stdout_channel, NULL,
+          NULL) == G_IO_STATUS_ERROR)
+    g_error ("Failed to set stdout to binary encoding");
+  g_io_channel_set_buffered (stdout_channel, FALSE);
+}
+
+static void
+write_clock_id (void)
+{
+  GError *err = NULL;
+  GIOStatus status;
+  StdIOHeader header = { 0, };
+  gsize written;
+
+  /* Write clock id to stdout */
+
+  header.type = TYPE_CLOCK_ID;
+  header.size = 8;
+  status =
+      g_io_channel_write_chars (stdout_channel, (gchar *) & header,
+      sizeof (header), &written, &err);
+  if (status == G_IO_STATUS_ERROR) {
+    g_error ("Failed to write to stdout: %s", err->message);
+  } else if (status == G_IO_STATUS_EOF) {
+    g_message ("EOF on stdout");
+    exit (0);
+  } else if (status != G_IO_STATUS_NORMAL) {
+    g_error ("Unexpected stdout write status: %d", status);
+  } else if (written != sizeof (header)) {
+    g_error ("Unexpected write size: %" G_GSIZE_FORMAT, written);
+  }
+
+  status =
+      g_io_channel_write_chars (stdout_channel,
+      (const gchar *) clock_id_array, sizeof (clock_id_array), &written, &err);
+  if (status == G_IO_STATUS_ERROR) {
+    g_error ("Failed to write to stdout: %s", err->message);
+  } else if (status == G_IO_STATUS_EOF) {
+    g_message ("EOF on stdout");
+    exit (0);
+  } else if (status != G_IO_STATUS_NORMAL) {
+    g_error ("Unexpected stdout write status: %d", status);
+  } else if (written != sizeof (clock_id_array)) {
+    g_error ("Unexpected write size: %" G_GSIZE_FORMAT, written);
+  }
+}
+
+gint
+main (gint argc, gchar ** argv)
+{
+  GOptionContext *opt_ctx;
+  GMainLoop *loop;
+  GError *err = NULL;
+
+  opt_ctx = g_option_context_new ("- GStreamer PTP helper process");
+  g_option_context_add_main_entries (opt_ctx, opt_entries, NULL);
+  if (!g_option_context_parse (opt_ctx, &argc, &argv, &err))
+    g_error ("Error parsing options: %s", err->message);
+  g_option_context_free (opt_ctx);
+
+  setup_sockets ();
+  drop_privileges ();
+  setup_stdio_channels ();
+  write_clock_id ();
+
+  /* Get running */
+  loop = g_main_loop_new (NULL, FALSE);
+  g_main_loop_run (loop);
+
+  /* We never exit cleanly, so don't do cleanup */
+  g_assert_not_reached ();
+
+  return 0;
+}
index f78f679..cc81cef 100644 (file)
@@ -8,19 +8,36 @@ libgstnet_@GST_API_VERSION@_include_HEADERS = \
     gstnetclientclock.h \
     gstnetcontrolmessagemeta.h \
     gstnettimepacket.h \
-    gstnettimeprovider.h
+    gstnettimeprovider.h \
+    gstptpclock.h
 
 libgstnet_@GST_API_VERSION@_la_SOURCES = \
     gstnetaddressmeta.c \
     gstnetclientclock.c \
     gstnetcontrolmessagemeta.c \
     gstnettimepacket.c \
-    gstnettimeprovider.c
+    gstnettimeprovider.c \
+    gstptpclock.c
+
+noinst_HEADERS = gstptp_private.h
 
 libgstnet_@GST_API_VERSION@_la_CFLAGS = $(GST_OBJ_CFLAGS) $(GIO_CFLAGS)
-libgstnet_@GST_API_VERSION@_la_LIBADD = $(GST_OBJ_LIBS) $(GIO_LIBS)
+libgstnet_@GST_API_VERSION@_la_LIBADD = $(GST_OBJ_LIBS) $(GIO_LIBS) \
+       $(top_builddir)/libs/gst/base/libgstbase-@GST_API_VERSION@.la
 libgstnet_@GST_API_VERSION@_la_LDFLAGS = $(GST_LIB_LDFLAGS) $(GST_ALL_LDFLAGS) $(GST_LT_LDFLAGS)
 
+# try to prevent packaging errors
+check-libexecdir-consistency:
+       @if test "${GST_PTP_HELPER_INSTALLED}" != "${libexecdir}/gstreamer-$(GST_API_VERSION)/gst-ptp-helper"; then \
+         echo "*** Inconsistent libexecdir! Please use ./configure --libexecdir=/foo/bar"; \
+         echo "*** to set the libexecdir and not make libexecdir=/foo/bar or the like."; \
+         echo "*** The same goes for prefix, libdir etc."; \
+         echo "*** ${GST_PTP_HELPER_INSTALLED} != ${libexecdir}/gstreamer-$(GST_API_VERSION)/gst-ptp-helper"; \
+         exit 1; \
+       fi
+
+all-local: check-libexecdir-consistency
+
 CLEANFILES = *.gcno *.gcda *.gcov
 
 %.c.gcov: .libs/libgstnet_@GST_API_VERSION@_la-%.gcda %.c
diff --git a/libs/gst/net/gstptp_private.h b/libs/gst/net/gstptp_private.h
new file mode 100644 (file)
index 0000000..18e0e07
--- /dev/null
@@ -0,0 +1,19 @@
+#ifndef __GST_PTP_PRIVATE_H__
+#define __GST_PTP_PRIVATE_H__
+
+#include <glib.h>
+
+enum
+{
+  TYPE_EVENT,
+  TYPE_GENERAL,
+  TYPE_CLOCK_ID
+};
+
+typedef struct
+{
+  guint16 size;
+  guint8 type;
+} StdIOHeader;
+
+#endif /* __GST_PTP_PRIVATE_H__ */
diff --git a/libs/gst/net/gstptpclock.c b/libs/gst/net/gstptpclock.c
new file mode 100644 (file)
index 0000000..b4fc7e4
--- /dev/null
@@ -0,0 +1,2431 @@
+/* GStreamer
+ * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
+ *
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+/**
+ * SECTION:gstptpclock
+ * @short_description: Special clock that synchronizes to a remote time
+ *                     provider via PTP (IEEE1588:2008).
+ * @see_also: #GstClock, #GstNetClientClock, #GstPipeline
+ *
+ * GstPtpClock implements a PTP (IEEE1588:2008) ordinary clock in slave-only
+ * mode, that allows a GStreamer pipeline to synchronize to a PTP network
+ * clock in some specific domain.
+ *
+ * The PTP subsystem can be initialized with gst_ptp_init(), which then starts
+ * a helper process to do the actual communication via the PTP ports. This is
+ * required as PTP listens on ports < 1024 and thus requires special
+ * privileges. Once this helper process is started, the main process will
+ * synchronize to all PTP domains that are detected on the selected
+ * interfaces.
+ *
+ * gst_ptp_clock_new() then allows to create a GstClock that provides the PTP
+ * time from a master clock inside a specific PTP domain. This clock will only
+ * return valid timestamps once the timestamps in the PTP domain are known. To
+ * check this, the GstPtpClock::internal-clock property and the related
+ * notify::clock signal can be used. Once the internal clock is not NULL, the
+ * PTP domain's time is known. Alternatively you can wait for this with
+ * gst_ptp_clock_wait_ready().
+ *
+ *
+ * To gather statistics about the PTP clock synchronization,
+ * gst_ptp_statistics_callback_add() can be used. This gives the application
+ * the possibility to collect all kinds of statistics from the clock
+ * synchronization.
+ *
+ * Since: 1.6
+ *
+ */
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstptpclock.h"
+
+#ifdef HAVE_PTP
+
+#include "gstptp_private.h"
+
+#include <sys/wait.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <gst/base/base.h>
+
+GST_DEBUG_CATEGORY_STATIC (ptp_debug);
+#define GST_CAT_DEFAULT (ptp_debug)
+
+/* IEEE 1588 7.7.3.1 */
+#define PTP_ANNOUNCE_RECEIPT_TIMEOUT 4
+
+#define MAX_SKIPPED_UPDATES 5
+
+typedef enum
+{
+  PTP_MESSAGE_TYPE_SYNC = 0x0,
+  PTP_MESSAGE_TYPE_DELAY_REQ = 0x1,
+  PTP_MESSAGE_TYPE_PDELAY_REQ = 0x2,
+  PTP_MESSAGE_TYPE_PDELAY_RESP = 0x3,
+  PTP_MESSAGE_TYPE_FOLLOW_UP = 0x8,
+  PTP_MESSAGE_TYPE_DELAY_RESP = 0x9,
+  PTP_MESSAGE_TYPE_PDELAY_RESP_FOLLOW_UP = 0xA,
+  PTP_MESSAGE_TYPE_ANNOUNCE = 0xB,
+  PTP_MESSAGE_TYPE_SIGNALING = 0xC,
+  PTP_MESSAGE_TYPE_MANAGEMENT = 0xD
+} PtpMessageType;
+
+typedef struct
+{
+  guint64 seconds_field;        /* 48 bits valid */
+  guint32 nanoseconds_field;
+} PtpTimestamp;
+
+#define PTP_TIMESTAMP_TO_GST_CLOCK_TIME(ptp) (ptp.seconds_field * GST_SECOND + ptp.nanoseconds_field)
+#define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_SECONDS(gst) (((GstClockTime) gst) / GST_SECOND)
+#define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_NANOSECONDS(gst) (((GstClockTime) gst) % GST_SECOND)
+
+typedef struct
+{
+  guint64 clock_identity;
+  guint16 port_number;
+} PtpClockIdentity;
+
+static gint
+compare_clock_identity (const PtpClockIdentity * a, const PtpClockIdentity * b)
+{
+  if (a->clock_identity < b->clock_identity)
+    return -1;
+  else if (a->clock_identity > b->clock_identity)
+    return 1;
+
+  if (a->port_number < b->port_number)
+    return -1;
+  else if (a->port_number > b->port_number)
+    return 1;
+
+  return 0;
+}
+
+typedef struct
+{
+  guint8 clock_class;
+  guint8 clock_accuracy;
+  guint16 offset_scaled_log_variance;
+} PtpClockQuality;
+
+typedef struct
+{
+  guint8 transport_specific;
+  PtpMessageType message_type;
+  /* guint8 reserved; */
+  guint8 version_ptp;
+  guint16 message_length;
+  guint8 domain_number;
+  /* guint8 reserved; */
+  guint16 flag_field;
+  gint64 correction_field;      /* 48.16 fixed point nanoseconds */
+  /* guint32 reserved; */
+  PtpClockIdentity source_port_identity;
+  guint16 sequence_id;
+  guint8 control_field;
+  gint8 log_message_interval;
+
+  union
+  {
+    struct
+    {
+      PtpTimestamp origin_timestamp;
+      gint16 current_utc_offset;
+      /* guint8 reserved; */
+      guint8 grandmaster_priority_1;
+      PtpClockQuality grandmaster_clock_quality;
+      guint8 grandmaster_priority_2;
+      guint64 grandmaster_identity;
+      guint16 steps_removed;
+      guint8 time_source;
+    } announce;
+
+    struct
+    {
+      PtpTimestamp origin_timestamp;
+    } sync;
+
+    struct
+    {
+      PtpTimestamp precise_origin_timestamp;
+    } follow_up;
+
+    struct
+    {
+      PtpTimestamp origin_timestamp;
+    } delay_req;
+
+    struct
+    {
+      PtpTimestamp receive_timestamp;
+      PtpClockIdentity requesting_port_identity;
+    } delay_resp;
+
+  } message_specific;
+} PtpMessage;
+
+static GMutex ptp_lock;
+static GCond ptp_cond;
+static gboolean initted = FALSE;
+static gboolean supported = TRUE;
+static GPid ptp_helper_pid;
+static GThread *ptp_helper_thread;
+static GMainContext *main_context;
+static GMainLoop *main_loop;
+static GIOChannel *stdin_channel, *stdout_channel;
+static GRand *delay_req_rand;
+static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 };
+
+typedef struct
+{
+  GstClockTime receive_time;
+
+  PtpClockIdentity master_clock_identity;
+
+  guint8 grandmaster_priority_1;
+  PtpClockQuality grandmaster_clock_quality;
+  guint8 grandmaster_priority_2;
+  guint64 grandmaster_identity;
+  guint16 steps_removed;
+  guint8 time_source;
+
+  guint16 sequence_id;
+} PtpAnnounceMessage;
+
+typedef struct
+{
+  PtpClockIdentity master_clock_identity;
+
+  GstClockTime announce_interval;       /* last interval we received */
+  GQueue announce_messages;
+} PtpAnnounceSender;
+
+typedef struct
+{
+  guint domain;
+  PtpClockIdentity master_clock_identity;
+
+  guint16 sync_seqnum;
+  GstClockTime sync_recv_time_local;    /* t2 */
+  GstClockTime sync_send_time_remote;   /* t1, might be -1 if FOLLOW_UP pending */
+  GstClockTime follow_up_recv_time_local;
+
+  GSource *timeout_source;
+  guint16 delay_req_seqnum;
+  GstClockTime delay_req_send_time_local;       /* t3, -1 if we wait for FOLLOW_UP */
+  GstClockTime delay_req_recv_time_remote;      /* t4, -1 if we wait */
+  GstClockTime delay_resp_recv_time_local;
+
+  gint64 correction_field_sync; /* sum of the correction fields of SYNC/FOLLOW_UP */
+  gint64 correction_field_delay;        /* sum of the correction fields of DELAY_RESP */
+} PtpPendingSync;
+
+static void
+ptp_pending_sync_free (PtpPendingSync * sync)
+{
+  if (sync->timeout_source)
+    g_source_destroy (sync->timeout_source);
+  g_free (sync);
+}
+
+typedef struct
+{
+  guint domain;
+
+  GstClockTime last_ptp_time;
+  GstClockTime last_local_time;
+  gint skipped_updates;
+
+  /* Used for selecting the master/grandmaster */
+  GList *announce_senders;
+
+  /* Last selected master clock */
+  gboolean have_master_clock;
+  PtpClockIdentity master_clock_identity;
+  guint64 grandmaster_identity;
+
+  /* Last SYNC or FOLLOW_UP timestamp we received */
+  GstClockTime last_ptp_sync_time;
+  GstClockTime sync_interval;
+
+  GstClockTime mean_path_delay;
+  GstClockTime last_delay_req, min_delay_req_interval;
+  guint16 last_delay_req_seqnum;
+
+  GQueue pending_syncs;
+
+  GstClock *domain_clock;
+} PtpDomainData;
+
+static GList *domain_data;
+static GMutex domain_clocks_lock;
+static GList *domain_clocks;
+
+/* Protected by PTP lock */
+static void emit_ptp_statistics (guint8 domain, const GstStructure * stats);
+static GHookList domain_stats_hooks;
+static gint domain_stats_n_hooks;
+static gboolean domain_stats_hooks_initted = FALSE;
+
+/* Converts log2 seconds to GstClockTime */
+static GstClockTime
+log2_to_clock_time (gint l)
+{
+  if (l < 0)
+    return GST_SECOND >> (-l);
+  else
+    return GST_SECOND << l;
+}
+
+static void
+dump_ptp_message (PtpMessage * msg)
+{
+  GST_TRACE ("PTP message:");
+  GST_TRACE ("\ttransport_specific: %u", msg->transport_specific);
+  GST_TRACE ("\tmessage_type: 0x%01x", msg->message_type);
+  GST_TRACE ("\tversion_ptp: %u", msg->version_ptp);
+  GST_TRACE ("\tmessage_length: %u", msg->message_length);
+  GST_TRACE ("\tdomain_number: %u", msg->domain_number);
+  GST_TRACE ("\tflag_field: 0x%04x", msg->flag_field);
+  GST_TRACE ("\tcorrection_field: %" G_GINT64_FORMAT ".%03u",
+      (msg->correction_field / 65536),
+      (guint) ((msg->correction_field & 0xffff) * 1000) / 65536);
+  GST_TRACE ("\tsource_port_identity: 0x%016" G_GINT64_MODIFIER "x %u",
+      msg->source_port_identity.clock_identity,
+      msg->source_port_identity.port_number);
+  GST_TRACE ("\tsequence_id: %u", msg->sequence_id);
+  GST_TRACE ("\tcontrol_field: 0x%02x", msg->control_field);
+  GST_TRACE ("\tmessage_interval: %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (log2_to_clock_time (msg->log_message_interval)));
+
+  switch (msg->message_type) {
+    case PTP_MESSAGE_TYPE_ANNOUNCE:
+      GST_TRACE ("\tANNOUNCE:");
+      GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
+          msg->message_specific.announce.origin_timestamp.seconds_field,
+          msg->message_specific.announce.origin_timestamp.nanoseconds_field);
+      GST_TRACE ("\t\tcurrent_utc_offset: %d",
+          msg->message_specific.announce.current_utc_offset);
+      GST_TRACE ("\t\tgrandmaster_priority_1: %u",
+          msg->message_specific.announce.grandmaster_priority_1);
+      GST_TRACE ("\t\tgrandmaster_clock_quality: 0x%02x 0x%02x %u",
+          msg->message_specific.announce.grandmaster_clock_quality.clock_class,
+          msg->message_specific.announce.
+          grandmaster_clock_quality.clock_accuracy,
+          msg->message_specific.announce.
+          grandmaster_clock_quality.offset_scaled_log_variance);
+      GST_TRACE ("\t\tgrandmaster_priority_2: %u",
+          msg->message_specific.announce.grandmaster_priority_2);
+      GST_TRACE ("\t\tgrandmaster_identity: 0x%016" G_GINT64_MODIFIER "x",
+          msg->message_specific.announce.grandmaster_identity);
+      GST_TRACE ("\t\tsteps_removed: %u",
+          msg->message_specific.announce.steps_removed);
+      GST_TRACE ("\t\ttime_source: 0x%02x",
+          msg->message_specific.announce.time_source);
+      break;
+    case PTP_MESSAGE_TYPE_SYNC:
+      GST_TRACE ("\tSYNC:");
+      GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
+          msg->message_specific.sync.origin_timestamp.seconds_field,
+          msg->message_specific.sync.origin_timestamp.nanoseconds_field);
+      break;
+    case PTP_MESSAGE_TYPE_FOLLOW_UP:
+      GST_TRACE ("\tFOLLOW_UP:");
+      GST_TRACE ("\t\tprecise_origin_timestamp: %" G_GUINT64_FORMAT ".%09u",
+          msg->message_specific.follow_up.
+          precise_origin_timestamp.seconds_field,
+          msg->message_specific.follow_up.
+          precise_origin_timestamp.nanoseconds_field);
+      break;
+    case PTP_MESSAGE_TYPE_DELAY_REQ:
+      GST_TRACE ("\tDELAY_REQ:");
+      GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
+          msg->message_specific.delay_req.origin_timestamp.seconds_field,
+          msg->message_specific.delay_req.origin_timestamp.nanoseconds_field);
+      break;
+    case PTP_MESSAGE_TYPE_DELAY_RESP:
+      GST_TRACE ("\tDELAY_RESP:");
+      GST_TRACE ("\t\treceive_timestamp: %" G_GUINT64_FORMAT ".%09u",
+          msg->message_specific.delay_resp.receive_timestamp.seconds_field,
+          msg->message_specific.delay_resp.receive_timestamp.nanoseconds_field);
+      GST_TRACE ("\t\trequesting_port_identity: 0x%016" G_GINT64_MODIFIER
+          "x %u",
+          msg->message_specific.delay_resp.
+          requesting_port_identity.clock_identity,
+          msg->message_specific.delay_resp.
+          requesting_port_identity.port_number);
+      break;
+    default:
+      break;
+  }
+  GST_TRACE (" ");
+}
+
+/* IEEE 1588-2008 5.3.3 */
+static gboolean
+parse_ptp_timestamp (PtpTimestamp * timestamp, GstByteReader * reader)
+{
+  g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 10, FALSE);
+
+  timestamp->seconds_field =
+      (((guint64) gst_byte_reader_get_uint32_be_unchecked (reader)) << 16) |
+      gst_byte_reader_get_uint16_be_unchecked (reader);
+  timestamp->nanoseconds_field =
+      gst_byte_reader_get_uint32_be_unchecked (reader);
+
+  if (timestamp->nanoseconds_field >= 1000000000)
+    return FALSE;
+
+  return TRUE;
+}
+
+/* IEEE 1588-2008 13.3 */
+static gboolean
+parse_ptp_message_header (PtpMessage * msg, GstByteReader * reader)
+{
+  guint8 b;
+
+  g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 34, FALSE);
+
+  b = gst_byte_reader_get_uint8_unchecked (reader);
+  msg->transport_specific = b >> 4;
+  msg->message_type = b & 0x0f;
+
+  b = gst_byte_reader_get_uint8_unchecked (reader);
+  msg->version_ptp = b & 0x0f;
+  if (msg->version_ptp != 2) {
+    GST_WARNING ("Unsupported PTP message version (%u != 2)", msg->version_ptp);
+    return FALSE;
+  }
+
+  msg->message_length = gst_byte_reader_get_uint16_be_unchecked (reader);
+  if (gst_byte_reader_get_remaining (reader) + 4 < msg->message_length) {
+    GST_WARNING ("Not enough data (%u < %u)",
+        gst_byte_reader_get_remaining (reader) + 4, msg->message_length);
+    return FALSE;
+  }
+
+  msg->domain_number = gst_byte_reader_get_uint8_unchecked (reader);
+  gst_byte_reader_skip_unchecked (reader, 1);
+
+  msg->flag_field = gst_byte_reader_get_uint16_be_unchecked (reader);
+  msg->correction_field = gst_byte_reader_get_uint64_be_unchecked (reader);
+  gst_byte_reader_skip_unchecked (reader, 4);
+
+  msg->source_port_identity.clock_identity =
+      gst_byte_reader_get_uint64_be_unchecked (reader);
+  msg->source_port_identity.port_number =
+      gst_byte_reader_get_uint16_be_unchecked (reader);
+
+  msg->sequence_id = gst_byte_reader_get_uint16_be_unchecked (reader);
+  msg->control_field = gst_byte_reader_get_uint8_unchecked (reader);
+  msg->log_message_interval = gst_byte_reader_get_uint8_unchecked (reader);
+
+  return TRUE;
+}
+
+/* IEEE 1588-2008 13.5 */
+static gboolean
+parse_ptp_message_announce (PtpMessage * msg, GstByteReader * reader)
+{
+  g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_ANNOUNCE, FALSE);
+
+  if (gst_byte_reader_get_remaining (reader) < 20)
+    return FALSE;
+
+  if (!parse_ptp_timestamp (&msg->message_specific.announce.origin_timestamp,
+          reader))
+    return FALSE;
+
+  msg->message_specific.announce.current_utc_offset =
+      gst_byte_reader_get_uint16_be_unchecked (reader);
+  gst_byte_reader_skip_unchecked (reader, 1);
+
+  msg->message_specific.announce.grandmaster_priority_1 =
+      gst_byte_reader_get_uint8_unchecked (reader);
+  msg->message_specific.announce.grandmaster_clock_quality.clock_class =
+      gst_byte_reader_get_uint8_unchecked (reader);
+  msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy =
+      gst_byte_reader_get_uint8_unchecked (reader);
+  msg->message_specific.announce.
+      grandmaster_clock_quality.offset_scaled_log_variance =
+      gst_byte_reader_get_uint16_be_unchecked (reader);
+  msg->message_specific.announce.grandmaster_priority_2 =
+      gst_byte_reader_get_uint8_unchecked (reader);
+  msg->message_specific.announce.grandmaster_identity =
+      gst_byte_reader_get_uint64_be_unchecked (reader);
+  msg->message_specific.announce.steps_removed =
+      gst_byte_reader_get_uint16_be_unchecked (reader);
+  msg->message_specific.announce.time_source =
+      gst_byte_reader_get_uint8_unchecked (reader);
+
+  return TRUE;
+}
+
+/* IEEE 1588-2008 13.6 */
+static gboolean
+parse_ptp_message_sync (PtpMessage * msg, GstByteReader * reader)
+{
+  g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_SYNC, FALSE);
+
+  if (gst_byte_reader_get_remaining (reader) < 10)
+    return FALSE;
+
+  if (!parse_ptp_timestamp (&msg->message_specific.sync.origin_timestamp,
+          reader))
+    return FALSE;
+
+  return TRUE;
+}
+
+/* IEEE 1588-2008 13.6 */
+static gboolean
+parse_ptp_message_delay_req (PtpMessage * msg, GstByteReader * reader)
+{
+  g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_REQ, FALSE);
+
+  if (gst_byte_reader_get_remaining (reader) < 10)
+    return FALSE;
+
+  if (!parse_ptp_timestamp (&msg->message_specific.delay_req.origin_timestamp,
+          reader))
+    return FALSE;
+
+  return TRUE;
+}
+
+/* IEEE 1588-2008 13.7 */
+static gboolean
+parse_ptp_message_follow_up (PtpMessage * msg, GstByteReader * reader)
+{
+  g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_FOLLOW_UP, FALSE);
+
+  if (gst_byte_reader_get_remaining (reader) < 10)
+    return FALSE;
+
+  if (!parse_ptp_timestamp (&msg->message_specific.
+          follow_up.precise_origin_timestamp, reader))
+    return FALSE;
+
+  return TRUE;
+}
+
+/* IEEE 1588-2008 13.8 */
+static gboolean
+parse_ptp_message_delay_resp (PtpMessage * msg, GstByteReader * reader)
+{
+  g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_RESP,
+      FALSE);
+
+  if (gst_byte_reader_get_remaining (reader) < 20)
+    return FALSE;
+
+  if (!parse_ptp_timestamp (&msg->message_specific.delay_resp.receive_timestamp,
+          reader))
+    return FALSE;
+
+  msg->message_specific.delay_resp.requesting_port_identity.clock_identity =
+      gst_byte_reader_get_uint64_be_unchecked (reader);
+  msg->message_specific.delay_resp.requesting_port_identity.port_number =
+      gst_byte_reader_get_uint16_be_unchecked (reader);
+
+  return TRUE;
+}
+
+static gboolean
+parse_ptp_message (PtpMessage * msg, const guint8 * data, gsize size)
+{
+  GstByteReader reader;
+  gboolean ret = FALSE;
+
+  gst_byte_reader_init (&reader, data, size);
+
+  if (!parse_ptp_message_header (msg, &reader)) {
+    GST_WARNING ("Failed to parse PTP message header");
+    return FALSE;
+  }
+
+  switch (msg->message_type) {
+    case PTP_MESSAGE_TYPE_SYNC:
+      ret = parse_ptp_message_sync (msg, &reader);
+      break;
+    case PTP_MESSAGE_TYPE_FOLLOW_UP:
+      ret = parse_ptp_message_follow_up (msg, &reader);
+      break;
+    case PTP_MESSAGE_TYPE_DELAY_REQ:
+      ret = parse_ptp_message_delay_req (msg, &reader);
+      break;
+    case PTP_MESSAGE_TYPE_DELAY_RESP:
+      ret = parse_ptp_message_delay_resp (msg, &reader);
+      break;
+    case PTP_MESSAGE_TYPE_ANNOUNCE:
+      ret = parse_ptp_message_announce (msg, &reader);
+      break;
+    default:
+      /* ignore for now */
+      break;
+  }
+
+  return ret;
+}
+
+static gint
+compare_announce_message (const PtpAnnounceMessage * a,
+    const PtpAnnounceMessage * b)
+{
+  /* IEEE 1588 Figure 27 */
+  if (a->grandmaster_identity == b->grandmaster_identity) {
+    if (a->steps_removed + 1 < b->steps_removed)
+      return -1;
+    else if (a->steps_removed > b->steps_removed + 1)
+      return 1;
+
+    /* Error cases are filtered out earlier */
+    if (a->steps_removed < b->steps_removed)
+      return -1;
+    else if (a->steps_removed > b->steps_removed)
+      return 1;
+
+    /* Error cases are filtered out earlier */
+    if (a->master_clock_identity.clock_identity <
+        b->master_clock_identity.clock_identity)
+      return -1;
+    else if (a->master_clock_identity.clock_identity >
+        b->master_clock_identity.clock_identity)
+      return 1;
+
+    /* Error cases are filtered out earlier */
+    if (a->master_clock_identity.port_number <
+        b->master_clock_identity.port_number)
+      return -1;
+    else if (a->master_clock_identity.port_number >
+        b->master_clock_identity.port_number)
+      return 1;
+    else
+      g_assert_not_reached ();
+
+    return 0;
+  }
+
+  if (a->grandmaster_priority_1 < b->grandmaster_priority_1)
+    return -1;
+  else if (a->grandmaster_priority_1 > b->grandmaster_priority_1)
+    return 1;
+
+  if (a->grandmaster_clock_quality.clock_class <
+      b->grandmaster_clock_quality.clock_class)
+    return -1;
+  else if (a->grandmaster_clock_quality.clock_class >
+      b->grandmaster_clock_quality.clock_class)
+    return 1;
+
+  if (a->grandmaster_clock_quality.clock_accuracy <
+      b->grandmaster_clock_quality.clock_accuracy)
+    return -1;
+  else if (a->grandmaster_clock_quality.clock_accuracy >
+      b->grandmaster_clock_quality.clock_accuracy)
+    return 1;
+
+  if (a->grandmaster_clock_quality.offset_scaled_log_variance <
+      b->grandmaster_clock_quality.offset_scaled_log_variance)
+    return -1;
+  else if (a->grandmaster_clock_quality.offset_scaled_log_variance >
+      b->grandmaster_clock_quality.offset_scaled_log_variance)
+    return 1;
+
+  if (a->grandmaster_priority_2 < b->grandmaster_priority_2)
+    return -1;
+  else if (a->grandmaster_priority_2 > b->grandmaster_priority_2)
+    return 1;
+
+  if (a->grandmaster_identity < b->grandmaster_identity)
+    return -1;
+  else if (a->grandmaster_identity > b->grandmaster_identity)
+    return 1;
+  else
+    g_assert_not_reached ();
+
+  return 0;
+}
+
+static void
+select_best_master_clock (PtpDomainData * domain, GstClockTime now)
+{
+  GList *qualified_messages = NULL;
+  GList *l, *m;
+  PtpAnnounceMessage *best = NULL;
+
+  /* IEEE 1588 9.3.2.5 */
+  for (l = domain->announce_senders; l; l = l->next) {
+    PtpAnnounceSender *sender = l->data;
+    GstClockTime window = 4 * sender->announce_interval;
+    gint count = 0;
+
+    for (m = sender->announce_messages.head; m; m = m->next) {
+      PtpAnnounceMessage *msg = m->data;
+
+      if (now - msg->receive_time <= window)
+        count++;
+    }
+
+    /* Only include the newest message of announce senders that had at least 2
+     * announce messages in the last 4 announce intervals. Which also means
+     * that we wait at least 4 announce intervals before we select a master
+     * clock. Until then we just report based on the newest SYNC we received
+     */
+    if (count >= 2) {
+      qualified_messages =
+          g_list_prepend (qualified_messages,
+          g_queue_peek_tail (&sender->announce_messages));
+    }
+  }
+
+  if (!qualified_messages) {
+    GST_DEBUG
+        ("No qualified announce messages for domain %u, can't select a master clock",
+        domain->domain);
+    domain->have_master_clock = FALSE;
+    return;
+  }
+
+  for (l = qualified_messages; l; l = l->next) {
+    PtpAnnounceMessage *msg = l->data;
+
+    if (!best || compare_announce_message (msg, best) < 0)
+      best = msg;
+  }
+
+  if (domain->have_master_clock
+      && compare_clock_identity (&domain->master_clock_identity,
+          &best->master_clock_identity) == 0) {
+    GST_DEBUG ("Master clock in domain %u did not change", domain->domain);
+  } else {
+    GST_DEBUG ("Selected master clock for domain %u: 0x%016" G_GINT64_MODIFIER
+        "x %u with grandmaster clock 0x%016" G_GINT64_MODIFIER "x",
+        domain->domain, best->master_clock_identity.clock_identity,
+        best->master_clock_identity.port_number, best->grandmaster_identity);
+
+    domain->have_master_clock = TRUE;
+    domain->grandmaster_identity = best->grandmaster_identity;
+
+    /* Opportunistic master clock selection likely gave us the same master
+     * clock before, no need to reset all statistics */
+    if (compare_clock_identity (&domain->master_clock_identity,
+            &best->master_clock_identity) != 0) {
+      memcpy (&domain->master_clock_identity, &best->master_clock_identity,
+          sizeof (PtpClockIdentity));
+      domain->mean_path_delay = 0;
+      domain->last_delay_req = 0;
+      domain->min_delay_req_interval = 0;
+      domain->sync_interval = 0;
+      domain->last_ptp_sync_time = 0;
+      domain->skipped_updates = 0;
+      g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
+          NULL);
+      g_queue_clear (&domain->pending_syncs);
+    }
+
+    if (g_atomic_int_get (&domain_stats_n_hooks)) {
+      GstStructure *stats =
+          gst_structure_new (GST_PTP_STATISTICS_BEST_MASTER_CLOCK_SELECTED,
+          "domain", G_TYPE_UINT, domain->domain,
+          "master-clock-id", G_TYPE_UINT64,
+          domain->master_clock_identity.clock_identity,
+          "master-clock-port", G_TYPE_UINT,
+          domain->master_clock_identity.port_number,
+          "grandmaster-clock-id", G_TYPE_UINT64, domain->grandmaster_identity,
+          NULL);
+      emit_ptp_statistics (domain->domain, stats);
+      gst_structure_free (stats);
+    }
+  }
+}
+
+static void
+handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
+{
+  GList *l;
+  PtpDomainData *domain = NULL;
+  PtpAnnounceSender *sender = NULL;
+  PtpAnnounceMessage *announce;
+
+  /* IEEE1588 9.3.2.2 e)
+   * Don't consider messages with the alternate master flag set
+   */
+  if ((msg->flag_field & 0x0100))
+    return;
+
+  /* IEEE 1588 9.3.2.5 d)
+   * Don't consider announce messages with steps_removed>=255
+   */
+  if (msg->message_specific.announce.steps_removed >= 255)
+    return;
+
+  for (l = domain_data; l; l = l->next) {
+    PtpDomainData *tmp = l->data;
+
+    if (tmp->domain == msg->domain_number) {
+      domain = tmp;
+      break;
+    }
+  }
+
+  if (!domain) {
+    gchar *clock_name;
+
+    domain = g_new0 (PtpDomainData, 1);
+    domain->domain = msg->domain_number;
+    clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
+    domain->domain_clock =
+        g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
+    g_free (clock_name);
+    g_queue_init (&domain->pending_syncs);
+    domain_data = g_list_prepend (domain_data, domain);
+
+    g_mutex_lock (&domain_clocks_lock);
+    domain_clocks = g_list_prepend (domain_clocks, domain);
+    g_mutex_unlock (&domain_clocks_lock);
+
+    if (g_atomic_int_get (&domain_stats_n_hooks)) {
+      GstStructure *stats =
+          gst_structure_new (GST_PTP_STATISTICS_NEW_DOMAIN_FOUND, "domain",
+          G_TYPE_UINT, domain->domain, "clock", GST_TYPE_CLOCK,
+          domain->domain_clock, NULL);
+      emit_ptp_statistics (domain->domain, stats);
+      gst_structure_free (stats);
+    }
+  }
+
+  for (l = domain->announce_senders; l; l = l->next) {
+    PtpAnnounceSender *tmp = l->data;
+
+    if (compare_clock_identity (&tmp->master_clock_identity,
+            &msg->source_port_identity) == 0) {
+      sender = tmp;
+      break;
+    }
+  }
+
+  if (!sender) {
+    sender = g_new0 (PtpAnnounceSender, 1);
+
+    memcpy (&sender->master_clock_identity, &msg->source_port_identity,
+        sizeof (PtpClockIdentity));
+    g_queue_init (&sender->announce_messages);
+    domain->announce_senders =
+        g_list_prepend (domain->announce_senders, sender);
+  }
+
+  for (l = sender->announce_messages.head; l; l = l->next) {
+    PtpAnnounceMessage *tmp = l->data;
+
+    /* IEEE 1588 9.3.2.5 c)
+     * Don't consider identical messages, i.e. duplicates
+     */
+    if (tmp->sequence_id == msg->sequence_id)
+      return;
+  }
+
+  sender->announce_interval = log2_to_clock_time (msg->log_message_interval);
+
+  announce = g_new0 (PtpAnnounceMessage, 1);
+  announce->receive_time = receive_time;
+  announce->sequence_id = msg->sequence_id;
+  memcpy (&announce->master_clock_identity, &msg->source_port_identity,
+      sizeof (PtpClockIdentity));
+  announce->grandmaster_identity =
+      msg->message_specific.announce.grandmaster_identity;
+  announce->grandmaster_priority_1 =
+      msg->message_specific.announce.grandmaster_priority_1;
+  announce->grandmaster_clock_quality.clock_class =
+      msg->message_specific.announce.grandmaster_clock_quality.clock_class;
+  announce->grandmaster_clock_quality.clock_accuracy =
+      msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy;
+  announce->grandmaster_clock_quality.offset_scaled_log_variance =
+      msg->message_specific.announce.
+      grandmaster_clock_quality.offset_scaled_log_variance;
+  announce->grandmaster_priority_2 =
+      msg->message_specific.announce.grandmaster_priority_2;
+  announce->steps_removed = msg->message_specific.announce.steps_removed;
+  announce->time_source = msg->message_specific.announce.time_source;
+  g_queue_push_tail (&sender->announce_messages, announce);
+
+  select_best_master_clock (domain, receive_time);
+}
+
+static gboolean
+send_delay_req_timeout (PtpPendingSync * sync)
+{
+  StdIOHeader header = { 0, };
+  guint8 delay_req[44];
+  GstByteWriter writer;
+  GIOStatus status;
+  gsize written;
+  GError *err = NULL;
+
+  header.type = TYPE_EVENT;
+  header.size = 44;
+
+  gst_byte_writer_init_with_data (&writer, delay_req, 44, FALSE);
+  gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ);
+  gst_byte_writer_put_uint8_unchecked (&writer, 2);
+  gst_byte_writer_put_uint16_be_unchecked (&writer, 44);
+  gst_byte_writer_put_uint8_unchecked (&writer, sync->domain);
+  gst_byte_writer_put_uint8_unchecked (&writer, 0);
+  gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
+  gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
+  gst_byte_writer_put_uint32_be_unchecked (&writer, 0);
+  gst_byte_writer_put_uint64_be_unchecked (&writer,
+      ptp_clock_id.clock_identity);
+  gst_byte_writer_put_uint16_be_unchecked (&writer, ptp_clock_id.port_number);
+  gst_byte_writer_put_uint16_be_unchecked (&writer, sync->delay_req_seqnum);
+  gst_byte_writer_put_uint8_unchecked (&writer, 0x01);
+  gst_byte_writer_put_uint8_unchecked (&writer, 0x7f);
+  gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
+  gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
+
+  status =
+      g_io_channel_write_chars (stdout_channel, (gchar *) & header,
+      sizeof (header), &written, &err);
+  if (status == G_IO_STATUS_ERROR) {
+    g_warning ("Failed to write to stdout: %s", err->message);
+    return G_SOURCE_REMOVE;
+  } else if (status == G_IO_STATUS_EOF) {
+    g_message ("EOF on stdout");
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  } else if (status != G_IO_STATUS_NORMAL) {
+    g_warning ("Unexpected stdout write status: %d", status);
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  } else if (written != sizeof (header)) {
+    g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  }
+
+  sync->delay_req_send_time_local = gst_util_get_timestamp ();
+
+  status =
+      g_io_channel_write_chars (stdout_channel,
+      (const gchar *) delay_req, 44, &written, &err);
+  if (status == G_IO_STATUS_ERROR) {
+    g_warning ("Failed to write to stdout: %s", err->message);
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  } else if (status == G_IO_STATUS_EOF) {
+    g_message ("EOF on stdout");
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  } else if (status != G_IO_STATUS_NORMAL) {
+    g_warning ("Unexpected stdout write status: %d", status);
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  } else if (written != 44) {
+    g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  }
+
+  return G_SOURCE_REMOVE;
+}
+
+static gboolean
+send_delay_req (PtpDomainData * domain, PtpPendingSync * sync)
+{
+  GstClockTime now = gst_util_get_timestamp ();
+  guint timeout;
+  GSource *timeout_source;
+
+  if (domain->last_delay_req != 0
+      && domain->last_delay_req + domain->min_delay_req_interval > now)
+    return FALSE;
+
+  domain->last_delay_req = now;
+  sync->delay_req_seqnum = domain->last_delay_req_seqnum++;
+
+  /* IEEE 1588 9.5.11.2 */
+  if (domain->last_delay_req == 0 || domain->min_delay_req_interval == 0)
+    timeout = 0;
+  else
+    timeout =
+        g_rand_int_range (delay_req_rand, 0,
+        (domain->min_delay_req_interval * 2) / GST_MSECOND);
+
+  sync->timeout_source = timeout_source = g_timeout_source_new (timeout);
+  g_source_set_priority (timeout_source, G_PRIORITY_DEFAULT);
+  g_source_set_callback (timeout_source, (GSourceFunc) send_delay_req_timeout,
+      sync, NULL);
+  g_source_attach (timeout_source, main_context);
+
+  return TRUE;
+}
+
+/* Filtering of outliers for RTT and time calculations inspired
+ * by the code from gstnetclientclock.c
+ */
+static void
+update_ptp_time (PtpDomainData * domain, PtpPendingSync * sync)
+{
+  GstClockTime internal_time, external_time, rate_num, rate_den;
+  GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
+      orig_rate_den;
+  GstClockTime corrected_ptp_time, corrected_local_time;
+  GstClockTime max_discont, estimated_ptp_time_min, estimated_ptp_time_max;
+  gdouble r_squared;
+  gboolean synced, now_synced;
+  GstClockTimeDiff discont = 0;
+  GstClockTime estimated_ptp_time = GST_CLOCK_TIME_NONE, new_estimated_ptp_time;
+
+  /* We check this here and when updating the mean path delay, because
+   * we can get here without a delay response too */
+  if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE
+      && sync->follow_up_recv_time_local >
+      sync->sync_recv_time_local + 2 * domain->mean_path_delay) {
+    GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
+        " > 2 * %" GST_TIME_FORMAT, domain->domain,
+        GST_TIME_ARGS (sync->follow_up_recv_time_local),
+        GST_TIME_ARGS (domain->mean_path_delay));
+    goto out;
+  }
+
+  /* IEEE 1588 11.2 */
+  corrected_ptp_time =
+      sync->sync_send_time_remote +
+      (sync->correction_field_sync + 32768) / 65536;
+  corrected_local_time = sync->sync_recv_time_local - domain->mean_path_delay;
+
+  /* Set an initial local-remote relation */
+  if (domain->last_ptp_time == 0)
+    gst_clock_set_calibration (domain->domain_clock, corrected_local_time,
+        corrected_ptp_time, 1, 1);
+
+  /* Check if the corrected PTP time is +/- 3/4 RTT around what we would
+   * estimate with our present knowledge about the clock
+   */
+  /* Store what the clock produced as 'now' before this update */
+  gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
+      &orig_internal_time, &orig_external_time, &orig_rate_num, &orig_rate_den);
+  internal_time = orig_internal_time;
+  external_time = orig_external_time;
+  rate_num = orig_rate_num;
+  rate_den = orig_rate_den;
+
+  /* 3/4 RTT window around the estimation */
+  max_discont = domain->mean_path_delay * 3 / 2;
+
+  /* Check if the estimated sync time is inside our window */
+  estimated_ptp_time_min = corrected_local_time - max_discont;
+  estimated_ptp_time_min =
+      gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
+      estimated_ptp_time_min, internal_time, external_time, rate_num, rate_den);
+  estimated_ptp_time_max = corrected_local_time + max_discont;
+  estimated_ptp_time_max =
+      gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
+      estimated_ptp_time_max, internal_time, external_time, rate_num, rate_den);
+
+  synced = (estimated_ptp_time_min < corrected_ptp_time
+      && corrected_ptp_time < estimated_ptp_time_max);
+
+  GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
+      GST_TIME_FORMAT, domain->domain,
+      GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
+
+  GST_DEBUG ("Synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
+      GST_TIME_FORMAT, synced, GST_TIME_ARGS (estimated_ptp_time_min),
+      GST_TIME_ARGS (corrected_ptp_time),
+      GST_TIME_ARGS (estimated_ptp_time_max));
+
+  if (gst_clock_add_observation_unapplied (domain->domain_clock,
+          corrected_local_time, corrected_ptp_time, &r_squared,
+          &internal_time, &external_time, &rate_num, &rate_den)) {
+    GST_DEBUG ("Regression gave r_squared: %f", r_squared);
+
+    /* Old estimated PTP time based on receive time and path delay */
+    estimated_ptp_time = corrected_local_time;
+    estimated_ptp_time =
+        gst_clock_adjust_with_calibration (GST_CLOCK_CAST
+        (domain->domain_clock), estimated_ptp_time, orig_internal_time,
+        orig_external_time, orig_rate_num, orig_rate_den);
+
+    /* New estimated PTP time based on receive time and path delay */
+    new_estimated_ptp_time = corrected_local_time;
+    new_estimated_ptp_time =
+        gst_clock_adjust_with_calibration (GST_CLOCK_CAST
+        (domain->domain_clock), new_estimated_ptp_time, internal_time,
+        external_time, rate_num, rate_den);
+
+    discont = GST_CLOCK_DIFF (estimated_ptp_time, new_estimated_ptp_time);
+    if (synced && ABS (discont) > max_discont) {
+      GstClockTimeDiff offset;
+      GST_DEBUG ("Too large a discont %s%" GST_TIME_FORMAT
+          ", clamping to 1/4 average RTT = %" GST_TIME_FORMAT,
+          (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
+          GST_TIME_ARGS (max_discont));
+      if (discont > 0) {        /* Too large a forward step - add a -ve offset */
+        offset = max_discont - discont;
+        if (-offset > external_time)
+          external_time = 0;
+        else
+          external_time += offset;
+      } else {                  /* Too large a backward step - add a +ve offset */
+        offset = -(max_discont + discont);
+        external_time += offset;
+      }
+
+      discont += offset;
+    } else {
+      GST_DEBUG ("Discont %s%" GST_TIME_FORMAT " (max: %" GST_TIME_FORMAT ")",
+          (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
+          GST_TIME_ARGS (max_discont));
+    }
+
+    /* Check if the estimated sync time is now (still) inside our window */
+    estimated_ptp_time_min = corrected_local_time - max_discont;
+    estimated_ptp_time_min =
+        gst_clock_adjust_with_calibration (GST_CLOCK_CAST
+        (domain->domain_clock), estimated_ptp_time_min, internal_time,
+        external_time, rate_num, rate_den);
+    estimated_ptp_time_max = corrected_local_time + max_discont;
+    estimated_ptp_time_max =
+        gst_clock_adjust_with_calibration (GST_CLOCK_CAST
+        (domain->domain_clock), estimated_ptp_time_max, internal_time,
+        external_time, rate_num, rate_den);
+
+    now_synced = (estimated_ptp_time_min < corrected_ptp_time
+        && corrected_ptp_time < estimated_ptp_time_max);
+
+    GST_DEBUG ("Now synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
+        GST_TIME_FORMAT, now_synced, GST_TIME_ARGS (estimated_ptp_time_min),
+        GST_TIME_ARGS (corrected_ptp_time),
+        GST_TIME_ARGS (estimated_ptp_time_max));
+
+    if (synced || now_synced || domain->skipped_updates > MAX_SKIPPED_UPDATES) {
+      gst_clock_set_calibration (GST_CLOCK_CAST (domain->domain_clock),
+          internal_time, external_time, rate_num, rate_den);
+      domain->skipped_updates = 0;
+
+      domain->last_ptp_time = corrected_ptp_time;
+      domain->last_local_time = corrected_local_time;
+    } else {
+      domain->skipped_updates++;
+    }
+  } else {
+    domain->last_ptp_time = corrected_ptp_time;
+    domain->last_local_time = corrected_local_time;
+  }
+
+out:
+
+  if (g_atomic_int_get (&domain_stats_n_hooks)) {
+    GstStructure *stats = gst_structure_new (GST_PTP_STATISTICS_TIME_UPDATED,
+        "domain", G_TYPE_UINT, domain->domain,
+        "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
+        "local-time", GST_TYPE_CLOCK_TIME, corrected_local_time,
+        "ptp-time", GST_TYPE_CLOCK_TIME, corrected_ptp_time,
+        "estimated-ptp-time", GST_TYPE_CLOCK_TIME, estimated_ptp_time,
+        "discontinuity", G_TYPE_INT64, discont,
+        "synced", G_TYPE_BOOLEAN, synced,
+        "r-squared", G_TYPE_DOUBLE, r_squared,
+        "internal-time", GST_TYPE_CLOCK_TIME, internal_time,
+        "external-time", GST_TYPE_CLOCK_TIME, external_time,
+        "rate-num", G_TYPE_UINT64, rate_num,
+        "rate-den", G_TYPE_UINT64, rate_den,
+        "rate", G_TYPE_DOUBLE, (gdouble) (rate_num) / rate_den,
+        NULL);
+    emit_ptp_statistics (domain->domain, stats);
+    gst_structure_free (stats);
+  }
+
+}
+
+static gboolean
+update_mean_path_delay (PtpDomainData * domain, PtpPendingSync * sync)
+{
+  GstClockTime mean_path_delay, delay_req_delay;
+  gboolean ret;
+
+  /* IEEE 1588 11.3 */
+  mean_path_delay =
+      (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
+      sync->sync_recv_time_local - sync->delay_req_send_time_local -
+      (sync->correction_field_sync + sync->correction_field_delay +
+          32768) / 65536) / 2;
+
+  /* Track an average round trip time, for a bit of smoothing */
+  /* Always update before discarding a sample, so genuine changes in
+   * the network get picked up, eventually */
+  if (domain->mean_path_delay == 0)
+    domain->mean_path_delay = mean_path_delay;
+  else if (mean_path_delay < domain->mean_path_delay)   /* Shorter RTTs carry more weight than longer */
+    domain->mean_path_delay =
+        (3 * domain->mean_path_delay + mean_path_delay) / 4;
+  else
+    domain->mean_path_delay =
+        (15 * domain->mean_path_delay + mean_path_delay) / 16;
+
+  if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE &&
+      domain->mean_path_delay != 0
+      && sync->follow_up_recv_time_local >
+      sync->sync_recv_time_local + 2 * domain->mean_path_delay) {
+    GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
+        " > 2 * %" GST_TIME_FORMAT, domain->domain,
+        GST_TIME_ARGS (sync->follow_up_recv_time_local),
+        GST_TIME_ARGS (domain->mean_path_delay));
+    ret = FALSE;
+    goto out;
+  }
+
+  if (mean_path_delay > 2 * domain->mean_path_delay) {
+    GST_WARNING ("Mean path delay for domain %u too big: %" GST_TIME_FORMAT
+        " > 2 * %" GST_TIME_FORMAT, domain->domain,
+        GST_TIME_ARGS (mean_path_delay),
+        GST_TIME_ARGS (domain->mean_path_delay));
+    ret = FALSE;
+    goto out;
+  }
+
+  delay_req_delay =
+      sync->delay_resp_recv_time_local - sync->delay_req_send_time_local;
+  /* delay_req_delay is a RTT, so 2 times the path delay */
+  if (delay_req_delay > 4 * domain->mean_path_delay) {
+    GST_WARNING ("Delay-request-response delay for domain %u too big: %"
+        GST_TIME_FORMAT " > 4 * %" GST_TIME_FORMAT, domain->domain,
+        GST_TIME_ARGS (delay_req_delay),
+        GST_TIME_ARGS (domain->mean_path_delay));
+    ret = FALSE;
+    goto out;
+  }
+
+  ret = TRUE;
+
+  GST_DEBUG ("Got mean path delay for domain %u: %" GST_TIME_FORMAT " (new: %"
+      GST_TIME_FORMAT ")", domain->domain,
+      GST_TIME_ARGS (domain->mean_path_delay), GST_TIME_ARGS (mean_path_delay));
+  GST_DEBUG ("Delay request delay for domain %u: %" GST_TIME_FORMAT,
+      domain->domain, GST_TIME_ARGS (delay_req_delay));
+
+out:
+  if (g_atomic_int_get (&domain_stats_n_hooks)) {
+    GstStructure *stats =
+        gst_structure_new (GST_PTP_STATISTICS_PATH_DELAY_MEASURED,
+        "domain", G_TYPE_UINT, domain->domain,
+        "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
+        "mean-path-delay", GST_TYPE_CLOCK_TIME, mean_path_delay,
+        "delay-request-delay", GST_TYPE_CLOCK_TIME, delay_req_delay, NULL);
+    emit_ptp_statistics (domain->domain, stats);
+    gst_structure_free (stats);
+  }
+
+  return ret;
+}
+
+static void
+handle_sync_message (PtpMessage * msg, GstClockTime receive_time)
+{
+  GList *l;
+  PtpDomainData *domain = NULL;
+  PtpPendingSync *sync = NULL;
+
+  /* Don't consider messages with the alternate master flag set */
+  if ((msg->flag_field & 0x0100))
+    return;
+
+  for (l = domain_data; l; l = l->next) {
+    PtpDomainData *tmp = l->data;
+
+    if (msg->domain_number == tmp->domain) {
+      domain = tmp;
+      break;
+    }
+  }
+
+  if (!domain) {
+    gchar *clock_name;
+    domain = g_new0 (PtpDomainData, 1);
+    domain->domain = msg->domain_number;
+    clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
+    domain->domain_clock =
+        g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
+    g_free (clock_name);
+    g_queue_init (&domain->pending_syncs);
+    domain_data = g_list_prepend (domain_data, domain);
+
+    g_mutex_lock (&domain_clocks_lock);
+    domain_clocks = g_list_prepend (domain_clocks, domain);
+    g_mutex_unlock (&domain_clocks_lock);
+  }
+
+  /* If we have a master clock, ignore this message if it's not coming from there */
+  if (domain->have_master_clock
+      && compare_clock_identity (&domain->master_clock_identity,
+          &msg->source_port_identity) != 0)
+    return;
+
+  /* Opportunistic selection of master clock */
+  if (!domain->have_master_clock)
+    memcpy (&domain->master_clock_identity, &msg->source_port_identity,
+        sizeof (PtpClockIdentity));
+
+  domain->sync_interval = log2_to_clock_time (msg->log_message_interval);
+
+  /* Check if duplicated */
+  for (l = domain->pending_syncs.head; l; l = l->next) {
+    PtpPendingSync *tmp = l->data;
+
+    if (tmp->sync_seqnum == msg->sequence_id)
+      return;
+  }
+
+  if (msg->message_specific.sync.origin_timestamp.seconds_field >
+      GST_CLOCK_TIME_NONE / GST_SECOND) {
+    GST_FIXME ("Unsupported sync message seconds field value: %"
+        G_GUINT64_FORMAT " > %" G_GUINT64_FORMAT,
+        msg->message_specific.sync.origin_timestamp.seconds_field,
+        GST_CLOCK_TIME_NONE / GST_SECOND);
+    return;
+  }
+
+  sync = g_new0 (PtpPendingSync, 1);
+  sync->domain = domain->domain;
+  sync->sync_seqnum = msg->sequence_id;
+  sync->sync_recv_time_local = receive_time;
+  sync->sync_send_time_remote = GST_CLOCK_TIME_NONE;
+  sync->follow_up_recv_time_local = GST_CLOCK_TIME_NONE;
+  sync->delay_req_send_time_local = GST_CLOCK_TIME_NONE;
+  sync->delay_req_recv_time_remote = GST_CLOCK_TIME_NONE;
+  sync->delay_resp_recv_time_local = GST_CLOCK_TIME_NONE;
+
+  /* 0.5 correction factor for division later */
+  sync->correction_field_sync = msg->correction_field;
+
+  if ((msg->flag_field & 0x0200)) {
+    /* Wait for FOLLOW_UP */
+  } else {
+    sync->sync_send_time_remote =
+        PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
+        sync.origin_timestamp);
+
+    if (domain->last_ptp_sync_time != 0
+        && domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
+      GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
+          GST_TIME_FORMAT, domain->domain,
+          GST_TIME_ARGS (domain->last_ptp_sync_time),
+          GST_TIME_ARGS (sync->sync_send_time_remote));
+      ptp_pending_sync_free (sync);
+      sync = NULL;
+      return;
+    }
+    domain->last_ptp_sync_time = sync->sync_send_time_remote;
+
+    if (send_delay_req (domain, sync)) {
+      /* Sent delay request */
+    } else {
+      update_ptp_time (domain, sync);
+      ptp_pending_sync_free (sync);
+      sync = NULL;
+    }
+  }
+
+  if (sync)
+    g_queue_push_tail (&domain->pending_syncs, sync);
+}
+
+static void
+handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time)
+{
+  GList *l;
+  PtpDomainData *domain = NULL;
+  PtpPendingSync *sync = NULL;
+
+  /* Don't consider messages with the alternate master flag set */
+  if ((msg->flag_field & 0x0100))
+    return;
+
+  for (l = domain_data; l; l = l->next) {
+    PtpDomainData *tmp = l->data;
+
+    if (msg->domain_number == tmp->domain) {
+      domain = tmp;
+      break;
+    }
+  }
+
+  if (!domain)
+    return;
+
+  /* If we have a master clock, ignore this message if it's not coming from there */
+  if (domain->have_master_clock
+      && compare_clock_identity (&domain->master_clock_identity,
+          &msg->source_port_identity) != 0)
+    return;
+
+  /* Check if we know about this one */
+  for (l = domain->pending_syncs.head; l; l = l->next) {
+    PtpPendingSync *tmp = l->data;
+
+    if (tmp->sync_seqnum == msg->sequence_id) {
+      sync = tmp;
+      break;
+    }
+  }
+
+  if (!sync)
+    return;
+
+  /* Got a FOLLOW_UP for this already */
+  if (sync->sync_send_time_remote != GST_CLOCK_TIME_NONE)
+    return;
+
+  if (sync->sync_recv_time_local >= receive_time) {
+    GST_ERROR ("Got bogus follow up in domain %u: %" GST_TIME_FORMAT " > %"
+        GST_TIME_FORMAT, domain->domain,
+        GST_TIME_ARGS (sync->sync_recv_time_local),
+        GST_TIME_ARGS (receive_time));
+    g_queue_remove (&domain->pending_syncs, sync);
+    ptp_pending_sync_free (sync);
+    return;
+  }
+
+  sync->correction_field_sync += msg->correction_field;
+  sync->sync_send_time_remote =
+      PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
+      follow_up.precise_origin_timestamp);
+  sync->follow_up_recv_time_local = receive_time;
+
+  if (domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
+    GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
+        GST_TIME_FORMAT, domain->domain,
+        GST_TIME_ARGS (domain->last_ptp_sync_time),
+        GST_TIME_ARGS (sync->sync_send_time_remote));
+    g_queue_remove (&domain->pending_syncs, sync);
+    ptp_pending_sync_free (sync);
+    sync = NULL;
+    return;
+  }
+  domain->last_ptp_sync_time = sync->sync_send_time_remote;
+
+  if (send_delay_req (domain, sync)) {
+    /* Sent delay request */
+  } else {
+    update_ptp_time (domain, sync);
+    g_queue_remove (&domain->pending_syncs, sync);
+    ptp_pending_sync_free (sync);
+    sync = NULL;
+  }
+}
+
+static void
+handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time)
+{
+  GList *l;
+  PtpDomainData *domain = NULL;
+  PtpPendingSync *sync = NULL;
+
+  /* Don't consider messages with the alternate master flag set */
+  if ((msg->flag_field & 0x0100))
+    return;
+
+  for (l = domain_data; l; l = l->next) {
+    PtpDomainData *tmp = l->data;
+
+    if (msg->domain_number == tmp->domain) {
+      domain = tmp;
+      break;
+    }
+  }
+
+  if (!domain)
+    return;
+
+  /* If we have a master clock, ignore this message if it's not coming from there */
+  if (domain->have_master_clock
+      && compare_clock_identity (&domain->master_clock_identity,
+          &msg->source_port_identity) != 0)
+    return;
+
+  /* Not for us */
+  if (msg->message_specific.delay_resp.
+      requesting_port_identity.clock_identity != ptp_clock_id.clock_identity
+      || msg->message_specific.delay_resp.
+      requesting_port_identity.port_number != ptp_clock_id.port_number)
+    return;
+
+  domain->min_delay_req_interval =
+      log2_to_clock_time (msg->log_message_interval);
+
+  /* Check if we know about this one */
+  for (l = domain->pending_syncs.head; l; l = l->next) {
+    PtpPendingSync *tmp = l->data;
+
+    if (tmp->delay_req_seqnum == msg->sequence_id) {
+      sync = tmp;
+      break;
+    }
+  }
+
+  if (!sync)
+    return;
+
+  /* Got a DELAY_RESP for this already */
+  if (sync->delay_req_recv_time_remote != GST_CLOCK_TIME_NONE)
+    return;
+
+  if (sync->delay_req_send_time_local > receive_time) {
+    GST_ERROR ("Got bogus delay response in domain %u: %" GST_TIME_FORMAT " > %"
+        GST_TIME_FORMAT, domain->domain,
+        GST_TIME_ARGS (sync->delay_req_send_time_local),
+        GST_TIME_ARGS (receive_time));
+    g_queue_remove (&domain->pending_syncs, sync);
+    ptp_pending_sync_free (sync);
+    return;
+  }
+
+  sync->correction_field_delay = msg->correction_field;
+
+  sync->delay_req_recv_time_remote =
+      PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
+      delay_resp.receive_timestamp);
+  sync->delay_resp_recv_time_local = receive_time;
+
+  if (domain->mean_path_delay != 0
+      && sync->sync_send_time_remote > sync->delay_req_recv_time_remote) {
+    GST_WARNING ("Sync send time after delay req receive time for domain %u: %"
+        GST_TIME_FORMAT " > %" GST_TIME_FORMAT, domain->domain,
+        GST_TIME_ARGS (sync->sync_send_time_remote),
+        GST_TIME_ARGS (sync->delay_req_recv_time_remote));
+    g_queue_remove (&domain->pending_syncs, sync);
+    ptp_pending_sync_free (sync);
+    return;
+  }
+
+  if (update_mean_path_delay (domain, sync))
+    update_ptp_time (domain, sync);
+  g_queue_remove (&domain->pending_syncs, sync);
+  ptp_pending_sync_free (sync);
+}
+
+static void
+handle_ptp_message (PtpMessage * msg, GstClockTime receive_time)
+{
+  /* Ignore our own messages */
+  if (msg->source_port_identity.clock_identity == ptp_clock_id.clock_identity &&
+      msg->source_port_identity.port_number == ptp_clock_id.port_number)
+    return;
+
+  switch (msg->message_type) {
+    case PTP_MESSAGE_TYPE_ANNOUNCE:
+      handle_announce_message (msg, receive_time);
+      break;
+    case PTP_MESSAGE_TYPE_SYNC:
+      handle_sync_message (msg, receive_time);
+      break;
+    case PTP_MESSAGE_TYPE_FOLLOW_UP:
+      handle_follow_up_message (msg, receive_time);
+      break;
+    case PTP_MESSAGE_TYPE_DELAY_RESP:
+      handle_delay_resp_message (msg, receive_time);
+      break;
+    default:
+      break;
+  }
+}
+
+static gboolean
+have_stdin_data_cb (GIOChannel * channel, GIOCondition condition,
+    gpointer user_data)
+{
+  GIOStatus status;
+  StdIOHeader header;
+  gchar buffer[8192];
+  GError *err = NULL;
+  gsize read;
+
+  if ((condition & G_IO_STATUS_EOF)) {
+    GST_ERROR ("Got EOF on stdin");
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  }
+
+  status =
+      g_io_channel_read_chars (channel, (gchar *) & header, sizeof (header),
+      &read, &err);
+  if (status == G_IO_STATUS_ERROR) {
+    GST_ERROR ("Failed to read from stdin: %s", err->message);
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  } else if (status == G_IO_STATUS_EOF) {
+    GST_ERROR ("Got EOF on stdin");
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  } else if (status != G_IO_STATUS_NORMAL) {
+    GST_ERROR ("Unexpected stdin read status: %d", status);
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  } else if (read != sizeof (header)) {
+    GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  } else if (header.size > 8192) {
+    GST_ERROR ("Unexpected size: %u", header.size);
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  }
+
+  status = g_io_channel_read_chars (channel, buffer, header.size, &read, &err);
+  if (status == G_IO_STATUS_ERROR) {
+    GST_ERROR ("Failed to read from stdin: %s", err->message);
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  } else if (status == G_IO_STATUS_EOF) {
+    GST_ERROR ("EOF on stdin");
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  } else if (status != G_IO_STATUS_NORMAL) {
+    GST_ERROR ("Unexpected stdin read status: %d", status);
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  } else if (read != header.size) {
+    GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
+    g_main_loop_quit (main_loop);
+    return G_SOURCE_REMOVE;
+  }
+
+  switch (header.type) {
+    case TYPE_EVENT:
+    case TYPE_GENERAL:{
+      GstClockTime receive_time = gst_util_get_timestamp ();
+      PtpMessage msg;
+
+      if (parse_ptp_message (&msg, (const guint8 *) buffer, header.size)) {
+        dump_ptp_message (&msg);
+        handle_ptp_message (&msg, receive_time);
+      }
+      break;
+    }
+    default:
+    case TYPE_CLOCK_ID:{
+      if (header.size != 8) {
+        GST_ERROR ("Unexpected clock id size (%u != 8)", header.size);
+        g_main_loop_quit (main_loop);
+        return G_SOURCE_REMOVE;
+      }
+      g_mutex_lock (&ptp_lock);
+      ptp_clock_id.clock_identity = GST_READ_UINT64_BE (buffer);
+      ptp_clock_id.port_number = getpid ();
+      GST_DEBUG ("Got clock id 0x%016" G_GINT64_MODIFIER "x %u",
+          ptp_clock_id.clock_identity, ptp_clock_id.port_number);
+      g_cond_signal (&ptp_cond);
+      g_mutex_unlock (&ptp_lock);
+      break;
+    }
+  }
+
+  return G_SOURCE_CONTINUE;
+}
+
+/* Cleanup all announce messages and announce message senders
+ * that are timed out by now, and clean up all pending syncs
+ * that are missing their FOLLOW_UP or DELAY_RESP */
+static gboolean
+cleanup_cb (gpointer data)
+{
+  GstClockTime now = gst_util_get_timestamp ();
+  GList *l, *m, *n;
+
+  for (l = domain_data; l; l = l->next) {
+    PtpDomainData *domain = l->data;
+
+    for (n = domain->announce_senders; n;) {
+      PtpAnnounceSender *sender = n->data;
+      gboolean timed_out = TRUE;
+
+      /* Keep only 5 messages per sender around */
+      while (g_queue_get_length (&sender->announce_messages) > 5) {
+        PtpAnnounceMessage *msg = g_queue_pop_head (&sender->announce_messages);
+        g_free (msg);
+      }
+
+      for (m = sender->announce_messages.head; m; m = m->next) {
+        PtpAnnounceMessage *msg = m->data;
+
+        if (msg->receive_time +
+            sender->announce_interval * PTP_ANNOUNCE_RECEIPT_TIMEOUT > now) {
+          timed_out = FALSE;
+          break;
+        }
+      }
+
+      if (timed_out) {
+        GST_DEBUG ("Announce sender 0x%016" G_GINT64_MODIFIER "x %u timed out",
+            sender->master_clock_identity.clock_identity,
+            sender->master_clock_identity.port_number);
+        g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
+        g_queue_clear (&sender->announce_messages);
+      }
+
+      if (g_queue_get_length (&sender->announce_messages) == 0) {
+        GList *tmp = n->next;
+
+        if (compare_clock_identity (&sender->master_clock_identity,
+                &domain->master_clock_identity) == 0)
+          GST_WARNING ("currently selected master clock timed out");
+        g_free (sender);
+        domain->announce_senders =
+            g_list_delete_link (domain->announce_senders, n);
+        n = tmp;
+      } else {
+        n = n->next;
+      }
+    }
+    select_best_master_clock (domain, now);
+
+    /* Clean up any pending syncs */
+    for (n = domain->pending_syncs.head; n;) {
+      PtpPendingSync *sync = n->data;
+      gboolean timed_out = FALSE;
+
+      /* Time out pending syncs after 4 sync intervals or 10 seconds,
+       * and pending delay reqs after 4 delay req intervals or 10 seconds
+       */
+      if (sync->delay_req_send_time_local != GST_CLOCK_TIME_NONE &&
+          ((domain->min_delay_req_interval != 0
+                  && sync->delay_req_send_time_local +
+                  4 * domain->min_delay_req_interval < now)
+              || (sync->delay_req_send_time_local + 10 * GST_SECOND < now))) {
+        timed_out = TRUE;
+      } else if ((domain->sync_interval != 0
+              && sync->sync_recv_time_local + 4 * domain->sync_interval < now)
+          || (sync->sync_recv_time_local + 10 * GST_SECOND < now)) {
+        timed_out = TRUE;
+      }
+
+      if (timed_out) {
+        GList *tmp = n->next;
+        ptp_pending_sync_free (sync);
+        g_queue_delete_link (&domain->pending_syncs, n);
+        n = tmp;
+      } else {
+        n = n->next;
+      }
+    }
+  }
+
+  return G_SOURCE_CONTINUE;
+}
+
+static gpointer
+ptp_helper_main (gpointer data)
+{
+  GSource *cleanup_source;
+
+  GST_DEBUG ("Starting PTP helper loop");
+
+  /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */
+  cleanup_source = g_timeout_source_new_seconds (5);
+  g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT);
+  g_source_set_callback (cleanup_source, (GSourceFunc) cleanup_cb, NULL, NULL);
+  g_source_attach (cleanup_source, main_context);
+  g_source_unref (cleanup_source);
+
+  g_main_loop_run (main_loop);
+  GST_DEBUG ("Stopped PTP helper loop");
+
+  g_mutex_lock (&ptp_lock);
+  ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
+  ptp_clock_id.port_number = 0;
+  initted = FALSE;
+  g_cond_signal (&ptp_cond);
+  g_mutex_unlock (&ptp_lock);
+
+  return NULL;
+}
+
+/**
+ * gst_ptp_is_supported:
+ *
+ * Check if PTP clocks are generally supported on this system, and if previous
+ * initializations did not fail.
+ *
+ * Returns: %TRUE if PTP clocks are generally supported on this system, and
+ * previous initializations did not fail.
+ *
+ * Since: 1.6
+ */
+gboolean
+gst_ptp_is_supported (void)
+{
+  return supported;
+}
+
+/**
+ * gst_ptp_is_initialized:
+ *
+ * Check if the GStreamer PTP clock subsystem is initialized.
+ *
+ * Returns: %TRUE if the GStreamer PTP clock subsystem is intialized.
+ *
+ * Since: 1.6
+ */
+gboolean
+gst_ptp_is_initialized (void)
+{
+  return initted;
+}
+
+/**
+ * gst_ptp_init:
+ * @clock_id: PTP clock id of this process' clock or %GST_PTP_CLOCK_ID_NONE
+ * @interfaces: (transfer none) (array zero-terminated=1): network interfaces to run the clock on
+ *
+ * Initialize the GStreamer PTP subsystem and create a PTP ordinary clock in
+ * slave-only mode for all domains on the given @interfaces with the
+ * given @clock_id.
+ *
+ * If @clock_id is %GST_PTP_CLOCK_ID_NONE, a clock id is automatically
+ * generated from the MAC address of the first network interface.
+ *
+ *
+ * This function is automatically called by gst_ptp_clock_new() with default
+ * parameters if it wasn't called before.
+ *
+ * Returns: %TRUE if the GStreamer PTP clock subsystem could be initialized.
+ *
+ * Since: 1.6
+ */
+gboolean
+gst_ptp_init (guint64 clock_id, gchar ** interfaces)
+{
+  gboolean ret;
+  const gchar *env;
+  gchar **argv = NULL;
+  gint argc, argc_c;
+  gint fd_r, fd_w;
+  GError *err = NULL;
+  GSource *stdin_source;
+
+  GST_DEBUG_CATEGORY_INIT (ptp_debug, "ptp", 0, "PTP clock");
+
+  g_mutex_lock (&ptp_lock);
+  if (!supported) {
+    GST_ERROR ("PTP not supported");
+    ret = FALSE;
+    goto done;
+  }
+
+  if (initted) {
+    GST_DEBUG ("PTP already initialized");
+    ret = TRUE;
+    goto done;
+  }
+
+  if (ptp_helper_pid) {
+    GST_DEBUG ("PTP currently initializing");
+    goto wait;
+  }
+
+  if (!domain_stats_hooks_initted) {
+    g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
+    domain_stats_hooks_initted = TRUE;
+  }
+
+  argc = 1;
+  if (clock_id != GST_PTP_CLOCK_ID_NONE)
+    argc += 2;
+  if (interfaces != NULL)
+    argc += 2 * g_strv_length (interfaces);
+
+  argv = g_new0 (gchar *, argc + 2);
+  argc_c = 0;
+
+  env = g_getenv ("GST_PTP_HELPER_1_0");
+  if (env == NULL)
+    env = g_getenv ("GST_PTP_HELPER");
+  if (env != NULL && *env != '\0') {
+    GST_LOG ("Trying GST_PTP_HELPER env var: %s", env);
+    argv[argc_c++] = g_strdup (env);
+  } else {
+    argv[argc_c++] = g_strdup (GST_PTP_HELPER_INSTALLED);
+  }
+
+  if (clock_id != GST_PTP_CLOCK_ID_NONE) {
+    argv[argc_c++] = g_strdup ("-c");
+    argv[argc_c++] = g_strdup_printf ("0x%016" G_GINT64_MODIFIER "x", clock_id);
+  }
+
+  if (interfaces != NULL) {
+    gchar **ptr = interfaces;
+
+    while (*ptr) {
+      argv[argc_c++] = g_strdup ("-i");
+      argv[argc_c++] = g_strdup (*ptr);
+      ptr++;
+    }
+  }
+
+  main_context = g_main_context_new ();
+  main_loop = g_main_loop_new (main_context, FALSE);
+
+  ptp_helper_thread =
+      g_thread_try_new ("ptp-helper-thread", ptp_helper_main, NULL, &err);
+  if (!ptp_helper_thread) {
+    GST_ERROR ("Failed to start PTP helper thread: %s", err->message);
+    g_clear_error (&err);
+    ret = FALSE;
+    goto done;
+  }
+
+  if (!g_spawn_async_with_pipes (NULL, argv, NULL, 0, NULL, NULL,
+          &ptp_helper_pid, &fd_w, &fd_r, NULL, &err)) {
+    GST_ERROR ("Failed to start ptp helper process: %s", err->message);
+    g_clear_error (&err);
+    ret = FALSE;
+    supported = FALSE;
+    goto done;
+  }
+
+  stdin_channel = g_io_channel_unix_new (fd_r);
+  g_io_channel_set_encoding (stdin_channel, NULL, NULL);
+  g_io_channel_set_buffered (stdin_channel, FALSE);
+  g_io_channel_set_close_on_unref (stdin_channel, TRUE);
+  stdin_source =
+      g_io_create_watch (stdin_channel, G_IO_IN | G_IO_PRI | G_IO_HUP);
+  g_source_set_priority (stdin_source, G_PRIORITY_DEFAULT);
+  g_source_set_callback (stdin_source, (GSourceFunc) have_stdin_data_cb, NULL,
+      NULL);
+  g_source_attach (stdin_source, main_context);
+  g_source_unref (stdin_source);
+
+  /* Create stdout channel */
+  stdout_channel = g_io_channel_unix_new (fd_w);
+  g_io_channel_set_encoding (stdout_channel, NULL, NULL);
+  g_io_channel_set_close_on_unref (stdout_channel, TRUE);
+  g_io_channel_set_buffered (stdout_channel, FALSE);
+
+  delay_req_rand = g_rand_new ();
+
+  initted = TRUE;
+
+wait:
+  GST_DEBUG ("Waiting for PTP to be initialized");
+
+  while (ptp_clock_id.clock_identity == GST_PTP_CLOCK_ID_NONE && initted)
+    g_cond_wait (&ptp_cond, &ptp_lock);
+
+  ret = initted;
+  if (ret) {
+    GST_DEBUG ("Initialized and got clock id 0x%016" G_GINT64_MODIFIER "x %u",
+        ptp_clock_id.clock_identity, ptp_clock_id.port_number);
+  } else {
+    GST_ERROR ("Failed to initialize");
+    supported = FALSE;
+  }
+
+done:
+  g_strfreev (argv);
+
+  if (!ret) {
+    if (ptp_helper_pid) {
+      kill (ptp_helper_pid, SIGKILL);
+      waitpid (ptp_helper_pid, NULL, 0);
+      g_spawn_close_pid (ptp_helper_pid);
+    }
+    ptp_helper_pid = 0;
+
+    if (stdin_channel)
+      g_io_channel_unref (stdin_channel);
+    stdin_channel = NULL;
+    if (stdout_channel)
+      g_io_channel_unref (stdout_channel);
+    stdout_channel = NULL;
+
+    if (main_loop && ptp_helper_thread) {
+      g_main_loop_quit (main_loop);
+      g_thread_join (ptp_helper_thread);
+    }
+    ptp_helper_thread = NULL;
+    if (main_loop)
+      g_main_loop_unref (main_loop);
+    main_loop = NULL;
+    if (main_context)
+      g_main_context_unref (main_context);
+    main_context = NULL;
+
+    if (delay_req_rand)
+      g_rand_free (delay_req_rand);
+    delay_req_rand = NULL;
+  }
+
+  g_mutex_unlock (&ptp_lock);
+
+  return ret;
+}
+
+/**
+ * gst_ptp_deinit:
+ *
+ * Deinitialize the GStreamer PTP subsystem and stop the PTP clock. If there
+ * are any remaining GstPtpClock instances, they won't be further synchronized
+ * to the PTP network clock.
+ *
+ * Since: 1.6
+ */
+void
+gst_ptp_deinit (void)
+{
+  GList *l, *m;
+
+  g_mutex_lock (&ptp_lock);
+
+  if (ptp_helper_pid) {
+    kill (ptp_helper_pid, SIGKILL);
+    waitpid (ptp_helper_pid, NULL, 0);
+    g_spawn_close_pid (ptp_helper_pid);
+  }
+  ptp_helper_pid = 0;
+
+  if (stdin_channel)
+    g_io_channel_unref (stdin_channel);
+  stdin_channel = NULL;
+  if (stdout_channel)
+    g_io_channel_unref (stdout_channel);
+  stdout_channel = NULL;
+
+  if (main_loop && ptp_helper_thread) {
+    GThread *tmp = ptp_helper_thread;
+    ptp_helper_thread = NULL;
+    g_mutex_unlock (&ptp_lock);
+    g_main_loop_quit (main_loop);
+    g_thread_join (tmp);
+    g_mutex_lock (&ptp_lock);
+  }
+  if (main_loop)
+    g_main_loop_unref (main_loop);
+  main_loop = NULL;
+  if (main_context)
+    g_main_context_unref (main_context);
+  main_context = NULL;
+
+  if (delay_req_rand)
+    g_rand_free (delay_req_rand);
+  delay_req_rand = NULL;
+
+  for (l = domain_data; l; l = l->next) {
+    PtpDomainData *domain = l->data;
+
+    for (m = domain->announce_senders; m; m = m->next) {
+      PtpAnnounceSender *sender = m->data;
+
+      g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
+      g_queue_clear (&sender->announce_messages);
+      g_free (sender);
+    }
+    g_list_free (domain->announce_senders);
+
+    g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
+        NULL);
+    g_queue_clear (&domain->pending_syncs);
+    gst_object_unref (domain->domain_clock);
+    g_free (domain);
+  }
+  g_list_free (domain_data);
+  domain_data = NULL;
+  g_list_foreach (domain_clocks, (GFunc) g_free, NULL);
+  g_list_free (domain_clocks);
+  domain_clocks = NULL;
+
+  ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
+  ptp_clock_id.port_number = 0;
+
+  initted = FALSE;
+
+  g_mutex_unlock (&ptp_lock);
+}
+
+#define DEFAULT_DOMAIN 0
+
+enum
+{
+  PROP_0,
+  PROP_DOMAIN,
+  PROP_INTERNAL_CLOCK
+};
+
+#define GST_PTP_CLOCK_GET_PRIVATE(obj)  \
+  (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_PTP_CLOCK, GstPtpClockPrivate))
+
+struct _GstPtpClockPrivate
+{
+  guint domain;
+  GstClock *domain_clock;
+  gulong domain_stats_id;
+};
+
+#define gst_ptp_clock_parent_class parent_class
+G_DEFINE_TYPE (GstPtpClock, gst_ptp_clock, GST_TYPE_SYSTEM_CLOCK);
+
+static void gst_ptp_clock_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec);
+static void gst_ptp_clock_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec);
+static void gst_ptp_clock_finalize (GObject * object);
+
+static GstClockTime gst_ptp_clock_get_internal_time (GstClock * clock);
+
+static void
+gst_ptp_clock_class_init (GstPtpClockClass * klass)
+{
+  GObjectClass *gobject_class;
+  GstClockClass *clock_class;
+
+  gobject_class = G_OBJECT_CLASS (klass);
+  clock_class = GST_CLOCK_CLASS (klass);
+
+  g_type_class_add_private (klass, sizeof (GstPtpClockPrivate));
+
+  gobject_class->finalize = gst_ptp_clock_finalize;
+  gobject_class->get_property = gst_ptp_clock_get_property;
+  gobject_class->set_property = gst_ptp_clock_set_property;
+
+  g_object_class_install_property (gobject_class, PROP_DOMAIN,
+      g_param_spec_uint ("domain", "Domain",
+          "The PTP domain", 0, G_MAXUINT8,
+          DEFAULT_DOMAIN,
+          G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
+      g_param_spec_object ("internal-clock", "Internal Clock",
+          "Internal clock", GST_TYPE_CLOCK,
+          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+  clock_class->get_internal_time = gst_ptp_clock_get_internal_time;
+}
+
+static void
+gst_ptp_clock_init (GstPtpClock * self)
+{
+  GstPtpClockPrivate *priv;
+
+  self->priv = priv = GST_PTP_CLOCK_GET_PRIVATE (self);
+
+  GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
+  GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
+
+  priv->domain = DEFAULT_DOMAIN;
+}
+
+static gboolean
+gst_ptp_clock_ensure_domain_clock (GstPtpClock * self)
+{
+  gboolean got_clock = TRUE;
+
+  if (G_UNLIKELY (!self->priv->domain_clock)) {
+    g_mutex_lock (&domain_clocks_lock);
+    if (!self->priv->domain_clock) {
+      GList *l;
+
+      got_clock = FALSE;
+
+      for (l = domain_clocks; l; l = l->next) {
+        PtpDomainData *clock_data = l->data;
+
+        if (clock_data->domain == self->priv->domain
+            && clock_data->last_ptp_time != 0) {
+          self->priv->domain_clock = clock_data->domain_clock;
+          got_clock = TRUE;
+          break;
+        }
+      }
+    }
+    g_mutex_unlock (&domain_clocks_lock);
+    if (got_clock) {
+      g_object_notify (G_OBJECT (self), "internal-clock");
+      gst_clock_set_synced (GST_CLOCK (self), TRUE);
+    }
+  }
+
+  return got_clock;
+}
+
+static gboolean
+gst_ptp_clock_stats_callback (guint8 domain, const GstStructure * stats,
+    gpointer user_data)
+{
+  GstPtpClock *self = user_data;
+
+  if (domain != self->priv->domain
+      || !gst_structure_has_name (stats, GST_PTP_STATISTICS_TIME_UPDATED))
+    return TRUE;
+
+  /* Let's set our internal clock */
+  if (!gst_ptp_clock_ensure_domain_clock (self))
+    return TRUE;
+
+  self->priv->domain_stats_id = 0;
+
+  return FALSE;
+}
+
+static void
+gst_ptp_clock_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstPtpClock *self = GST_PTP_CLOCK (object);
+
+  switch (prop_id) {
+    case PROP_DOMAIN:
+      self->priv->domain = g_value_get_uint (value);
+      gst_ptp_clock_ensure_domain_clock (self);
+      if (!self->priv->domain_clock)
+        self->priv->domain_stats_id =
+            gst_ptp_statistics_callback_add (gst_ptp_clock_stats_callback, self,
+            NULL);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+gst_ptp_clock_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  GstPtpClock *self = GST_PTP_CLOCK (object);
+
+  switch (prop_id) {
+    case PROP_DOMAIN:
+      g_value_set_uint (value, self->priv->domain);
+      break;
+    case PROP_INTERNAL_CLOCK:
+      gst_ptp_clock_ensure_domain_clock (self);
+      g_value_set_object (value, self->priv->domain_clock);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+static void
+gst_ptp_clock_finalize (GObject * object)
+{
+  GstPtpClock *self = GST_PTP_CLOCK (object);
+
+  if (self->priv->domain_stats_id)
+    gst_ptp_statistics_callback_remove (self->priv->domain_stats_id);
+
+  G_OBJECT_CLASS (gst_ptp_clock_parent_class)->finalize (object);
+}
+
+static GstClockTime
+gst_ptp_clock_get_internal_time (GstClock * clock)
+{
+  GstPtpClock *self = GST_PTP_CLOCK (clock);
+
+  gst_ptp_clock_ensure_domain_clock (self);
+
+  if (!self->priv->domain_clock) {
+    GST_ERROR_OBJECT (self, "Domain %u has no clock yet and is not synced",
+        self->priv->domain);
+    return GST_CLOCK_TIME_NONE;
+  }
+
+  return gst_clock_get_time (self->priv->domain_clock);
+}
+
+/**
+ * gst_ptp_clock_new:
+ * @name: Name of the clock
+ * @domain: PTP domain
+ *
+ * Creates a new PTP clock instance that exports the PTP time of the master
+ * clock in @domain. This clock can be slaved to other clocks as needed.
+ *
+ * If gst_ptp_init() was not called before, this will call gst_ptp_init() with
+ * default parameters.
+ *
+ *
+ * This clock only returns valid timestamps after it received the first
+ * times from the PTP master clock on the network. Once this happens the
+ * GstPtpClock::internal-clock property will become non-NULL. You can connect
+ * to the notify::internal-clock signal to get notified about this, or
+ * alternatively use gst_ptp_clock_wait_ready() to wait for this to happen.
+ *
+ * Since: 1.6
+ */
+GstClock *
+gst_ptp_clock_new (const gchar * name, guint domain)
+{
+  g_return_val_if_fail (name != NULL, NULL);
+  g_return_val_if_fail (domain <= G_MAXUINT8, NULL);
+
+  if (!initted && !gst_ptp_init (GST_PTP_CLOCK_ID_NONE, NULL)) {
+    GST_ERROR ("Failed to initialize PTP");
+    return NULL;
+  }
+
+  return g_object_new (GST_TYPE_PTP_CLOCK, "name", name, "domain", domain,
+      NULL);
+}
+
+typedef struct
+{
+  guint8 domain;
+  const GstStructure *stats;
+} DomainStatsMarshalData;
+
+static void
+domain_stats_marshaller (GHook * hook, DomainStatsMarshalData * data)
+{
+  GstPtpStatisticsCallback callback = (GstPtpStatisticsCallback) hook->func;
+
+  if (!callback (data->domain, data->stats, hook->data))
+    g_hook_destroy (&domain_stats_hooks, hook->hook_id);
+}
+
+static void
+emit_ptp_statistics (guint8 domain, const GstStructure * stats)
+{
+  DomainStatsMarshalData data = { domain, stats };
+
+  g_mutex_lock (&ptp_lock);
+  g_hook_list_marshal (&domain_stats_hooks, TRUE,
+      (GHookMarshaller) domain_stats_marshaller, &data);
+  g_mutex_unlock (&ptp_lock);
+}
+
+/**
+ * gst_ptp_statistics_callback_add:
+ * @callback: GstPtpStatisticsCallback to call
+ * @user_data: Data to pass to the callback
+ * @destroy_data: GDestroyNotify to destroy the data
+ *
+ * Installs a new statistics callback for gathering PTP statistics. See
+ * GstPtpStatisticsCallback for a list of statistics that are provided.
+ *
+ * Returns: Id for the callback that can be passed to
+ * gst_ptp_statistics_callback_remove()
+ *
+ * Since: 1.6
+ */
+gulong
+gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
+    gpointer user_data, GDestroyNotify destroy_data)
+{
+  GHook *hook;
+
+  g_mutex_lock (&ptp_lock);
+
+  if (!domain_stats_hooks_initted) {
+    g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
+    domain_stats_hooks_initted = TRUE;
+  }
+
+  hook = g_hook_alloc (&domain_stats_hooks);
+  hook->func = callback;
+  hook->data = user_data;
+  hook->destroy = destroy_data;
+  g_hook_prepend (&domain_stats_hooks, hook);
+  g_atomic_int_add (&domain_stats_n_hooks, 1);
+
+  g_mutex_unlock (&ptp_lock);
+
+  return hook->hook_id;
+}
+
+/**
+ * gst_ptp_statistics_callback_remove:
+ * @id: Callback id to remove
+ *
+ * Removes a PTP statistics callback that was previously added with
+ * gst_ptp_statistics_callback_add().
+ *
+ * Since: 1.6
+ */
+void
+gst_ptp_statistics_callback_remove (gulong id)
+{
+  g_mutex_lock (&ptp_lock);
+  if (g_hook_destroy (&domain_stats_hooks, id))
+    g_atomic_int_add (&domain_stats_n_hooks, -1);
+  g_mutex_unlock (&ptp_lock);
+}
+
+#else /* HAVE_PTP */
+
+GType
+gst_ptp_clock_get_type (void)
+{
+  return G_TYPE_INVALID;
+}
+
+gboolean
+gst_ptp_is_supported (void)
+{
+  return FALSE;
+}
+
+gboolean
+gst_ptp_is_initialized (void)
+{
+  return FALSE;
+}
+
+gboolean
+gst_ptp_init (guint64 clock_id, gchar ** interfaces)
+{
+  return FALSE;
+}
+
+void
+gst_ptp_deinit (void)
+{
+}
+
+GstClock *
+gst_ptp_clock_new (const gchar * name, guint domain)
+{
+  return NULL;
+}
+
+gboolean
+gst_ptp_clock_wait_ready (GstPtpClock * self, GstClockTime timeout)
+{
+  return FALSE;
+}
+
+gulong
+gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
+    gpointer user_data, GDestroyNotify destroy_data)
+{
+  return 0;
+}
+
+void
+gst_ptp_statistics_callback_remove (gulong id)
+{
+  return;
+}
+
+#endif
diff --git a/libs/gst/net/gstptpclock.h b/libs/gst/net/gstptpclock.h
new file mode 100644 (file)
index 0000000..f50b83e
--- /dev/null
@@ -0,0 +1,142 @@
+/* GStreamer
+ * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
+ *
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __GST_PTP_CLOCK_H__
+#define __GST_PTP_CLOCK_H__
+
+#include <gst/gst.h>
+#include <gst/gstsystemclock.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_PTP_CLOCK \
+  (gst_ptp_clock_get_type())
+#define GST_PTP_CLOCK(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PTP_CLOCK,GstPtpClock))
+#define GST_PTP_CLOCK_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PTP_CLOCK,GstPtpClockClass))
+#define GST_IS_PTP_CLOCK(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PTP_CLOCK))
+#define GST_IS_PTP_CLOCK_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PTP_CLOCK))
+
+typedef struct _GstPtpClock GstPtpClock;
+typedef struct _GstPtpClockClass GstPtpClockClass;
+typedef struct _GstPtpClockPrivate GstPtpClockPrivate;
+
+/**
+ * GstPtpClock:
+ *
+ * Opaque #GstPtpClock structure.
+ */
+struct _GstPtpClock {
+  GstSystemClock clock;
+
+  /*< private >*/
+  GstPtpClockPrivate *priv;
+
+  gpointer _gst_reserved[GST_PADDING];
+};
+
+struct _GstPtpClockClass {
+  GstSystemClockClass parent_class;
+
+  /*< private >*/
+  gpointer _gst_reserved[GST_PADDING];
+};
+
+/**
+ * GST_PTP_CLOCK_ID_NONE:
+ * PTP clock identification that can be passed to gst_ptp_init() to
+ * automatically select one based on the MAC address of interfaces
+ */
+#define GST_PTP_CLOCK_ID_NONE ((guint64) -1)
+
+GType           gst_ptp_clock_get_type             (void);
+
+gboolean        gst_ptp_is_supported               (void);
+gboolean        gst_ptp_is_initialized             (void);
+gboolean        gst_ptp_init                       (guint64 clock_id,
+                                                    gchar ** interfaces);
+void            gst_ptp_deinit                     (void);
+
+#define GST_PTP_STATISTICS_NEW_DOMAIN_FOUND           "GstPtpStatisticsNewDomainFound"
+#define GST_PTP_STATISTICS_BEST_MASTER_CLOCK_SELECTED "GstPtpStatisticsBestMasterClockSelected"
+#define GST_PTP_STATISTICS_PATH_DELAY_MEASURED        "GstPtpStatisticsPathDelayMeasured"
+#define GST_PTP_STATISTICS_TIME_UPDATED               "GstPtpStatisticsTimeUpdated"
+
+/**
+ * GstPtpStatisticsCallback:
+ * @domain: PTP domain identifier
+ * @stats: New statistics
+ * @user_data: Data passed to gst_ptp_statistics_callback_add()
+ *
+ * The statistics can be the following structures:
+ *
+ * GST_PTP_STATISTICS_NEW_DOMAIN_FOUND:
+ * "domain"                G_TYPE_UINT          The domain identifier of the domain
+ * "clock"                 GST_TYPE_CLOCK       The internal clock that is slaved to the
+ *                                              PTP domain
+ *
+ * GST_PTP_STATISTICS_BEST_MASTER_CLOCK_SELECTED:
+ * "domain"                G_TYPE_UINT          The domain identifier of the domain
+ * "master-clock-id"       G_TYPE_UINT64        PTP clock identifier of the selected master
+ *                                              clock
+ * "master-clock-port"     G_TYPE_UINT          PTP port number of the selected master clock
+ * "grandmaster-clock-id"  G_TYPE_UINT64        PTP clock identifier of the grandmaster clock
+ *
+ * GST_PTP_STATISTICS_PATH_DELAY_MEASURED:
+ * "domain"                G_TYPE_UINT          The domain identifier of the domain
+ * "mean-path-delay-avg"   GST_TYPE_CLOCK_TIME  Average mean path delay
+ * "mean-path-delay"       GST_TYPE_CLOCK_TIME  Latest mean path delay
+ * "delay-request-delay"   GST_TYPE_CLOCK_TIME  Delay of DELAY_REQ / DELAY_RESP messages
+ *
+ * GST_PTP_STATISTICS_TIME_UPDATED:
+ * "domain"                G_TYPE_UINT          The domain identifier of the domain
+ * "mean-path-delay-avg"   GST_TYPE_CLOCK_TIME  Average mean path delay
+ * "local-time"            GST_TYPE_CLOCK_TIME  Local time that corresponds to ptp-time
+ * "ptp-time"              GST_TYPE_CLOCK_TIME  Newly measured PTP time at local-time
+ * "estimated-ptp-time"    GST_TYPE_CLOCK_TIME  Estimated PTP time based on previous measurements
+ * "discontinuity"         G_TYPE_INT64         Difference between estimated and measured PTP time
+ * "synced"                G_TYPE_BOOLEAN       Currently synced to the remote clock
+ * "r-squared"             G_TYPE_DOUBLE        R² of clock estimation regression
+ * "internal-time"         GST_TYPE_CLOCK_TIME  Internal time clock parameter
+ * "external-time"         GST_TYPE_CLOCK_TIME  External time clock parameter
+ * "rate-num"              G_TYPE_UINT64        Internal/external rate numerator
+ * "rate-den"              G_TYPE_UINT64        Internal/external rate denominator
+ * "rate"                  G_TYPE_DOUBLE        Internal/external rate
+ *
+ * If %FALSE is returned, the callback is removed and never called again.
+ *
+ */
+typedef gboolean  (*GstPtpStatisticsCallback)      (guint8 domain,
+                                                    const GstStructure * stats,
+                                                    gpointer user_data);
+gulong          gst_ptp_statistics_callback_add    (GstPtpStatisticsCallback callback,
+                                                    gpointer user_data, GDestroyNotify destroy_data);
+void            gst_ptp_statistics_callback_remove (gulong id);
+
+GstClock*       gst_ptp_clock_new                  (const gchar *name,
+                                                    guint domain);
+
+G_END_DECLS
+
+#endif /* __GST_PTP_CLOCK_H__ */
+
index 1ef53d6..8af032e 100644 (file)
@@ -27,5 +27,6 @@
 #include <gst/net/gstnetclientclock.h>
 #include <gst/net/gstnettimepacket.h>
 #include <gst/net/gstnettimeprovider.h>
+#include <gst/net/gstptpclock.h>
 
 #endif /* __GST_NET__H__ */
index f3cfd8d..6918f90 100644 (file)
@@ -11,6 +11,7 @@ always_dirs = \
        manual     \
        memory   \
        netclock \
+       ptp \
        stepping \
        streamiddemux \
        streams
diff --git a/tests/examples/ptp/.gitignore b/tests/examples/ptp/.gitignore
new file mode 100644 (file)
index 0000000..b5b83be
--- /dev/null
@@ -0,0 +1 @@
+ptp-print-times
diff --git a/tests/examples/ptp/Makefile.am b/tests/examples/ptp/Makefile.am
new file mode 100644 (file)
index 0000000..d76b4ad
--- /dev/null
@@ -0,0 +1,7 @@
+noinst_PROGRAMS = ptp-print-times
+
+ptp_print_times_LDADD = \
+       $(top_builddir)/libs/gst/net/libgstnet-@GST_API_VERSION@.la \
+       $(GST_OBJ_LIBS)
+ptp_print_times_CFLAGS = $(GST_OBJ_CFLAGS)
+
diff --git a/tests/examples/ptp/ptp-print-times.c b/tests/examples/ptp/ptp-print-times.c
new file mode 100644 (file)
index 0000000..e6c666f
--- /dev/null
@@ -0,0 +1,100 @@
+/* GStreamer
+ * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
+ *
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/* Create a PTP client clock and print times and statistics.
+ *
+ * When running this from a GStreamer build tree, you will have to set
+ * GST_PTP_HELPER to libs/gst/helpers/.libs/gst-ptp-helper and also
+ * make sure that it has the right permissions (setuid root or appropriate
+ * capabilities
+ *
+ * You can test this with any PTP compatible clock, e.g. ptpd from here: http://ptpd.sourceforge.net/
+ *
+ * For testing the accuracy, you can use the PTP reflector available from
+ * http://code.centricular.com/ptp-clock-reflector/ or here
+ * https://github.com/sdroege/ptp-clock-reflector
+ */
+
+#include <gst/gst.h>
+#include <gst/net/net.h>
+
+static gint domain = 0;
+static gboolean stats = FALSE;
+
+static GOptionEntry opt_entries[] = {
+  {"domain", 'd', 0, G_OPTION_ARG_INT, &domain,
+      "PTP domain", NULL},
+  {"stats", 's', 0, G_OPTION_ARG_NONE, &stats,
+      "Print PTP statistics", NULL},
+  {NULL}
+};
+
+static gboolean
+stats_cb (guint8 d, const GstStructure * stats, gpointer user_data)
+{
+  if (d == domain) {
+    gchar *stats_str = gst_structure_to_string (stats);
+    g_print ("Got stats: %s\n", stats_str);
+    g_free (stats_str);
+  }
+
+  return TRUE;
+}
+
+gint
+main (gint argc, gchar ** argv)
+{
+  GOptionContext *opt_ctx;
+  GstClock *clock;
+  GError *err = NULL;
+
+  opt_ctx = g_option_context_new ("- GStreamer PTP clock test app");
+  g_option_context_add_main_entries (opt_ctx, opt_entries, NULL);
+  g_option_context_add_group (opt_ctx, gst_init_get_option_group ());
+  if (!g_option_context_parse (opt_ctx, &argc, &argv, &err))
+    g_error ("Error parsing options: %s", err->message);
+  g_option_context_free (opt_ctx);
+
+  if (!gst_ptp_init (GST_PTP_CLOCK_ID_NONE, NULL))
+    g_error ("failed to init ptp");
+
+  if (stats)
+    gst_ptp_statistics_callback_add (stats_cb, NULL, NULL);
+
+  clock = gst_ptp_clock_new ("test-clock", domain);
+
+  gst_clock_wait_for_sync (GST_CLOCK (clock), GST_CLOCK_TIME_NONE);
+
+  while (TRUE) {
+    GstClockTime local, remote;
+    GstClockTimeDiff diff;
+
+    local = g_get_real_time () * 1000;
+    remote = gst_clock_get_time (clock);
+    diff = GST_CLOCK_DIFF (local, remote);
+
+    g_print ("local: %" GST_TIME_FORMAT " ptp: %" GST_TIME_FORMAT " diff: %s%"
+        GST_TIME_FORMAT "\n", GST_TIME_ARGS (local), GST_TIME_ARGS (remote),
+        (diff < 0 ? "-" : " "), GST_TIME_ARGS (ABS (diff)));
+    g_usleep (100000);
+  }
+
+  return 0;
+}
index 8e3f327..0434c32 100644 (file)
@@ -16,3 +16,11 @@ EXPORTS
        gst_net_time_packet_serialize
        gst_net_time_provider_get_type
        gst_net_time_provider_new
+       gst_ptp_clock_get_type
+       gst_ptp_clock_new
+       gst_ptp_deinit
+       gst_ptp_init
+       gst_ptp_is_initialized
+       gst_ptp_is_supported
+       gst_ptp_statistics_callback_add
+       gst_ptp_statistics_callback_remove