--- /dev/null
+/* GStreamer
+ * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
+ *
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/* Helper process that runs setuid root or with appropriate privileges to
+ * listen on ports < 1024, do multicast operations and get MAC addresses of
+ * interfaces. Privileges are dropped after these operations are done.
+ *
+ * It listens on the PTP multicast group on port 319 and 320 and forwards
+ * everything received there to stdout, while forwarding everything received
+ * on stdout to those sockets.
+ * Additionally it provides the MAC address of a network interface via stdout
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <errno.h>
+#include <sys/ioctl.h>
+#include <net/if.h>
+#include <netinet/in.h>
+#include <string.h>
+
+#ifdef HAVE_PTP_HELPER_SETUID
+#include <grp.h>
+#include <pwd.h>
+#endif
+
+#ifdef HAVE_PTP_HELPER_CAPABILITIES
+#include <sys/capability.h>
+#endif
+
+#include <glib.h>
+#include <gio/gio.h>
+
+#include <gst/gst.h>
+#include <gst/net/gstptp_private.h>
+
+#define PTP_MULTICAST_GROUP "224.0.1.129"
+#define PTP_EVENT_PORT 319
+#define PTP_GENERAL_PORT 320
+
+static gchar **ifaces = NULL;
+static gboolean verbose = FALSE;
+static guint64 clock_id = (guint64) - 1;
+static guint8 clock_id_array[8];
+
+static GOptionEntry opt_entries[] = {
+ {"interface", 'i', 0, G_OPTION_ARG_STRING_ARRAY, &ifaces,
+ "Interface to listen on", NULL},
+ {"clock-id", 'c', 0, G_OPTION_ARG_INT64, &clock_id,
+ "PTP clock id", NULL},
+ {"verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose,
+ "Be verbose", NULL},
+ {NULL}
+};
+
+static GSocketAddress *event_saddr, *general_saddr;
+static GSocket *socket_event, *socket_general;
+static GIOChannel *stdin_channel, *stdout_channel;
+
+static gboolean
+have_socket_data_cb (GSocket * socket, GIOCondition condition,
+ gpointer user_data)
+{
+ gchar buffer[8192];
+ gssize read;
+ gsize written;
+ GError *err = NULL;
+ GIOStatus status;
+ StdIOHeader header = { 0, };
+
+ read = g_socket_receive (socket, buffer, sizeof (buffer), NULL, &err);
+ if (read == -1)
+ g_error ("Failed to read from socket: %s", err->message);
+
+ if (verbose)
+ g_message ("Received %" G_GSSIZE_FORMAT " bytes from %s socket", read,
+ (socket == socket_event ? "event" : "general"));
+
+ header.size = read;
+ header.type = (socket == socket_event) ? TYPE_EVENT : TYPE_GENERAL;
+
+ status =
+ g_io_channel_write_chars (stdout_channel, (gchar *) & header,
+ sizeof (header), &written, &err);
+ if (status == G_IO_STATUS_ERROR) {
+ g_error ("Failed to write to stdout: %s", err->message);
+ } else if (status == G_IO_STATUS_EOF) {
+ g_message ("EOF on stdout");
+ exit (0);
+ } else if (status != G_IO_STATUS_NORMAL) {
+ g_error ("Unexpected stdout write status: %d", status);
+ } else if (written != sizeof (header)) {
+ g_error ("Unexpected write size: %" G_GSIZE_FORMAT, written);
+ }
+
+ status =
+ g_io_channel_write_chars (stdout_channel, buffer, read, &written, &err);
+ if (status == G_IO_STATUS_ERROR) {
+ g_error ("Failed to write to stdout: %s", err->message);
+ } else if (status == G_IO_STATUS_EOF) {
+ g_message ("EOF on stdout");
+ exit (0);
+ } else if (status != G_IO_STATUS_NORMAL) {
+ g_error ("Unexpected stdout write status: %d", status);
+ } else if (written != read) {
+ g_error ("Unexpected write size: %" G_GSIZE_FORMAT, written);
+ }
+
+ return G_SOURCE_CONTINUE;
+}
+
+static gboolean
+have_stdin_data_cb (GIOChannel * channel, GIOCondition condition,
+ gpointer user_data)
+{
+ GIOStatus status;
+ StdIOHeader header = { 0, };
+ gchar buffer[8192];
+ GError *err = NULL;
+ gsize read;
+ gssize written;
+
+ if ((condition & G_IO_STATUS_EOF)) {
+ g_message ("EOF on stdin");
+ exit (0);
+ }
+
+ status =
+ g_io_channel_read_chars (channel, (gchar *) & header, sizeof (header),
+ &read, &err);
+ if (status == G_IO_STATUS_ERROR) {
+ g_error ("Failed to read from stdin: %s", err->message);
+ } else if (status == G_IO_STATUS_EOF) {
+ g_message ("EOF on stdin");
+ exit (0);
+ } else if (status != G_IO_STATUS_NORMAL) {
+ g_error ("Unexpected stdin read status: %d", status);
+ } else if (read != sizeof (header)) {
+ g_error ("Unexpected read size: %" G_GSIZE_FORMAT, read);
+ } else if (header.size > 8192) {
+ g_error ("Unexpected size: %u", header.size);
+ }
+
+ status = g_io_channel_read_chars (channel, buffer, header.size, &read, &err);
+ if (status == G_IO_STATUS_ERROR) {
+ g_error ("Failed to read from stdin: %s", err->message);
+ } else if (status == G_IO_STATUS_EOF) {
+ g_message ("EOF on stdin");
+ exit (0);
+ } else if (status != G_IO_STATUS_NORMAL) {
+ g_error ("Unexpected stdin read status: %d", status);
+ } else if (read != header.size) {
+ g_error ("Unexpected read size: %" G_GSIZE_FORMAT, read);
+ }
+
+ switch (header.type) {
+ case TYPE_EVENT:
+ case TYPE_GENERAL:
+ written =
+ g_socket_send_to (header.type ==
+ TYPE_EVENT ? socket_event : socket_general,
+ (header.type == TYPE_EVENT ? event_saddr : general_saddr), buffer,
+ header.size, NULL, &err);
+ if (written == -1)
+ g_error ("Failed to write to socket: %s", err->message);
+ else if (written != header.size)
+ g_error ("Unexpected write size: %" G_GSSIZE_FORMAT, written);
+
+ if (verbose)
+ g_message ("Sent %" G_GSSIZE_FORMAT " bytes to %s socket", read,
+ (header.type == TYPE_EVENT ? "event" : "general"));
+ break;
+ default:
+ break;
+ }
+
+ return G_SOURCE_CONTINUE;
+}
+
+static void
+setup_sockets (void)
+{
+ GInetAddress *bind_addr, *mcast_addr;
+ GSocketAddress *bind_saddr;
+ GSource *socket_event_source, *socket_general_source;
+ gchar **probed_ifaces = NULL;
+ GError *err = NULL;
+
+ /* Create sockets */
+ socket_event =
+ g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_DATAGRAM,
+ G_SOCKET_PROTOCOL_UDP, &err);
+ if (!socket_event)
+ g_error ("Couldn't create event socket: %s", err->message);
+
+ socket_general =
+ g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_DATAGRAM,
+ G_SOCKET_PROTOCOL_UDP, &err);
+ if (!socket_general)
+ g_error ("Couldn't create general socket: %s", err->message);
+
+ /* Bind sockets */
+ bind_addr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
+ bind_saddr = g_inet_socket_address_new (bind_addr, PTP_EVENT_PORT);
+ if (!g_socket_bind (socket_event, bind_saddr, TRUE, &err))
+ g_error ("Couldn't bind event socket: %s", err->message);
+ g_object_unref (bind_saddr);
+ bind_saddr = g_inet_socket_address_new (bind_addr, PTP_GENERAL_PORT);
+ if (!g_socket_bind (socket_general, bind_saddr, TRUE, &err))
+ g_error ("Couldn't bind general socket: %s", err->message);
+ g_object_unref (bind_saddr);
+ g_object_unref (bind_addr);
+
+ /* Probe all non-loopback interfaces */
+ if (!ifaces) {
+ struct ifreq ifr;
+ struct ifconf ifc;
+ gchar buf[8192];
+
+ ifc.ifc_len = sizeof (buf);
+ ifc.ifc_buf = buf;
+ if (ioctl (g_socket_get_fd (socket_event), SIOCGIFCONF, &ifc) != -1) {
+ struct ifreq *it = ifc.ifc_req;
+ const struct ifreq *const end =
+ it + (ifc.ifc_len / sizeof (struct ifreq));
+ guint idx = 0;
+
+ probed_ifaces = g_new0 (gchar *, ifc.ifc_len + 1);
+
+ for (; it != end; ++it) {
+ strcpy (ifr.ifr_name, it->ifr_name);
+ if (ioctl (g_socket_get_fd (socket_event), SIOCGIFFLAGS, &ifr) == 0) {
+ if ((ifr.ifr_flags & IFF_LOOPBACK))
+ continue;
+ probed_ifaces[idx] = g_strdup (it->ifr_name);
+ idx++;
+ } else {
+ g_warning ("can't get flags of interface '%s'", it->ifr_name);
+ probed_ifaces[idx] = g_strdup (it->ifr_name);
+ idx++;
+ }
+ }
+
+ if (idx != 0)
+ ifaces = probed_ifaces;
+ }
+ }
+
+ /* Get a clock id from the MAC address if none was given */
+ if (clock_id == (guint64) - 1) {
+ struct ifreq ifr;
+ gboolean success = FALSE;
+
+ if (ifaces) {
+ gchar **ptr = ifaces;
+
+ while (*ptr) {
+ strcpy (ifr.ifr_name, *ptr);
+ if (ioctl (g_socket_get_fd (socket_event), SIOCGIFHWADDR, &ifr) == 0) {
+ clock_id_array[0] = ifr.ifr_hwaddr.sa_data[0];
+ clock_id_array[1] = ifr.ifr_hwaddr.sa_data[1];
+ clock_id_array[2] = ifr.ifr_hwaddr.sa_data[2];
+ clock_id_array[3] = 0xff;
+ clock_id_array[4] = 0xfe;
+ clock_id_array[5] = ifr.ifr_hwaddr.sa_data[3];
+ clock_id_array[6] = ifr.ifr_hwaddr.sa_data[4];
+ clock_id_array[7] = ifr.ifr_hwaddr.sa_data[5];
+ success = TRUE;
+ break;
+ }
+ }
+
+ ptr++;
+ } else {
+ struct ifconf ifc;
+ gchar buf[8192];
+
+ ifc.ifc_len = sizeof (buf);
+ ifc.ifc_buf = buf;
+ if (ioctl (g_socket_get_fd (socket_event), SIOCGIFCONF, &ifc) != -1) {
+ struct ifreq *it = ifc.ifc_req;
+ const struct ifreq *const end =
+ it + (ifc.ifc_len / sizeof (struct ifreq));
+
+ for (; it != end; ++it) {
+ strcpy (ifr.ifr_name, it->ifr_name);
+ if (ioctl (g_socket_get_fd (socket_event), SIOCGIFFLAGS, &ifr) == 0) {
+ if ((ifr.ifr_flags & IFF_LOOPBACK))
+ continue;
+
+ if (ioctl (g_socket_get_fd (socket_event), SIOCGIFHWADDR,
+ &ifr) == 0) {
+ clock_id_array[0] = ifr.ifr_hwaddr.sa_data[0];
+ clock_id_array[1] = ifr.ifr_hwaddr.sa_data[1];
+ clock_id_array[2] = ifr.ifr_hwaddr.sa_data[2];
+ clock_id_array[3] = 0xff;
+ clock_id_array[4] = 0xfe;
+ clock_id_array[5] = ifr.ifr_hwaddr.sa_data[3];
+ clock_id_array[6] = ifr.ifr_hwaddr.sa_data[4];
+ clock_id_array[7] = ifr.ifr_hwaddr.sa_data[5];
+ success = TRUE;
+ break;
+ }
+ } else {
+ g_warning ("can't get flags of interface '%s'", it->ifr_name);
+ }
+ }
+ }
+ }
+
+ if (!success) {
+ g_warning ("can't get any MAC address, using random clock id");
+ clock_id = (((guint64) g_random_int ()) << 32) | (g_random_int ());
+ GST_WRITE_UINT64_BE (clock_id_array, clock_id);
+ clock_id_array[3] = 0xff;
+ clock_id_array[4] = 0xfe;
+ }
+ } else {
+ GST_WRITE_UINT64_BE (clock_id_array, clock_id);
+ }
+
+ /* Join multicast groups */
+ mcast_addr = g_inet_address_new_from_string (PTP_MULTICAST_GROUP);
+ if (ifaces) {
+ gchar **ptr = ifaces;
+ gboolean success = FALSE;
+
+ while (*ptr) {
+ gint c = 0;
+ if (!g_socket_join_multicast_group (socket_event, mcast_addr, FALSE, *ptr,
+ &err))
+ g_warning ("Couldn't join multicast group on interface '%s': %s",
+ *ptr, err->message);
+ else
+ c++;
+
+ if (!g_socket_join_multicast_group (socket_general, mcast_addr, FALSE,
+ *ptr, &err))
+ g_warning ("Couldn't join multicast group on interface '%s': %s",
+ *ptr, err->message);
+ else
+ c++;
+
+ if (c == 2)
+ success = TRUE;
+ ptr++;
+ }
+
+ if (!success) {
+ /* Join multicast group without any interface */
+ if (!g_socket_join_multicast_group (socket_event, mcast_addr, FALSE, NULL,
+ &err))
+ g_error ("Couldn't join multicast group: %s", err->message);
+ if (!g_socket_join_multicast_group (socket_general, mcast_addr, FALSE,
+ NULL, &err))
+ g_error ("Couldn't join multicast group: %s", err->message);
+ }
+ } else {
+ /* Join multicast group without any interface */
+ if (!g_socket_join_multicast_group (socket_event, mcast_addr, FALSE, NULL,
+ &err))
+ g_error ("Couldn't join multicast group: %s", err->message);
+ if (!g_socket_join_multicast_group (socket_general, mcast_addr, FALSE, NULL,
+ &err))
+ g_error ("Couldn't join multicast group: %s", err->message);
+ }
+
+ event_saddr = g_inet_socket_address_new (mcast_addr, PTP_EVENT_PORT);
+ general_saddr = g_inet_socket_address_new (mcast_addr, PTP_GENERAL_PORT);
+
+ /* Create socket sources */
+ socket_event_source =
+ g_socket_create_source (socket_event, G_IO_IN | G_IO_PRI, NULL);
+ g_source_set_priority (socket_event_source, G_PRIORITY_HIGH);
+ g_source_set_callback (socket_event_source, (GSourceFunc) have_socket_data_cb,
+ NULL, NULL);
+ g_source_attach (socket_event_source, NULL);
+ socket_general_source =
+ g_socket_create_source (socket_general, G_IO_IN | G_IO_PRI, NULL);
+ g_source_set_priority (socket_general_source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (socket_general_source,
+ (GSourceFunc) have_socket_data_cb, NULL, NULL);
+ g_source_attach (socket_general_source, NULL);
+
+ g_strfreev (probed_ifaces);
+}
+
+static void
+drop_privileges (void)
+{
+#ifdef HAVE_PTP_HELPER_SETUID
+ /* Switch to the given user/group */
+#ifdef HAVE_PTP_HELPER_SETUID_GROUP
+ {
+ struct group *grp;
+
+ grp = getgrnam (HAVE_PTP_HELPER_SETUID_GROUP);
+ if (!grp)
+ g_error ("Failed to get group information '%s': %s",
+ HAVE_PTP_HELPER_SETUID_GROUP, g_strerror (errno));
+
+ if (setgid (grp->gr_gid) != 0)
+ g_error ("Failed to change to group '%s': %s",
+ HAVE_PTP_HELPER_SETUID_GROUP, g_strerror (errno));
+ }
+#endif
+
+#ifdef HAVE_PTP_HELPER_SETUID_USER
+ {
+ struct passwd *pwd;
+
+ pwd = getpwnam (HAVE_PTP_HELPER_SETUID_USER);
+ if (!pwd)
+ g_error ("Failed to get user information '%s': %s",
+ HAVE_PTP_HELPER_SETUID_USER, g_strerror (errno));
+
+#ifndef HAVE_PTP_HELPER_SETUID_GROUP
+ if (setgid (pwd->pw_gid) != 0)
+ g_error ("Failed to change to user group '%s': %s",
+ HAVE_PTP_HELPER_SETUID_USER, g_strerror (errno));
+#endif
+
+ if (setuid (pwd->pw_uid) != 0)
+ g_error ("Failed to change to user '%s': %s", HAVE_PTP_HELPER_SETUID_USER,
+ g_strerror (errno));
+ }
+#endif
+#endif
+#ifdef HAVE_PTP_HELPER_CAPABILITIES
+ /* Drop all capabilities */
+ {
+ cap_t caps;
+
+ caps = cap_get_proc ();
+ if (caps == 0)
+ g_error ("Failed to get process caps: %s", g_strerror (errno));
+ if (cap_clear (caps) != 0)
+ g_error ("Failed to clear caps: %s", g_strerror (errno));
+ if (cap_set_proc (caps) != 0)
+ g_error ("Failed to set process caps: %s", g_strerror (errno));
+ }
+#endif
+}
+
+static void
+setup_stdio_channels (void)
+{
+ GSource *stdin_source;
+
+ /* Create stdin source */
+ stdin_channel = g_io_channel_unix_new (STDIN_FILENO);
+ if (g_io_channel_set_encoding (stdin_channel, NULL,
+ NULL) == G_IO_STATUS_ERROR)
+ g_error ("Failed to set stdin to binary encoding");
+ g_io_channel_set_buffered (stdin_channel, FALSE);
+ stdin_source =
+ g_io_create_watch (stdin_channel, G_IO_IN | G_IO_PRI | G_IO_HUP);
+ g_source_set_priority (stdin_source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (stdin_source, (GSourceFunc) have_stdin_data_cb, NULL,
+ NULL);
+ g_source_attach (stdin_source, NULL);
+
+ /* Create stdout channel */
+ stdout_channel = g_io_channel_unix_new (STDOUT_FILENO);
+ if (g_io_channel_set_encoding (stdout_channel, NULL,
+ NULL) == G_IO_STATUS_ERROR)
+ g_error ("Failed to set stdout to binary encoding");
+ g_io_channel_set_buffered (stdout_channel, FALSE);
+}
+
+static void
+write_clock_id (void)
+{
+ GError *err = NULL;
+ GIOStatus status;
+ StdIOHeader header = { 0, };
+ gsize written;
+
+ /* Write clock id to stdout */
+
+ header.type = TYPE_CLOCK_ID;
+ header.size = 8;
+ status =
+ g_io_channel_write_chars (stdout_channel, (gchar *) & header,
+ sizeof (header), &written, &err);
+ if (status == G_IO_STATUS_ERROR) {
+ g_error ("Failed to write to stdout: %s", err->message);
+ } else if (status == G_IO_STATUS_EOF) {
+ g_message ("EOF on stdout");
+ exit (0);
+ } else if (status != G_IO_STATUS_NORMAL) {
+ g_error ("Unexpected stdout write status: %d", status);
+ } else if (written != sizeof (header)) {
+ g_error ("Unexpected write size: %" G_GSIZE_FORMAT, written);
+ }
+
+ status =
+ g_io_channel_write_chars (stdout_channel,
+ (const gchar *) clock_id_array, sizeof (clock_id_array), &written, &err);
+ if (status == G_IO_STATUS_ERROR) {
+ g_error ("Failed to write to stdout: %s", err->message);
+ } else if (status == G_IO_STATUS_EOF) {
+ g_message ("EOF on stdout");
+ exit (0);
+ } else if (status != G_IO_STATUS_NORMAL) {
+ g_error ("Unexpected stdout write status: %d", status);
+ } else if (written != sizeof (clock_id_array)) {
+ g_error ("Unexpected write size: %" G_GSIZE_FORMAT, written);
+ }
+}
+
+gint
+main (gint argc, gchar ** argv)
+{
+ GOptionContext *opt_ctx;
+ GMainLoop *loop;
+ GError *err = NULL;
+
+ opt_ctx = g_option_context_new ("- GStreamer PTP helper process");
+ g_option_context_add_main_entries (opt_ctx, opt_entries, NULL);
+ if (!g_option_context_parse (opt_ctx, &argc, &argv, &err))
+ g_error ("Error parsing options: %s", err->message);
+ g_option_context_free (opt_ctx);
+
+ setup_sockets ();
+ drop_privileges ();
+ setup_stdio_channels ();
+ write_clock_id ();
+
+ /* Get running */
+ loop = g_main_loop_new (NULL, FALSE);
+ g_main_loop_run (loop);
+
+ /* We never exit cleanly, so don't do cleanup */
+ g_assert_not_reached ();
+
+ return 0;
+}
--- /dev/null
+/* GStreamer
+ * Copyright (C) 2015 Sebastian Dröge <sebastian@centricular.com>
+ *
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+/**
+ * SECTION:gstptpclock
+ * @short_description: Special clock that synchronizes to a remote time
+ * provider via PTP (IEEE1588:2008).
+ * @see_also: #GstClock, #GstNetClientClock, #GstPipeline
+ *
+ * GstPtpClock implements a PTP (IEEE1588:2008) ordinary clock in slave-only
+ * mode, that allows a GStreamer pipeline to synchronize to a PTP network
+ * clock in some specific domain.
+ *
+ * The PTP subsystem can be initialized with gst_ptp_init(), which then starts
+ * a helper process to do the actual communication via the PTP ports. This is
+ * required as PTP listens on ports < 1024 and thus requires special
+ * privileges. Once this helper process is started, the main process will
+ * synchronize to all PTP domains that are detected on the selected
+ * interfaces.
+ *
+ * gst_ptp_clock_new() then allows to create a GstClock that provides the PTP
+ * time from a master clock inside a specific PTP domain. This clock will only
+ * return valid timestamps once the timestamps in the PTP domain are known. To
+ * check this, the GstPtpClock::internal-clock property and the related
+ * notify::clock signal can be used. Once the internal clock is not NULL, the
+ * PTP domain's time is known. Alternatively you can wait for this with
+ * gst_ptp_clock_wait_ready().
+ *
+ *
+ * To gather statistics about the PTP clock synchronization,
+ * gst_ptp_statistics_callback_add() can be used. This gives the application
+ * the possibility to collect all kinds of statistics from the clock
+ * synchronization.
+ *
+ * Since: 1.6
+ *
+ */
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "gstptpclock.h"
+
+#ifdef HAVE_PTP
+
+#include "gstptp_private.h"
+
+#include <sys/wait.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <gst/base/base.h>
+
+GST_DEBUG_CATEGORY_STATIC (ptp_debug);
+#define GST_CAT_DEFAULT (ptp_debug)
+
+/* IEEE 1588 7.7.3.1 */
+#define PTP_ANNOUNCE_RECEIPT_TIMEOUT 4
+
+#define MAX_SKIPPED_UPDATES 5
+
+typedef enum
+{
+ PTP_MESSAGE_TYPE_SYNC = 0x0,
+ PTP_MESSAGE_TYPE_DELAY_REQ = 0x1,
+ PTP_MESSAGE_TYPE_PDELAY_REQ = 0x2,
+ PTP_MESSAGE_TYPE_PDELAY_RESP = 0x3,
+ PTP_MESSAGE_TYPE_FOLLOW_UP = 0x8,
+ PTP_MESSAGE_TYPE_DELAY_RESP = 0x9,
+ PTP_MESSAGE_TYPE_PDELAY_RESP_FOLLOW_UP = 0xA,
+ PTP_MESSAGE_TYPE_ANNOUNCE = 0xB,
+ PTP_MESSAGE_TYPE_SIGNALING = 0xC,
+ PTP_MESSAGE_TYPE_MANAGEMENT = 0xD
+} PtpMessageType;
+
+typedef struct
+{
+ guint64 seconds_field; /* 48 bits valid */
+ guint32 nanoseconds_field;
+} PtpTimestamp;
+
+#define PTP_TIMESTAMP_TO_GST_CLOCK_TIME(ptp) (ptp.seconds_field * GST_SECOND + ptp.nanoseconds_field)
+#define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_SECONDS(gst) (((GstClockTime) gst) / GST_SECOND)
+#define GST_CLOCK_TIME_TO_PTP_TIMESTAMP_NANOSECONDS(gst) (((GstClockTime) gst) % GST_SECOND)
+
+typedef struct
+{
+ guint64 clock_identity;
+ guint16 port_number;
+} PtpClockIdentity;
+
+static gint
+compare_clock_identity (const PtpClockIdentity * a, const PtpClockIdentity * b)
+{
+ if (a->clock_identity < b->clock_identity)
+ return -1;
+ else if (a->clock_identity > b->clock_identity)
+ return 1;
+
+ if (a->port_number < b->port_number)
+ return -1;
+ else if (a->port_number > b->port_number)
+ return 1;
+
+ return 0;
+}
+
+typedef struct
+{
+ guint8 clock_class;
+ guint8 clock_accuracy;
+ guint16 offset_scaled_log_variance;
+} PtpClockQuality;
+
+typedef struct
+{
+ guint8 transport_specific;
+ PtpMessageType message_type;
+ /* guint8 reserved; */
+ guint8 version_ptp;
+ guint16 message_length;
+ guint8 domain_number;
+ /* guint8 reserved; */
+ guint16 flag_field;
+ gint64 correction_field; /* 48.16 fixed point nanoseconds */
+ /* guint32 reserved; */
+ PtpClockIdentity source_port_identity;
+ guint16 sequence_id;
+ guint8 control_field;
+ gint8 log_message_interval;
+
+ union
+ {
+ struct
+ {
+ PtpTimestamp origin_timestamp;
+ gint16 current_utc_offset;
+ /* guint8 reserved; */
+ guint8 grandmaster_priority_1;
+ PtpClockQuality grandmaster_clock_quality;
+ guint8 grandmaster_priority_2;
+ guint64 grandmaster_identity;
+ guint16 steps_removed;
+ guint8 time_source;
+ } announce;
+
+ struct
+ {
+ PtpTimestamp origin_timestamp;
+ } sync;
+
+ struct
+ {
+ PtpTimestamp precise_origin_timestamp;
+ } follow_up;
+
+ struct
+ {
+ PtpTimestamp origin_timestamp;
+ } delay_req;
+
+ struct
+ {
+ PtpTimestamp receive_timestamp;
+ PtpClockIdentity requesting_port_identity;
+ } delay_resp;
+
+ } message_specific;
+} PtpMessage;
+
+static GMutex ptp_lock;
+static GCond ptp_cond;
+static gboolean initted = FALSE;
+static gboolean supported = TRUE;
+static GPid ptp_helper_pid;
+static GThread *ptp_helper_thread;
+static GMainContext *main_context;
+static GMainLoop *main_loop;
+static GIOChannel *stdin_channel, *stdout_channel;
+static GRand *delay_req_rand;
+static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 };
+
+typedef struct
+{
+ GstClockTime receive_time;
+
+ PtpClockIdentity master_clock_identity;
+
+ guint8 grandmaster_priority_1;
+ PtpClockQuality grandmaster_clock_quality;
+ guint8 grandmaster_priority_2;
+ guint64 grandmaster_identity;
+ guint16 steps_removed;
+ guint8 time_source;
+
+ guint16 sequence_id;
+} PtpAnnounceMessage;
+
+typedef struct
+{
+ PtpClockIdentity master_clock_identity;
+
+ GstClockTime announce_interval; /* last interval we received */
+ GQueue announce_messages;
+} PtpAnnounceSender;
+
+typedef struct
+{
+ guint domain;
+ PtpClockIdentity master_clock_identity;
+
+ guint16 sync_seqnum;
+ GstClockTime sync_recv_time_local; /* t2 */
+ GstClockTime sync_send_time_remote; /* t1, might be -1 if FOLLOW_UP pending */
+ GstClockTime follow_up_recv_time_local;
+
+ GSource *timeout_source;
+ guint16 delay_req_seqnum;
+ GstClockTime delay_req_send_time_local; /* t3, -1 if we wait for FOLLOW_UP */
+ GstClockTime delay_req_recv_time_remote; /* t4, -1 if we wait */
+ GstClockTime delay_resp_recv_time_local;
+
+ gint64 correction_field_sync; /* sum of the correction fields of SYNC/FOLLOW_UP */
+ gint64 correction_field_delay; /* sum of the correction fields of DELAY_RESP */
+} PtpPendingSync;
+
+static void
+ptp_pending_sync_free (PtpPendingSync * sync)
+{
+ if (sync->timeout_source)
+ g_source_destroy (sync->timeout_source);
+ g_free (sync);
+}
+
+typedef struct
+{
+ guint domain;
+
+ GstClockTime last_ptp_time;
+ GstClockTime last_local_time;
+ gint skipped_updates;
+
+ /* Used for selecting the master/grandmaster */
+ GList *announce_senders;
+
+ /* Last selected master clock */
+ gboolean have_master_clock;
+ PtpClockIdentity master_clock_identity;
+ guint64 grandmaster_identity;
+
+ /* Last SYNC or FOLLOW_UP timestamp we received */
+ GstClockTime last_ptp_sync_time;
+ GstClockTime sync_interval;
+
+ GstClockTime mean_path_delay;
+ GstClockTime last_delay_req, min_delay_req_interval;
+ guint16 last_delay_req_seqnum;
+
+ GQueue pending_syncs;
+
+ GstClock *domain_clock;
+} PtpDomainData;
+
+static GList *domain_data;
+static GMutex domain_clocks_lock;
+static GList *domain_clocks;
+
+/* Protected by PTP lock */
+static void emit_ptp_statistics (guint8 domain, const GstStructure * stats);
+static GHookList domain_stats_hooks;
+static gint domain_stats_n_hooks;
+static gboolean domain_stats_hooks_initted = FALSE;
+
+/* Converts log2 seconds to GstClockTime */
+static GstClockTime
+log2_to_clock_time (gint l)
+{
+ if (l < 0)
+ return GST_SECOND >> (-l);
+ else
+ return GST_SECOND << l;
+}
+
+static void
+dump_ptp_message (PtpMessage * msg)
+{
+ GST_TRACE ("PTP message:");
+ GST_TRACE ("\ttransport_specific: %u", msg->transport_specific);
+ GST_TRACE ("\tmessage_type: 0x%01x", msg->message_type);
+ GST_TRACE ("\tversion_ptp: %u", msg->version_ptp);
+ GST_TRACE ("\tmessage_length: %u", msg->message_length);
+ GST_TRACE ("\tdomain_number: %u", msg->domain_number);
+ GST_TRACE ("\tflag_field: 0x%04x", msg->flag_field);
+ GST_TRACE ("\tcorrection_field: %" G_GINT64_FORMAT ".%03u",
+ (msg->correction_field / 65536),
+ (guint) ((msg->correction_field & 0xffff) * 1000) / 65536);
+ GST_TRACE ("\tsource_port_identity: 0x%016" G_GINT64_MODIFIER "x %u",
+ msg->source_port_identity.clock_identity,
+ msg->source_port_identity.port_number);
+ GST_TRACE ("\tsequence_id: %u", msg->sequence_id);
+ GST_TRACE ("\tcontrol_field: 0x%02x", msg->control_field);
+ GST_TRACE ("\tmessage_interval: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (log2_to_clock_time (msg->log_message_interval)));
+
+ switch (msg->message_type) {
+ case PTP_MESSAGE_TYPE_ANNOUNCE:
+ GST_TRACE ("\tANNOUNCE:");
+ GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
+ msg->message_specific.announce.origin_timestamp.seconds_field,
+ msg->message_specific.announce.origin_timestamp.nanoseconds_field);
+ GST_TRACE ("\t\tcurrent_utc_offset: %d",
+ msg->message_specific.announce.current_utc_offset);
+ GST_TRACE ("\t\tgrandmaster_priority_1: %u",
+ msg->message_specific.announce.grandmaster_priority_1);
+ GST_TRACE ("\t\tgrandmaster_clock_quality: 0x%02x 0x%02x %u",
+ msg->message_specific.announce.grandmaster_clock_quality.clock_class,
+ msg->message_specific.announce.
+ grandmaster_clock_quality.clock_accuracy,
+ msg->message_specific.announce.
+ grandmaster_clock_quality.offset_scaled_log_variance);
+ GST_TRACE ("\t\tgrandmaster_priority_2: %u",
+ msg->message_specific.announce.grandmaster_priority_2);
+ GST_TRACE ("\t\tgrandmaster_identity: 0x%016" G_GINT64_MODIFIER "x",
+ msg->message_specific.announce.grandmaster_identity);
+ GST_TRACE ("\t\tsteps_removed: %u",
+ msg->message_specific.announce.steps_removed);
+ GST_TRACE ("\t\ttime_source: 0x%02x",
+ msg->message_specific.announce.time_source);
+ break;
+ case PTP_MESSAGE_TYPE_SYNC:
+ GST_TRACE ("\tSYNC:");
+ GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
+ msg->message_specific.sync.origin_timestamp.seconds_field,
+ msg->message_specific.sync.origin_timestamp.nanoseconds_field);
+ break;
+ case PTP_MESSAGE_TYPE_FOLLOW_UP:
+ GST_TRACE ("\tFOLLOW_UP:");
+ GST_TRACE ("\t\tprecise_origin_timestamp: %" G_GUINT64_FORMAT ".%09u",
+ msg->message_specific.follow_up.
+ precise_origin_timestamp.seconds_field,
+ msg->message_specific.follow_up.
+ precise_origin_timestamp.nanoseconds_field);
+ break;
+ case PTP_MESSAGE_TYPE_DELAY_REQ:
+ GST_TRACE ("\tDELAY_REQ:");
+ GST_TRACE ("\t\torigin_timestamp: %" G_GUINT64_FORMAT ".%09u",
+ msg->message_specific.delay_req.origin_timestamp.seconds_field,
+ msg->message_specific.delay_req.origin_timestamp.nanoseconds_field);
+ break;
+ case PTP_MESSAGE_TYPE_DELAY_RESP:
+ GST_TRACE ("\tDELAY_RESP:");
+ GST_TRACE ("\t\treceive_timestamp: %" G_GUINT64_FORMAT ".%09u",
+ msg->message_specific.delay_resp.receive_timestamp.seconds_field,
+ msg->message_specific.delay_resp.receive_timestamp.nanoseconds_field);
+ GST_TRACE ("\t\trequesting_port_identity: 0x%016" G_GINT64_MODIFIER
+ "x %u",
+ msg->message_specific.delay_resp.
+ requesting_port_identity.clock_identity,
+ msg->message_specific.delay_resp.
+ requesting_port_identity.port_number);
+ break;
+ default:
+ break;
+ }
+ GST_TRACE (" ");
+}
+
+/* IEEE 1588-2008 5.3.3 */
+static gboolean
+parse_ptp_timestamp (PtpTimestamp * timestamp, GstByteReader * reader)
+{
+ g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 10, FALSE);
+
+ timestamp->seconds_field =
+ (((guint64) gst_byte_reader_get_uint32_be_unchecked (reader)) << 16) |
+ gst_byte_reader_get_uint16_be_unchecked (reader);
+ timestamp->nanoseconds_field =
+ gst_byte_reader_get_uint32_be_unchecked (reader);
+
+ if (timestamp->nanoseconds_field >= 1000000000)
+ return FALSE;
+
+ return TRUE;
+}
+
+/* IEEE 1588-2008 13.3 */
+static gboolean
+parse_ptp_message_header (PtpMessage * msg, GstByteReader * reader)
+{
+ guint8 b;
+
+ g_return_val_if_fail (gst_byte_reader_get_remaining (reader) >= 34, FALSE);
+
+ b = gst_byte_reader_get_uint8_unchecked (reader);
+ msg->transport_specific = b >> 4;
+ msg->message_type = b & 0x0f;
+
+ b = gst_byte_reader_get_uint8_unchecked (reader);
+ msg->version_ptp = b & 0x0f;
+ if (msg->version_ptp != 2) {
+ GST_WARNING ("Unsupported PTP message version (%u != 2)", msg->version_ptp);
+ return FALSE;
+ }
+
+ msg->message_length = gst_byte_reader_get_uint16_be_unchecked (reader);
+ if (gst_byte_reader_get_remaining (reader) + 4 < msg->message_length) {
+ GST_WARNING ("Not enough data (%u < %u)",
+ gst_byte_reader_get_remaining (reader) + 4, msg->message_length);
+ return FALSE;
+ }
+
+ msg->domain_number = gst_byte_reader_get_uint8_unchecked (reader);
+ gst_byte_reader_skip_unchecked (reader, 1);
+
+ msg->flag_field = gst_byte_reader_get_uint16_be_unchecked (reader);
+ msg->correction_field = gst_byte_reader_get_uint64_be_unchecked (reader);
+ gst_byte_reader_skip_unchecked (reader, 4);
+
+ msg->source_port_identity.clock_identity =
+ gst_byte_reader_get_uint64_be_unchecked (reader);
+ msg->source_port_identity.port_number =
+ gst_byte_reader_get_uint16_be_unchecked (reader);
+
+ msg->sequence_id = gst_byte_reader_get_uint16_be_unchecked (reader);
+ msg->control_field = gst_byte_reader_get_uint8_unchecked (reader);
+ msg->log_message_interval = gst_byte_reader_get_uint8_unchecked (reader);
+
+ return TRUE;
+}
+
+/* IEEE 1588-2008 13.5 */
+static gboolean
+parse_ptp_message_announce (PtpMessage * msg, GstByteReader * reader)
+{
+ g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_ANNOUNCE, FALSE);
+
+ if (gst_byte_reader_get_remaining (reader) < 20)
+ return FALSE;
+
+ if (!parse_ptp_timestamp (&msg->message_specific.announce.origin_timestamp,
+ reader))
+ return FALSE;
+
+ msg->message_specific.announce.current_utc_offset =
+ gst_byte_reader_get_uint16_be_unchecked (reader);
+ gst_byte_reader_skip_unchecked (reader, 1);
+
+ msg->message_specific.announce.grandmaster_priority_1 =
+ gst_byte_reader_get_uint8_unchecked (reader);
+ msg->message_specific.announce.grandmaster_clock_quality.clock_class =
+ gst_byte_reader_get_uint8_unchecked (reader);
+ msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy =
+ gst_byte_reader_get_uint8_unchecked (reader);
+ msg->message_specific.announce.
+ grandmaster_clock_quality.offset_scaled_log_variance =
+ gst_byte_reader_get_uint16_be_unchecked (reader);
+ msg->message_specific.announce.grandmaster_priority_2 =
+ gst_byte_reader_get_uint8_unchecked (reader);
+ msg->message_specific.announce.grandmaster_identity =
+ gst_byte_reader_get_uint64_be_unchecked (reader);
+ msg->message_specific.announce.steps_removed =
+ gst_byte_reader_get_uint16_be_unchecked (reader);
+ msg->message_specific.announce.time_source =
+ gst_byte_reader_get_uint8_unchecked (reader);
+
+ return TRUE;
+}
+
+/* IEEE 1588-2008 13.6 */
+static gboolean
+parse_ptp_message_sync (PtpMessage * msg, GstByteReader * reader)
+{
+ g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_SYNC, FALSE);
+
+ if (gst_byte_reader_get_remaining (reader) < 10)
+ return FALSE;
+
+ if (!parse_ptp_timestamp (&msg->message_specific.sync.origin_timestamp,
+ reader))
+ return FALSE;
+
+ return TRUE;
+}
+
+/* IEEE 1588-2008 13.6 */
+static gboolean
+parse_ptp_message_delay_req (PtpMessage * msg, GstByteReader * reader)
+{
+ g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_REQ, FALSE);
+
+ if (gst_byte_reader_get_remaining (reader) < 10)
+ return FALSE;
+
+ if (!parse_ptp_timestamp (&msg->message_specific.delay_req.origin_timestamp,
+ reader))
+ return FALSE;
+
+ return TRUE;
+}
+
+/* IEEE 1588-2008 13.7 */
+static gboolean
+parse_ptp_message_follow_up (PtpMessage * msg, GstByteReader * reader)
+{
+ g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_FOLLOW_UP, FALSE);
+
+ if (gst_byte_reader_get_remaining (reader) < 10)
+ return FALSE;
+
+ if (!parse_ptp_timestamp (&msg->message_specific.
+ follow_up.precise_origin_timestamp, reader))
+ return FALSE;
+
+ return TRUE;
+}
+
+/* IEEE 1588-2008 13.8 */
+static gboolean
+parse_ptp_message_delay_resp (PtpMessage * msg, GstByteReader * reader)
+{
+ g_return_val_if_fail (msg->message_type == PTP_MESSAGE_TYPE_DELAY_RESP,
+ FALSE);
+
+ if (gst_byte_reader_get_remaining (reader) < 20)
+ return FALSE;
+
+ if (!parse_ptp_timestamp (&msg->message_specific.delay_resp.receive_timestamp,
+ reader))
+ return FALSE;
+
+ msg->message_specific.delay_resp.requesting_port_identity.clock_identity =
+ gst_byte_reader_get_uint64_be_unchecked (reader);
+ msg->message_specific.delay_resp.requesting_port_identity.port_number =
+ gst_byte_reader_get_uint16_be_unchecked (reader);
+
+ return TRUE;
+}
+
+static gboolean
+parse_ptp_message (PtpMessage * msg, const guint8 * data, gsize size)
+{
+ GstByteReader reader;
+ gboolean ret = FALSE;
+
+ gst_byte_reader_init (&reader, data, size);
+
+ if (!parse_ptp_message_header (msg, &reader)) {
+ GST_WARNING ("Failed to parse PTP message header");
+ return FALSE;
+ }
+
+ switch (msg->message_type) {
+ case PTP_MESSAGE_TYPE_SYNC:
+ ret = parse_ptp_message_sync (msg, &reader);
+ break;
+ case PTP_MESSAGE_TYPE_FOLLOW_UP:
+ ret = parse_ptp_message_follow_up (msg, &reader);
+ break;
+ case PTP_MESSAGE_TYPE_DELAY_REQ:
+ ret = parse_ptp_message_delay_req (msg, &reader);
+ break;
+ case PTP_MESSAGE_TYPE_DELAY_RESP:
+ ret = parse_ptp_message_delay_resp (msg, &reader);
+ break;
+ case PTP_MESSAGE_TYPE_ANNOUNCE:
+ ret = parse_ptp_message_announce (msg, &reader);
+ break;
+ default:
+ /* ignore for now */
+ break;
+ }
+
+ return ret;
+}
+
+static gint
+compare_announce_message (const PtpAnnounceMessage * a,
+ const PtpAnnounceMessage * b)
+{
+ /* IEEE 1588 Figure 27 */
+ if (a->grandmaster_identity == b->grandmaster_identity) {
+ if (a->steps_removed + 1 < b->steps_removed)
+ return -1;
+ else if (a->steps_removed > b->steps_removed + 1)
+ return 1;
+
+ /* Error cases are filtered out earlier */
+ if (a->steps_removed < b->steps_removed)
+ return -1;
+ else if (a->steps_removed > b->steps_removed)
+ return 1;
+
+ /* Error cases are filtered out earlier */
+ if (a->master_clock_identity.clock_identity <
+ b->master_clock_identity.clock_identity)
+ return -1;
+ else if (a->master_clock_identity.clock_identity >
+ b->master_clock_identity.clock_identity)
+ return 1;
+
+ /* Error cases are filtered out earlier */
+ if (a->master_clock_identity.port_number <
+ b->master_clock_identity.port_number)
+ return -1;
+ else if (a->master_clock_identity.port_number >
+ b->master_clock_identity.port_number)
+ return 1;
+ else
+ g_assert_not_reached ();
+
+ return 0;
+ }
+
+ if (a->grandmaster_priority_1 < b->grandmaster_priority_1)
+ return -1;
+ else if (a->grandmaster_priority_1 > b->grandmaster_priority_1)
+ return 1;
+
+ if (a->grandmaster_clock_quality.clock_class <
+ b->grandmaster_clock_quality.clock_class)
+ return -1;
+ else if (a->grandmaster_clock_quality.clock_class >
+ b->grandmaster_clock_quality.clock_class)
+ return 1;
+
+ if (a->grandmaster_clock_quality.clock_accuracy <
+ b->grandmaster_clock_quality.clock_accuracy)
+ return -1;
+ else if (a->grandmaster_clock_quality.clock_accuracy >
+ b->grandmaster_clock_quality.clock_accuracy)
+ return 1;
+
+ if (a->grandmaster_clock_quality.offset_scaled_log_variance <
+ b->grandmaster_clock_quality.offset_scaled_log_variance)
+ return -1;
+ else if (a->grandmaster_clock_quality.offset_scaled_log_variance >
+ b->grandmaster_clock_quality.offset_scaled_log_variance)
+ return 1;
+
+ if (a->grandmaster_priority_2 < b->grandmaster_priority_2)
+ return -1;
+ else if (a->grandmaster_priority_2 > b->grandmaster_priority_2)
+ return 1;
+
+ if (a->grandmaster_identity < b->grandmaster_identity)
+ return -1;
+ else if (a->grandmaster_identity > b->grandmaster_identity)
+ return 1;
+ else
+ g_assert_not_reached ();
+
+ return 0;
+}
+
+static void
+select_best_master_clock (PtpDomainData * domain, GstClockTime now)
+{
+ GList *qualified_messages = NULL;
+ GList *l, *m;
+ PtpAnnounceMessage *best = NULL;
+
+ /* IEEE 1588 9.3.2.5 */
+ for (l = domain->announce_senders; l; l = l->next) {
+ PtpAnnounceSender *sender = l->data;
+ GstClockTime window = 4 * sender->announce_interval;
+ gint count = 0;
+
+ for (m = sender->announce_messages.head; m; m = m->next) {
+ PtpAnnounceMessage *msg = m->data;
+
+ if (now - msg->receive_time <= window)
+ count++;
+ }
+
+ /* Only include the newest message of announce senders that had at least 2
+ * announce messages in the last 4 announce intervals. Which also means
+ * that we wait at least 4 announce intervals before we select a master
+ * clock. Until then we just report based on the newest SYNC we received
+ */
+ if (count >= 2) {
+ qualified_messages =
+ g_list_prepend (qualified_messages,
+ g_queue_peek_tail (&sender->announce_messages));
+ }
+ }
+
+ if (!qualified_messages) {
+ GST_DEBUG
+ ("No qualified announce messages for domain %u, can't select a master clock",
+ domain->domain);
+ domain->have_master_clock = FALSE;
+ return;
+ }
+
+ for (l = qualified_messages; l; l = l->next) {
+ PtpAnnounceMessage *msg = l->data;
+
+ if (!best || compare_announce_message (msg, best) < 0)
+ best = msg;
+ }
+
+ if (domain->have_master_clock
+ && compare_clock_identity (&domain->master_clock_identity,
+ &best->master_clock_identity) == 0) {
+ GST_DEBUG ("Master clock in domain %u did not change", domain->domain);
+ } else {
+ GST_DEBUG ("Selected master clock for domain %u: 0x%016" G_GINT64_MODIFIER
+ "x %u with grandmaster clock 0x%016" G_GINT64_MODIFIER "x",
+ domain->domain, best->master_clock_identity.clock_identity,
+ best->master_clock_identity.port_number, best->grandmaster_identity);
+
+ domain->have_master_clock = TRUE;
+ domain->grandmaster_identity = best->grandmaster_identity;
+
+ /* Opportunistic master clock selection likely gave us the same master
+ * clock before, no need to reset all statistics */
+ if (compare_clock_identity (&domain->master_clock_identity,
+ &best->master_clock_identity) != 0) {
+ memcpy (&domain->master_clock_identity, &best->master_clock_identity,
+ sizeof (PtpClockIdentity));
+ domain->mean_path_delay = 0;
+ domain->last_delay_req = 0;
+ domain->min_delay_req_interval = 0;
+ domain->sync_interval = 0;
+ domain->last_ptp_sync_time = 0;
+ domain->skipped_updates = 0;
+ g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
+ NULL);
+ g_queue_clear (&domain->pending_syncs);
+ }
+
+ if (g_atomic_int_get (&domain_stats_n_hooks)) {
+ GstStructure *stats =
+ gst_structure_new (GST_PTP_STATISTICS_BEST_MASTER_CLOCK_SELECTED,
+ "domain", G_TYPE_UINT, domain->domain,
+ "master-clock-id", G_TYPE_UINT64,
+ domain->master_clock_identity.clock_identity,
+ "master-clock-port", G_TYPE_UINT,
+ domain->master_clock_identity.port_number,
+ "grandmaster-clock-id", G_TYPE_UINT64, domain->grandmaster_identity,
+ NULL);
+ emit_ptp_statistics (domain->domain, stats);
+ gst_structure_free (stats);
+ }
+ }
+}
+
+static void
+handle_announce_message (PtpMessage * msg, GstClockTime receive_time)
+{
+ GList *l;
+ PtpDomainData *domain = NULL;
+ PtpAnnounceSender *sender = NULL;
+ PtpAnnounceMessage *announce;
+
+ /* IEEE1588 9.3.2.2 e)
+ * Don't consider messages with the alternate master flag set
+ */
+ if ((msg->flag_field & 0x0100))
+ return;
+
+ /* IEEE 1588 9.3.2.5 d)
+ * Don't consider announce messages with steps_removed>=255
+ */
+ if (msg->message_specific.announce.steps_removed >= 255)
+ return;
+
+ for (l = domain_data; l; l = l->next) {
+ PtpDomainData *tmp = l->data;
+
+ if (tmp->domain == msg->domain_number) {
+ domain = tmp;
+ break;
+ }
+ }
+
+ if (!domain) {
+ gchar *clock_name;
+
+ domain = g_new0 (PtpDomainData, 1);
+ domain->domain = msg->domain_number;
+ clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
+ domain->domain_clock =
+ g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
+ g_free (clock_name);
+ g_queue_init (&domain->pending_syncs);
+ domain_data = g_list_prepend (domain_data, domain);
+
+ g_mutex_lock (&domain_clocks_lock);
+ domain_clocks = g_list_prepend (domain_clocks, domain);
+ g_mutex_unlock (&domain_clocks_lock);
+
+ if (g_atomic_int_get (&domain_stats_n_hooks)) {
+ GstStructure *stats =
+ gst_structure_new (GST_PTP_STATISTICS_NEW_DOMAIN_FOUND, "domain",
+ G_TYPE_UINT, domain->domain, "clock", GST_TYPE_CLOCK,
+ domain->domain_clock, NULL);
+ emit_ptp_statistics (domain->domain, stats);
+ gst_structure_free (stats);
+ }
+ }
+
+ for (l = domain->announce_senders; l; l = l->next) {
+ PtpAnnounceSender *tmp = l->data;
+
+ if (compare_clock_identity (&tmp->master_clock_identity,
+ &msg->source_port_identity) == 0) {
+ sender = tmp;
+ break;
+ }
+ }
+
+ if (!sender) {
+ sender = g_new0 (PtpAnnounceSender, 1);
+
+ memcpy (&sender->master_clock_identity, &msg->source_port_identity,
+ sizeof (PtpClockIdentity));
+ g_queue_init (&sender->announce_messages);
+ domain->announce_senders =
+ g_list_prepend (domain->announce_senders, sender);
+ }
+
+ for (l = sender->announce_messages.head; l; l = l->next) {
+ PtpAnnounceMessage *tmp = l->data;
+
+ /* IEEE 1588 9.3.2.5 c)
+ * Don't consider identical messages, i.e. duplicates
+ */
+ if (tmp->sequence_id == msg->sequence_id)
+ return;
+ }
+
+ sender->announce_interval = log2_to_clock_time (msg->log_message_interval);
+
+ announce = g_new0 (PtpAnnounceMessage, 1);
+ announce->receive_time = receive_time;
+ announce->sequence_id = msg->sequence_id;
+ memcpy (&announce->master_clock_identity, &msg->source_port_identity,
+ sizeof (PtpClockIdentity));
+ announce->grandmaster_identity =
+ msg->message_specific.announce.grandmaster_identity;
+ announce->grandmaster_priority_1 =
+ msg->message_specific.announce.grandmaster_priority_1;
+ announce->grandmaster_clock_quality.clock_class =
+ msg->message_specific.announce.grandmaster_clock_quality.clock_class;
+ announce->grandmaster_clock_quality.clock_accuracy =
+ msg->message_specific.announce.grandmaster_clock_quality.clock_accuracy;
+ announce->grandmaster_clock_quality.offset_scaled_log_variance =
+ msg->message_specific.announce.
+ grandmaster_clock_quality.offset_scaled_log_variance;
+ announce->grandmaster_priority_2 =
+ msg->message_specific.announce.grandmaster_priority_2;
+ announce->steps_removed = msg->message_specific.announce.steps_removed;
+ announce->time_source = msg->message_specific.announce.time_source;
+ g_queue_push_tail (&sender->announce_messages, announce);
+
+ select_best_master_clock (domain, receive_time);
+}
+
+static gboolean
+send_delay_req_timeout (PtpPendingSync * sync)
+{
+ StdIOHeader header = { 0, };
+ guint8 delay_req[44];
+ GstByteWriter writer;
+ GIOStatus status;
+ gsize written;
+ GError *err = NULL;
+
+ header.type = TYPE_EVENT;
+ header.size = 44;
+
+ gst_byte_writer_init_with_data (&writer, delay_req, 44, FALSE);
+ gst_byte_writer_put_uint8_unchecked (&writer, PTP_MESSAGE_TYPE_DELAY_REQ);
+ gst_byte_writer_put_uint8_unchecked (&writer, 2);
+ gst_byte_writer_put_uint16_be_unchecked (&writer, 44);
+ gst_byte_writer_put_uint8_unchecked (&writer, sync->domain);
+ gst_byte_writer_put_uint8_unchecked (&writer, 0);
+ gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
+ gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
+ gst_byte_writer_put_uint32_be_unchecked (&writer, 0);
+ gst_byte_writer_put_uint64_be_unchecked (&writer,
+ ptp_clock_id.clock_identity);
+ gst_byte_writer_put_uint16_be_unchecked (&writer, ptp_clock_id.port_number);
+ gst_byte_writer_put_uint16_be_unchecked (&writer, sync->delay_req_seqnum);
+ gst_byte_writer_put_uint8_unchecked (&writer, 0x01);
+ gst_byte_writer_put_uint8_unchecked (&writer, 0x7f);
+ gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
+ gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
+
+ status =
+ g_io_channel_write_chars (stdout_channel, (gchar *) & header,
+ sizeof (header), &written, &err);
+ if (status == G_IO_STATUS_ERROR) {
+ g_warning ("Failed to write to stdout: %s", err->message);
+ return G_SOURCE_REMOVE;
+ } else if (status == G_IO_STATUS_EOF) {
+ g_message ("EOF on stdout");
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ } else if (status != G_IO_STATUS_NORMAL) {
+ g_warning ("Unexpected stdout write status: %d", status);
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ } else if (written != sizeof (header)) {
+ g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ }
+
+ sync->delay_req_send_time_local = gst_util_get_timestamp ();
+
+ status =
+ g_io_channel_write_chars (stdout_channel,
+ (const gchar *) delay_req, 44, &written, &err);
+ if (status == G_IO_STATUS_ERROR) {
+ g_warning ("Failed to write to stdout: %s", err->message);
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ } else if (status == G_IO_STATUS_EOF) {
+ g_message ("EOF on stdout");
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ } else if (status != G_IO_STATUS_NORMAL) {
+ g_warning ("Unexpected stdout write status: %d", status);
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ } else if (written != 44) {
+ g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ }
+
+ return G_SOURCE_REMOVE;
+}
+
+static gboolean
+send_delay_req (PtpDomainData * domain, PtpPendingSync * sync)
+{
+ GstClockTime now = gst_util_get_timestamp ();
+ guint timeout;
+ GSource *timeout_source;
+
+ if (domain->last_delay_req != 0
+ && domain->last_delay_req + domain->min_delay_req_interval > now)
+ return FALSE;
+
+ domain->last_delay_req = now;
+ sync->delay_req_seqnum = domain->last_delay_req_seqnum++;
+
+ /* IEEE 1588 9.5.11.2 */
+ if (domain->last_delay_req == 0 || domain->min_delay_req_interval == 0)
+ timeout = 0;
+ else
+ timeout =
+ g_rand_int_range (delay_req_rand, 0,
+ (domain->min_delay_req_interval * 2) / GST_MSECOND);
+
+ sync->timeout_source = timeout_source = g_timeout_source_new (timeout);
+ g_source_set_priority (timeout_source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (timeout_source, (GSourceFunc) send_delay_req_timeout,
+ sync, NULL);
+ g_source_attach (timeout_source, main_context);
+
+ return TRUE;
+}
+
+/* Filtering of outliers for RTT and time calculations inspired
+ * by the code from gstnetclientclock.c
+ */
+static void
+update_ptp_time (PtpDomainData * domain, PtpPendingSync * sync)
+{
+ GstClockTime internal_time, external_time, rate_num, rate_den;
+ GstClockTime orig_internal_time, orig_external_time, orig_rate_num,
+ orig_rate_den;
+ GstClockTime corrected_ptp_time, corrected_local_time;
+ GstClockTime max_discont, estimated_ptp_time_min, estimated_ptp_time_max;
+ gdouble r_squared;
+ gboolean synced, now_synced;
+ GstClockTimeDiff discont = 0;
+ GstClockTime estimated_ptp_time = GST_CLOCK_TIME_NONE, new_estimated_ptp_time;
+
+ /* We check this here and when updating the mean path delay, because
+ * we can get here without a delay response too */
+ if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE
+ && sync->follow_up_recv_time_local >
+ sync->sync_recv_time_local + 2 * domain->mean_path_delay) {
+ GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
+ " > 2 * %" GST_TIME_FORMAT, domain->domain,
+ GST_TIME_ARGS (sync->follow_up_recv_time_local),
+ GST_TIME_ARGS (domain->mean_path_delay));
+ goto out;
+ }
+
+ /* IEEE 1588 11.2 */
+ corrected_ptp_time =
+ sync->sync_send_time_remote +
+ (sync->correction_field_sync + 32768) / 65536;
+ corrected_local_time = sync->sync_recv_time_local - domain->mean_path_delay;
+
+ /* Set an initial local-remote relation */
+ if (domain->last_ptp_time == 0)
+ gst_clock_set_calibration (domain->domain_clock, corrected_local_time,
+ corrected_ptp_time, 1, 1);
+
+ /* Check if the corrected PTP time is +/- 3/4 RTT around what we would
+ * estimate with our present knowledge about the clock
+ */
+ /* Store what the clock produced as 'now' before this update */
+ gst_clock_get_calibration (GST_CLOCK_CAST (domain->domain_clock),
+ &orig_internal_time, &orig_external_time, &orig_rate_num, &orig_rate_den);
+ internal_time = orig_internal_time;
+ external_time = orig_external_time;
+ rate_num = orig_rate_num;
+ rate_den = orig_rate_den;
+
+ /* 3/4 RTT window around the estimation */
+ max_discont = domain->mean_path_delay * 3 / 2;
+
+ /* Check if the estimated sync time is inside our window */
+ estimated_ptp_time_min = corrected_local_time - max_discont;
+ estimated_ptp_time_min =
+ gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
+ estimated_ptp_time_min, internal_time, external_time, rate_num, rate_den);
+ estimated_ptp_time_max = corrected_local_time + max_discont;
+ estimated_ptp_time_max =
+ gst_clock_adjust_with_calibration (GST_CLOCK_CAST (domain->domain_clock),
+ estimated_ptp_time_max, internal_time, external_time, rate_num, rate_den);
+
+ synced = (estimated_ptp_time_min < corrected_ptp_time
+ && corrected_ptp_time < estimated_ptp_time_max);
+
+ GST_DEBUG ("Adding observation for domain %u: %" GST_TIME_FORMAT " - %"
+ GST_TIME_FORMAT, domain->domain,
+ GST_TIME_ARGS (corrected_ptp_time), GST_TIME_ARGS (corrected_local_time));
+
+ GST_DEBUG ("Synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
+ GST_TIME_FORMAT, synced, GST_TIME_ARGS (estimated_ptp_time_min),
+ GST_TIME_ARGS (corrected_ptp_time),
+ GST_TIME_ARGS (estimated_ptp_time_max));
+
+ if (gst_clock_add_observation_unapplied (domain->domain_clock,
+ corrected_local_time, corrected_ptp_time, &r_squared,
+ &internal_time, &external_time, &rate_num, &rate_den)) {
+ GST_DEBUG ("Regression gave r_squared: %f", r_squared);
+
+ /* Old estimated PTP time based on receive time and path delay */
+ estimated_ptp_time = corrected_local_time;
+ estimated_ptp_time =
+ gst_clock_adjust_with_calibration (GST_CLOCK_CAST
+ (domain->domain_clock), estimated_ptp_time, orig_internal_time,
+ orig_external_time, orig_rate_num, orig_rate_den);
+
+ /* New estimated PTP time based on receive time and path delay */
+ new_estimated_ptp_time = corrected_local_time;
+ new_estimated_ptp_time =
+ gst_clock_adjust_with_calibration (GST_CLOCK_CAST
+ (domain->domain_clock), new_estimated_ptp_time, internal_time,
+ external_time, rate_num, rate_den);
+
+ discont = GST_CLOCK_DIFF (estimated_ptp_time, new_estimated_ptp_time);
+ if (synced && ABS (discont) > max_discont) {
+ GstClockTimeDiff offset;
+ GST_DEBUG ("Too large a discont %s%" GST_TIME_FORMAT
+ ", clamping to 1/4 average RTT = %" GST_TIME_FORMAT,
+ (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
+ GST_TIME_ARGS (max_discont));
+ if (discont > 0) { /* Too large a forward step - add a -ve offset */
+ offset = max_discont - discont;
+ if (-offset > external_time)
+ external_time = 0;
+ else
+ external_time += offset;
+ } else { /* Too large a backward step - add a +ve offset */
+ offset = -(max_discont + discont);
+ external_time += offset;
+ }
+
+ discont += offset;
+ } else {
+ GST_DEBUG ("Discont %s%" GST_TIME_FORMAT " (max: %" GST_TIME_FORMAT ")",
+ (discont < 0 ? "-" : ""), GST_TIME_ARGS (ABS (discont)),
+ GST_TIME_ARGS (max_discont));
+ }
+
+ /* Check if the estimated sync time is now (still) inside our window */
+ estimated_ptp_time_min = corrected_local_time - max_discont;
+ estimated_ptp_time_min =
+ gst_clock_adjust_with_calibration (GST_CLOCK_CAST
+ (domain->domain_clock), estimated_ptp_time_min, internal_time,
+ external_time, rate_num, rate_den);
+ estimated_ptp_time_max = corrected_local_time + max_discont;
+ estimated_ptp_time_max =
+ gst_clock_adjust_with_calibration (GST_CLOCK_CAST
+ (domain->domain_clock), estimated_ptp_time_max, internal_time,
+ external_time, rate_num, rate_den);
+
+ now_synced = (estimated_ptp_time_min < corrected_ptp_time
+ && corrected_ptp_time < estimated_ptp_time_max);
+
+ GST_DEBUG ("Now synced %d: %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT " < %"
+ GST_TIME_FORMAT, now_synced, GST_TIME_ARGS (estimated_ptp_time_min),
+ GST_TIME_ARGS (corrected_ptp_time),
+ GST_TIME_ARGS (estimated_ptp_time_max));
+
+ if (synced || now_synced || domain->skipped_updates > MAX_SKIPPED_UPDATES) {
+ gst_clock_set_calibration (GST_CLOCK_CAST (domain->domain_clock),
+ internal_time, external_time, rate_num, rate_den);
+ domain->skipped_updates = 0;
+
+ domain->last_ptp_time = corrected_ptp_time;
+ domain->last_local_time = corrected_local_time;
+ } else {
+ domain->skipped_updates++;
+ }
+ } else {
+ domain->last_ptp_time = corrected_ptp_time;
+ domain->last_local_time = corrected_local_time;
+ }
+
+out:
+
+ if (g_atomic_int_get (&domain_stats_n_hooks)) {
+ GstStructure *stats = gst_structure_new (GST_PTP_STATISTICS_TIME_UPDATED,
+ "domain", G_TYPE_UINT, domain->domain,
+ "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
+ "local-time", GST_TYPE_CLOCK_TIME, corrected_local_time,
+ "ptp-time", GST_TYPE_CLOCK_TIME, corrected_ptp_time,
+ "estimated-ptp-time", GST_TYPE_CLOCK_TIME, estimated_ptp_time,
+ "discontinuity", G_TYPE_INT64, discont,
+ "synced", G_TYPE_BOOLEAN, synced,
+ "r-squared", G_TYPE_DOUBLE, r_squared,
+ "internal-time", GST_TYPE_CLOCK_TIME, internal_time,
+ "external-time", GST_TYPE_CLOCK_TIME, external_time,
+ "rate-num", G_TYPE_UINT64, rate_num,
+ "rate-den", G_TYPE_UINT64, rate_den,
+ "rate", G_TYPE_DOUBLE, (gdouble) (rate_num) / rate_den,
+ NULL);
+ emit_ptp_statistics (domain->domain, stats);
+ gst_structure_free (stats);
+ }
+
+}
+
+static gboolean
+update_mean_path_delay (PtpDomainData * domain, PtpPendingSync * sync)
+{
+ GstClockTime mean_path_delay, delay_req_delay;
+ gboolean ret;
+
+ /* IEEE 1588 11.3 */
+ mean_path_delay =
+ (sync->delay_req_recv_time_remote - sync->sync_send_time_remote +
+ sync->sync_recv_time_local - sync->delay_req_send_time_local -
+ (sync->correction_field_sync + sync->correction_field_delay +
+ 32768) / 65536) / 2;
+
+ /* Track an average round trip time, for a bit of smoothing */
+ /* Always update before discarding a sample, so genuine changes in
+ * the network get picked up, eventually */
+ if (domain->mean_path_delay == 0)
+ domain->mean_path_delay = mean_path_delay;
+ else if (mean_path_delay < domain->mean_path_delay) /* Shorter RTTs carry more weight than longer */
+ domain->mean_path_delay =
+ (3 * domain->mean_path_delay + mean_path_delay) / 4;
+ else
+ domain->mean_path_delay =
+ (15 * domain->mean_path_delay + mean_path_delay) / 16;
+
+ if (sync->follow_up_recv_time_local != GST_CLOCK_TIME_NONE &&
+ domain->mean_path_delay != 0
+ && sync->follow_up_recv_time_local >
+ sync->sync_recv_time_local + 2 * domain->mean_path_delay) {
+ GST_WARNING ("Sync-follow-up delay for domain %u too big: %" GST_TIME_FORMAT
+ " > 2 * %" GST_TIME_FORMAT, domain->domain,
+ GST_TIME_ARGS (sync->follow_up_recv_time_local),
+ GST_TIME_ARGS (domain->mean_path_delay));
+ ret = FALSE;
+ goto out;
+ }
+
+ if (mean_path_delay > 2 * domain->mean_path_delay) {
+ GST_WARNING ("Mean path delay for domain %u too big: %" GST_TIME_FORMAT
+ " > 2 * %" GST_TIME_FORMAT, domain->domain,
+ GST_TIME_ARGS (mean_path_delay),
+ GST_TIME_ARGS (domain->mean_path_delay));
+ ret = FALSE;
+ goto out;
+ }
+
+ delay_req_delay =
+ sync->delay_resp_recv_time_local - sync->delay_req_send_time_local;
+ /* delay_req_delay is a RTT, so 2 times the path delay */
+ if (delay_req_delay > 4 * domain->mean_path_delay) {
+ GST_WARNING ("Delay-request-response delay for domain %u too big: %"
+ GST_TIME_FORMAT " > 4 * %" GST_TIME_FORMAT, domain->domain,
+ GST_TIME_ARGS (delay_req_delay),
+ GST_TIME_ARGS (domain->mean_path_delay));
+ ret = FALSE;
+ goto out;
+ }
+
+ ret = TRUE;
+
+ GST_DEBUG ("Got mean path delay for domain %u: %" GST_TIME_FORMAT " (new: %"
+ GST_TIME_FORMAT ")", domain->domain,
+ GST_TIME_ARGS (domain->mean_path_delay), GST_TIME_ARGS (mean_path_delay));
+ GST_DEBUG ("Delay request delay for domain %u: %" GST_TIME_FORMAT,
+ domain->domain, GST_TIME_ARGS (delay_req_delay));
+
+out:
+ if (g_atomic_int_get (&domain_stats_n_hooks)) {
+ GstStructure *stats =
+ gst_structure_new (GST_PTP_STATISTICS_PATH_DELAY_MEASURED,
+ "domain", G_TYPE_UINT, domain->domain,
+ "mean-path-delay-avg", GST_TYPE_CLOCK_TIME, domain->mean_path_delay,
+ "mean-path-delay", GST_TYPE_CLOCK_TIME, mean_path_delay,
+ "delay-request-delay", GST_TYPE_CLOCK_TIME, delay_req_delay, NULL);
+ emit_ptp_statistics (domain->domain, stats);
+ gst_structure_free (stats);
+ }
+
+ return ret;
+}
+
+static void
+handle_sync_message (PtpMessage * msg, GstClockTime receive_time)
+{
+ GList *l;
+ PtpDomainData *domain = NULL;
+ PtpPendingSync *sync = NULL;
+
+ /* Don't consider messages with the alternate master flag set */
+ if ((msg->flag_field & 0x0100))
+ return;
+
+ for (l = domain_data; l; l = l->next) {
+ PtpDomainData *tmp = l->data;
+
+ if (msg->domain_number == tmp->domain) {
+ domain = tmp;
+ break;
+ }
+ }
+
+ if (!domain) {
+ gchar *clock_name;
+ domain = g_new0 (PtpDomainData, 1);
+ domain->domain = msg->domain_number;
+ clock_name = g_strdup_printf ("ptp-clock-%u", domain->domain);
+ domain->domain_clock =
+ g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", clock_name, NULL);
+ g_free (clock_name);
+ g_queue_init (&domain->pending_syncs);
+ domain_data = g_list_prepend (domain_data, domain);
+
+ g_mutex_lock (&domain_clocks_lock);
+ domain_clocks = g_list_prepend (domain_clocks, domain);
+ g_mutex_unlock (&domain_clocks_lock);
+ }
+
+ /* If we have a master clock, ignore this message if it's not coming from there */
+ if (domain->have_master_clock
+ && compare_clock_identity (&domain->master_clock_identity,
+ &msg->source_port_identity) != 0)
+ return;
+
+ /* Opportunistic selection of master clock */
+ if (!domain->have_master_clock)
+ memcpy (&domain->master_clock_identity, &msg->source_port_identity,
+ sizeof (PtpClockIdentity));
+
+ domain->sync_interval = log2_to_clock_time (msg->log_message_interval);
+
+ /* Check if duplicated */
+ for (l = domain->pending_syncs.head; l; l = l->next) {
+ PtpPendingSync *tmp = l->data;
+
+ if (tmp->sync_seqnum == msg->sequence_id)
+ return;
+ }
+
+ if (msg->message_specific.sync.origin_timestamp.seconds_field >
+ GST_CLOCK_TIME_NONE / GST_SECOND) {
+ GST_FIXME ("Unsupported sync message seconds field value: %"
+ G_GUINT64_FORMAT " > %" G_GUINT64_FORMAT,
+ msg->message_specific.sync.origin_timestamp.seconds_field,
+ GST_CLOCK_TIME_NONE / GST_SECOND);
+ return;
+ }
+
+ sync = g_new0 (PtpPendingSync, 1);
+ sync->domain = domain->domain;
+ sync->sync_seqnum = msg->sequence_id;
+ sync->sync_recv_time_local = receive_time;
+ sync->sync_send_time_remote = GST_CLOCK_TIME_NONE;
+ sync->follow_up_recv_time_local = GST_CLOCK_TIME_NONE;
+ sync->delay_req_send_time_local = GST_CLOCK_TIME_NONE;
+ sync->delay_req_recv_time_remote = GST_CLOCK_TIME_NONE;
+ sync->delay_resp_recv_time_local = GST_CLOCK_TIME_NONE;
+
+ /* 0.5 correction factor for division later */
+ sync->correction_field_sync = msg->correction_field;
+
+ if ((msg->flag_field & 0x0200)) {
+ /* Wait for FOLLOW_UP */
+ } else {
+ sync->sync_send_time_remote =
+ PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
+ sync.origin_timestamp);
+
+ if (domain->last_ptp_sync_time != 0
+ && domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
+ GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
+ GST_TIME_FORMAT, domain->domain,
+ GST_TIME_ARGS (domain->last_ptp_sync_time),
+ GST_TIME_ARGS (sync->sync_send_time_remote));
+ ptp_pending_sync_free (sync);
+ sync = NULL;
+ return;
+ }
+ domain->last_ptp_sync_time = sync->sync_send_time_remote;
+
+ if (send_delay_req (domain, sync)) {
+ /* Sent delay request */
+ } else {
+ update_ptp_time (domain, sync);
+ ptp_pending_sync_free (sync);
+ sync = NULL;
+ }
+ }
+
+ if (sync)
+ g_queue_push_tail (&domain->pending_syncs, sync);
+}
+
+static void
+handle_follow_up_message (PtpMessage * msg, GstClockTime receive_time)
+{
+ GList *l;
+ PtpDomainData *domain = NULL;
+ PtpPendingSync *sync = NULL;
+
+ /* Don't consider messages with the alternate master flag set */
+ if ((msg->flag_field & 0x0100))
+ return;
+
+ for (l = domain_data; l; l = l->next) {
+ PtpDomainData *tmp = l->data;
+
+ if (msg->domain_number == tmp->domain) {
+ domain = tmp;
+ break;
+ }
+ }
+
+ if (!domain)
+ return;
+
+ /* If we have a master clock, ignore this message if it's not coming from there */
+ if (domain->have_master_clock
+ && compare_clock_identity (&domain->master_clock_identity,
+ &msg->source_port_identity) != 0)
+ return;
+
+ /* Check if we know about this one */
+ for (l = domain->pending_syncs.head; l; l = l->next) {
+ PtpPendingSync *tmp = l->data;
+
+ if (tmp->sync_seqnum == msg->sequence_id) {
+ sync = tmp;
+ break;
+ }
+ }
+
+ if (!sync)
+ return;
+
+ /* Got a FOLLOW_UP for this already */
+ if (sync->sync_send_time_remote != GST_CLOCK_TIME_NONE)
+ return;
+
+ if (sync->sync_recv_time_local >= receive_time) {
+ GST_ERROR ("Got bogus follow up in domain %u: %" GST_TIME_FORMAT " > %"
+ GST_TIME_FORMAT, domain->domain,
+ GST_TIME_ARGS (sync->sync_recv_time_local),
+ GST_TIME_ARGS (receive_time));
+ g_queue_remove (&domain->pending_syncs, sync);
+ ptp_pending_sync_free (sync);
+ return;
+ }
+
+ sync->correction_field_sync += msg->correction_field;
+ sync->sync_send_time_remote =
+ PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
+ follow_up.precise_origin_timestamp);
+ sync->follow_up_recv_time_local = receive_time;
+
+ if (domain->last_ptp_sync_time >= sync->sync_send_time_remote) {
+ GST_WARNING ("Backwards PTP times in domain %u: %" GST_TIME_FORMAT " >= %"
+ GST_TIME_FORMAT, domain->domain,
+ GST_TIME_ARGS (domain->last_ptp_sync_time),
+ GST_TIME_ARGS (sync->sync_send_time_remote));
+ g_queue_remove (&domain->pending_syncs, sync);
+ ptp_pending_sync_free (sync);
+ sync = NULL;
+ return;
+ }
+ domain->last_ptp_sync_time = sync->sync_send_time_remote;
+
+ if (send_delay_req (domain, sync)) {
+ /* Sent delay request */
+ } else {
+ update_ptp_time (domain, sync);
+ g_queue_remove (&domain->pending_syncs, sync);
+ ptp_pending_sync_free (sync);
+ sync = NULL;
+ }
+}
+
+static void
+handle_delay_resp_message (PtpMessage * msg, GstClockTime receive_time)
+{
+ GList *l;
+ PtpDomainData *domain = NULL;
+ PtpPendingSync *sync = NULL;
+
+ /* Don't consider messages with the alternate master flag set */
+ if ((msg->flag_field & 0x0100))
+ return;
+
+ for (l = domain_data; l; l = l->next) {
+ PtpDomainData *tmp = l->data;
+
+ if (msg->domain_number == tmp->domain) {
+ domain = tmp;
+ break;
+ }
+ }
+
+ if (!domain)
+ return;
+
+ /* If we have a master clock, ignore this message if it's not coming from there */
+ if (domain->have_master_clock
+ && compare_clock_identity (&domain->master_clock_identity,
+ &msg->source_port_identity) != 0)
+ return;
+
+ /* Not for us */
+ if (msg->message_specific.delay_resp.
+ requesting_port_identity.clock_identity != ptp_clock_id.clock_identity
+ || msg->message_specific.delay_resp.
+ requesting_port_identity.port_number != ptp_clock_id.port_number)
+ return;
+
+ domain->min_delay_req_interval =
+ log2_to_clock_time (msg->log_message_interval);
+
+ /* Check if we know about this one */
+ for (l = domain->pending_syncs.head; l; l = l->next) {
+ PtpPendingSync *tmp = l->data;
+
+ if (tmp->delay_req_seqnum == msg->sequence_id) {
+ sync = tmp;
+ break;
+ }
+ }
+
+ if (!sync)
+ return;
+
+ /* Got a DELAY_RESP for this already */
+ if (sync->delay_req_recv_time_remote != GST_CLOCK_TIME_NONE)
+ return;
+
+ if (sync->delay_req_send_time_local > receive_time) {
+ GST_ERROR ("Got bogus delay response in domain %u: %" GST_TIME_FORMAT " > %"
+ GST_TIME_FORMAT, domain->domain,
+ GST_TIME_ARGS (sync->delay_req_send_time_local),
+ GST_TIME_ARGS (receive_time));
+ g_queue_remove (&domain->pending_syncs, sync);
+ ptp_pending_sync_free (sync);
+ return;
+ }
+
+ sync->correction_field_delay = msg->correction_field;
+
+ sync->delay_req_recv_time_remote =
+ PTP_TIMESTAMP_TO_GST_CLOCK_TIME (msg->message_specific.
+ delay_resp.receive_timestamp);
+ sync->delay_resp_recv_time_local = receive_time;
+
+ if (domain->mean_path_delay != 0
+ && sync->sync_send_time_remote > sync->delay_req_recv_time_remote) {
+ GST_WARNING ("Sync send time after delay req receive time for domain %u: %"
+ GST_TIME_FORMAT " > %" GST_TIME_FORMAT, domain->domain,
+ GST_TIME_ARGS (sync->sync_send_time_remote),
+ GST_TIME_ARGS (sync->delay_req_recv_time_remote));
+ g_queue_remove (&domain->pending_syncs, sync);
+ ptp_pending_sync_free (sync);
+ return;
+ }
+
+ if (update_mean_path_delay (domain, sync))
+ update_ptp_time (domain, sync);
+ g_queue_remove (&domain->pending_syncs, sync);
+ ptp_pending_sync_free (sync);
+}
+
+static void
+handle_ptp_message (PtpMessage * msg, GstClockTime receive_time)
+{
+ /* Ignore our own messages */
+ if (msg->source_port_identity.clock_identity == ptp_clock_id.clock_identity &&
+ msg->source_port_identity.port_number == ptp_clock_id.port_number)
+ return;
+
+ switch (msg->message_type) {
+ case PTP_MESSAGE_TYPE_ANNOUNCE:
+ handle_announce_message (msg, receive_time);
+ break;
+ case PTP_MESSAGE_TYPE_SYNC:
+ handle_sync_message (msg, receive_time);
+ break;
+ case PTP_MESSAGE_TYPE_FOLLOW_UP:
+ handle_follow_up_message (msg, receive_time);
+ break;
+ case PTP_MESSAGE_TYPE_DELAY_RESP:
+ handle_delay_resp_message (msg, receive_time);
+ break;
+ default:
+ break;
+ }
+}
+
+static gboolean
+have_stdin_data_cb (GIOChannel * channel, GIOCondition condition,
+ gpointer user_data)
+{
+ GIOStatus status;
+ StdIOHeader header;
+ gchar buffer[8192];
+ GError *err = NULL;
+ gsize read;
+
+ if ((condition & G_IO_STATUS_EOF)) {
+ GST_ERROR ("Got EOF on stdin");
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ }
+
+ status =
+ g_io_channel_read_chars (channel, (gchar *) & header, sizeof (header),
+ &read, &err);
+ if (status == G_IO_STATUS_ERROR) {
+ GST_ERROR ("Failed to read from stdin: %s", err->message);
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ } else if (status == G_IO_STATUS_EOF) {
+ GST_ERROR ("Got EOF on stdin");
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ } else if (status != G_IO_STATUS_NORMAL) {
+ GST_ERROR ("Unexpected stdin read status: %d", status);
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ } else if (read != sizeof (header)) {
+ GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ } else if (header.size > 8192) {
+ GST_ERROR ("Unexpected size: %u", header.size);
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ }
+
+ status = g_io_channel_read_chars (channel, buffer, header.size, &read, &err);
+ if (status == G_IO_STATUS_ERROR) {
+ GST_ERROR ("Failed to read from stdin: %s", err->message);
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ } else if (status == G_IO_STATUS_EOF) {
+ GST_ERROR ("EOF on stdin");
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ } else if (status != G_IO_STATUS_NORMAL) {
+ GST_ERROR ("Unexpected stdin read status: %d", status);
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ } else if (read != header.size) {
+ GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ }
+
+ switch (header.type) {
+ case TYPE_EVENT:
+ case TYPE_GENERAL:{
+ GstClockTime receive_time = gst_util_get_timestamp ();
+ PtpMessage msg;
+
+ if (parse_ptp_message (&msg, (const guint8 *) buffer, header.size)) {
+ dump_ptp_message (&msg);
+ handle_ptp_message (&msg, receive_time);
+ }
+ break;
+ }
+ default:
+ case TYPE_CLOCK_ID:{
+ if (header.size != 8) {
+ GST_ERROR ("Unexpected clock id size (%u != 8)", header.size);
+ g_main_loop_quit (main_loop);
+ return G_SOURCE_REMOVE;
+ }
+ g_mutex_lock (&ptp_lock);
+ ptp_clock_id.clock_identity = GST_READ_UINT64_BE (buffer);
+ ptp_clock_id.port_number = getpid ();
+ GST_DEBUG ("Got clock id 0x%016" G_GINT64_MODIFIER "x %u",
+ ptp_clock_id.clock_identity, ptp_clock_id.port_number);
+ g_cond_signal (&ptp_cond);
+ g_mutex_unlock (&ptp_lock);
+ break;
+ }
+ }
+
+ return G_SOURCE_CONTINUE;
+}
+
+/* Cleanup all announce messages and announce message senders
+ * that are timed out by now, and clean up all pending syncs
+ * that are missing their FOLLOW_UP or DELAY_RESP */
+static gboolean
+cleanup_cb (gpointer data)
+{
+ GstClockTime now = gst_util_get_timestamp ();
+ GList *l, *m, *n;
+
+ for (l = domain_data; l; l = l->next) {
+ PtpDomainData *domain = l->data;
+
+ for (n = domain->announce_senders; n;) {
+ PtpAnnounceSender *sender = n->data;
+ gboolean timed_out = TRUE;
+
+ /* Keep only 5 messages per sender around */
+ while (g_queue_get_length (&sender->announce_messages) > 5) {
+ PtpAnnounceMessage *msg = g_queue_pop_head (&sender->announce_messages);
+ g_free (msg);
+ }
+
+ for (m = sender->announce_messages.head; m; m = m->next) {
+ PtpAnnounceMessage *msg = m->data;
+
+ if (msg->receive_time +
+ sender->announce_interval * PTP_ANNOUNCE_RECEIPT_TIMEOUT > now) {
+ timed_out = FALSE;
+ break;
+ }
+ }
+
+ if (timed_out) {
+ GST_DEBUG ("Announce sender 0x%016" G_GINT64_MODIFIER "x %u timed out",
+ sender->master_clock_identity.clock_identity,
+ sender->master_clock_identity.port_number);
+ g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
+ g_queue_clear (&sender->announce_messages);
+ }
+
+ if (g_queue_get_length (&sender->announce_messages) == 0) {
+ GList *tmp = n->next;
+
+ if (compare_clock_identity (&sender->master_clock_identity,
+ &domain->master_clock_identity) == 0)
+ GST_WARNING ("currently selected master clock timed out");
+ g_free (sender);
+ domain->announce_senders =
+ g_list_delete_link (domain->announce_senders, n);
+ n = tmp;
+ } else {
+ n = n->next;
+ }
+ }
+ select_best_master_clock (domain, now);
+
+ /* Clean up any pending syncs */
+ for (n = domain->pending_syncs.head; n;) {
+ PtpPendingSync *sync = n->data;
+ gboolean timed_out = FALSE;
+
+ /* Time out pending syncs after 4 sync intervals or 10 seconds,
+ * and pending delay reqs after 4 delay req intervals or 10 seconds
+ */
+ if (sync->delay_req_send_time_local != GST_CLOCK_TIME_NONE &&
+ ((domain->min_delay_req_interval != 0
+ && sync->delay_req_send_time_local +
+ 4 * domain->min_delay_req_interval < now)
+ || (sync->delay_req_send_time_local + 10 * GST_SECOND < now))) {
+ timed_out = TRUE;
+ } else if ((domain->sync_interval != 0
+ && sync->sync_recv_time_local + 4 * domain->sync_interval < now)
+ || (sync->sync_recv_time_local + 10 * GST_SECOND < now)) {
+ timed_out = TRUE;
+ }
+
+ if (timed_out) {
+ GList *tmp = n->next;
+ ptp_pending_sync_free (sync);
+ g_queue_delete_link (&domain->pending_syncs, n);
+ n = tmp;
+ } else {
+ n = n->next;
+ }
+ }
+ }
+
+ return G_SOURCE_CONTINUE;
+}
+
+static gpointer
+ptp_helper_main (gpointer data)
+{
+ GSource *cleanup_source;
+
+ GST_DEBUG ("Starting PTP helper loop");
+
+ /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */
+ cleanup_source = g_timeout_source_new_seconds (5);
+ g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (cleanup_source, (GSourceFunc) cleanup_cb, NULL, NULL);
+ g_source_attach (cleanup_source, main_context);
+ g_source_unref (cleanup_source);
+
+ g_main_loop_run (main_loop);
+ GST_DEBUG ("Stopped PTP helper loop");
+
+ g_mutex_lock (&ptp_lock);
+ ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
+ ptp_clock_id.port_number = 0;
+ initted = FALSE;
+ g_cond_signal (&ptp_cond);
+ g_mutex_unlock (&ptp_lock);
+
+ return NULL;
+}
+
+/**
+ * gst_ptp_is_supported:
+ *
+ * Check if PTP clocks are generally supported on this system, and if previous
+ * initializations did not fail.
+ *
+ * Returns: %TRUE if PTP clocks are generally supported on this system, and
+ * previous initializations did not fail.
+ *
+ * Since: 1.6
+ */
+gboolean
+gst_ptp_is_supported (void)
+{
+ return supported;
+}
+
+/**
+ * gst_ptp_is_initialized:
+ *
+ * Check if the GStreamer PTP clock subsystem is initialized.
+ *
+ * Returns: %TRUE if the GStreamer PTP clock subsystem is intialized.
+ *
+ * Since: 1.6
+ */
+gboolean
+gst_ptp_is_initialized (void)
+{
+ return initted;
+}
+
+/**
+ * gst_ptp_init:
+ * @clock_id: PTP clock id of this process' clock or %GST_PTP_CLOCK_ID_NONE
+ * @interfaces: (transfer none) (array zero-terminated=1): network interfaces to run the clock on
+ *
+ * Initialize the GStreamer PTP subsystem and create a PTP ordinary clock in
+ * slave-only mode for all domains on the given @interfaces with the
+ * given @clock_id.
+ *
+ * If @clock_id is %GST_PTP_CLOCK_ID_NONE, a clock id is automatically
+ * generated from the MAC address of the first network interface.
+ *
+ *
+ * This function is automatically called by gst_ptp_clock_new() with default
+ * parameters if it wasn't called before.
+ *
+ * Returns: %TRUE if the GStreamer PTP clock subsystem could be initialized.
+ *
+ * Since: 1.6
+ */
+gboolean
+gst_ptp_init (guint64 clock_id, gchar ** interfaces)
+{
+ gboolean ret;
+ const gchar *env;
+ gchar **argv = NULL;
+ gint argc, argc_c;
+ gint fd_r, fd_w;
+ GError *err = NULL;
+ GSource *stdin_source;
+
+ GST_DEBUG_CATEGORY_INIT (ptp_debug, "ptp", 0, "PTP clock");
+
+ g_mutex_lock (&ptp_lock);
+ if (!supported) {
+ GST_ERROR ("PTP not supported");
+ ret = FALSE;
+ goto done;
+ }
+
+ if (initted) {
+ GST_DEBUG ("PTP already initialized");
+ ret = TRUE;
+ goto done;
+ }
+
+ if (ptp_helper_pid) {
+ GST_DEBUG ("PTP currently initializing");
+ goto wait;
+ }
+
+ if (!domain_stats_hooks_initted) {
+ g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
+ domain_stats_hooks_initted = TRUE;
+ }
+
+ argc = 1;
+ if (clock_id != GST_PTP_CLOCK_ID_NONE)
+ argc += 2;
+ if (interfaces != NULL)
+ argc += 2 * g_strv_length (interfaces);
+
+ argv = g_new0 (gchar *, argc + 2);
+ argc_c = 0;
+
+ env = g_getenv ("GST_PTP_HELPER_1_0");
+ if (env == NULL)
+ env = g_getenv ("GST_PTP_HELPER");
+ if (env != NULL && *env != '\0') {
+ GST_LOG ("Trying GST_PTP_HELPER env var: %s", env);
+ argv[argc_c++] = g_strdup (env);
+ } else {
+ argv[argc_c++] = g_strdup (GST_PTP_HELPER_INSTALLED);
+ }
+
+ if (clock_id != GST_PTP_CLOCK_ID_NONE) {
+ argv[argc_c++] = g_strdup ("-c");
+ argv[argc_c++] = g_strdup_printf ("0x%016" G_GINT64_MODIFIER "x", clock_id);
+ }
+
+ if (interfaces != NULL) {
+ gchar **ptr = interfaces;
+
+ while (*ptr) {
+ argv[argc_c++] = g_strdup ("-i");
+ argv[argc_c++] = g_strdup (*ptr);
+ ptr++;
+ }
+ }
+
+ main_context = g_main_context_new ();
+ main_loop = g_main_loop_new (main_context, FALSE);
+
+ ptp_helper_thread =
+ g_thread_try_new ("ptp-helper-thread", ptp_helper_main, NULL, &err);
+ if (!ptp_helper_thread) {
+ GST_ERROR ("Failed to start PTP helper thread: %s", err->message);
+ g_clear_error (&err);
+ ret = FALSE;
+ goto done;
+ }
+
+ if (!g_spawn_async_with_pipes (NULL, argv, NULL, 0, NULL, NULL,
+ &ptp_helper_pid, &fd_w, &fd_r, NULL, &err)) {
+ GST_ERROR ("Failed to start ptp helper process: %s", err->message);
+ g_clear_error (&err);
+ ret = FALSE;
+ supported = FALSE;
+ goto done;
+ }
+
+ stdin_channel = g_io_channel_unix_new (fd_r);
+ g_io_channel_set_encoding (stdin_channel, NULL, NULL);
+ g_io_channel_set_buffered (stdin_channel, FALSE);
+ g_io_channel_set_close_on_unref (stdin_channel, TRUE);
+ stdin_source =
+ g_io_create_watch (stdin_channel, G_IO_IN | G_IO_PRI | G_IO_HUP);
+ g_source_set_priority (stdin_source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (stdin_source, (GSourceFunc) have_stdin_data_cb, NULL,
+ NULL);
+ g_source_attach (stdin_source, main_context);
+ g_source_unref (stdin_source);
+
+ /* Create stdout channel */
+ stdout_channel = g_io_channel_unix_new (fd_w);
+ g_io_channel_set_encoding (stdout_channel, NULL, NULL);
+ g_io_channel_set_close_on_unref (stdout_channel, TRUE);
+ g_io_channel_set_buffered (stdout_channel, FALSE);
+
+ delay_req_rand = g_rand_new ();
+
+ initted = TRUE;
+
+wait:
+ GST_DEBUG ("Waiting for PTP to be initialized");
+
+ while (ptp_clock_id.clock_identity == GST_PTP_CLOCK_ID_NONE && initted)
+ g_cond_wait (&ptp_cond, &ptp_lock);
+
+ ret = initted;
+ if (ret) {
+ GST_DEBUG ("Initialized and got clock id 0x%016" G_GINT64_MODIFIER "x %u",
+ ptp_clock_id.clock_identity, ptp_clock_id.port_number);
+ } else {
+ GST_ERROR ("Failed to initialize");
+ supported = FALSE;
+ }
+
+done:
+ g_strfreev (argv);
+
+ if (!ret) {
+ if (ptp_helper_pid) {
+ kill (ptp_helper_pid, SIGKILL);
+ waitpid (ptp_helper_pid, NULL, 0);
+ g_spawn_close_pid (ptp_helper_pid);
+ }
+ ptp_helper_pid = 0;
+
+ if (stdin_channel)
+ g_io_channel_unref (stdin_channel);
+ stdin_channel = NULL;
+ if (stdout_channel)
+ g_io_channel_unref (stdout_channel);
+ stdout_channel = NULL;
+
+ if (main_loop && ptp_helper_thread) {
+ g_main_loop_quit (main_loop);
+ g_thread_join (ptp_helper_thread);
+ }
+ ptp_helper_thread = NULL;
+ if (main_loop)
+ g_main_loop_unref (main_loop);
+ main_loop = NULL;
+ if (main_context)
+ g_main_context_unref (main_context);
+ main_context = NULL;
+
+ if (delay_req_rand)
+ g_rand_free (delay_req_rand);
+ delay_req_rand = NULL;
+ }
+
+ g_mutex_unlock (&ptp_lock);
+
+ return ret;
+}
+
+/**
+ * gst_ptp_deinit:
+ *
+ * Deinitialize the GStreamer PTP subsystem and stop the PTP clock. If there
+ * are any remaining GstPtpClock instances, they won't be further synchronized
+ * to the PTP network clock.
+ *
+ * Since: 1.6
+ */
+void
+gst_ptp_deinit (void)
+{
+ GList *l, *m;
+
+ g_mutex_lock (&ptp_lock);
+
+ if (ptp_helper_pid) {
+ kill (ptp_helper_pid, SIGKILL);
+ waitpid (ptp_helper_pid, NULL, 0);
+ g_spawn_close_pid (ptp_helper_pid);
+ }
+ ptp_helper_pid = 0;
+
+ if (stdin_channel)
+ g_io_channel_unref (stdin_channel);
+ stdin_channel = NULL;
+ if (stdout_channel)
+ g_io_channel_unref (stdout_channel);
+ stdout_channel = NULL;
+
+ if (main_loop && ptp_helper_thread) {
+ GThread *tmp = ptp_helper_thread;
+ ptp_helper_thread = NULL;
+ g_mutex_unlock (&ptp_lock);
+ g_main_loop_quit (main_loop);
+ g_thread_join (tmp);
+ g_mutex_lock (&ptp_lock);
+ }
+ if (main_loop)
+ g_main_loop_unref (main_loop);
+ main_loop = NULL;
+ if (main_context)
+ g_main_context_unref (main_context);
+ main_context = NULL;
+
+ if (delay_req_rand)
+ g_rand_free (delay_req_rand);
+ delay_req_rand = NULL;
+
+ for (l = domain_data; l; l = l->next) {
+ PtpDomainData *domain = l->data;
+
+ for (m = domain->announce_senders; m; m = m->next) {
+ PtpAnnounceSender *sender = m->data;
+
+ g_queue_foreach (&sender->announce_messages, (GFunc) g_free, NULL);
+ g_queue_clear (&sender->announce_messages);
+ g_free (sender);
+ }
+ g_list_free (domain->announce_senders);
+
+ g_queue_foreach (&domain->pending_syncs, (GFunc) ptp_pending_sync_free,
+ NULL);
+ g_queue_clear (&domain->pending_syncs);
+ gst_object_unref (domain->domain_clock);
+ g_free (domain);
+ }
+ g_list_free (domain_data);
+ domain_data = NULL;
+ g_list_foreach (domain_clocks, (GFunc) g_free, NULL);
+ g_list_free (domain_clocks);
+ domain_clocks = NULL;
+
+ ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
+ ptp_clock_id.port_number = 0;
+
+ initted = FALSE;
+
+ g_mutex_unlock (&ptp_lock);
+}
+
+#define DEFAULT_DOMAIN 0
+
+enum
+{
+ PROP_0,
+ PROP_DOMAIN,
+ PROP_INTERNAL_CLOCK
+};
+
+#define GST_PTP_CLOCK_GET_PRIVATE(obj) \
+ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_PTP_CLOCK, GstPtpClockPrivate))
+
+struct _GstPtpClockPrivate
+{
+ guint domain;
+ GstClock *domain_clock;
+ gulong domain_stats_id;
+};
+
+#define gst_ptp_clock_parent_class parent_class
+G_DEFINE_TYPE (GstPtpClock, gst_ptp_clock, GST_TYPE_SYSTEM_CLOCK);
+
+static void gst_ptp_clock_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_ptp_clock_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+static void gst_ptp_clock_finalize (GObject * object);
+
+static GstClockTime gst_ptp_clock_get_internal_time (GstClock * clock);
+
+static void
+gst_ptp_clock_class_init (GstPtpClockClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstClockClass *clock_class;
+
+ gobject_class = G_OBJECT_CLASS (klass);
+ clock_class = GST_CLOCK_CLASS (klass);
+
+ g_type_class_add_private (klass, sizeof (GstPtpClockPrivate));
+
+ gobject_class->finalize = gst_ptp_clock_finalize;
+ gobject_class->get_property = gst_ptp_clock_get_property;
+ gobject_class->set_property = gst_ptp_clock_set_property;
+
+ g_object_class_install_property (gobject_class, PROP_DOMAIN,
+ g_param_spec_uint ("domain", "Domain",
+ "The PTP domain", 0, G_MAXUINT8,
+ DEFAULT_DOMAIN,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK,
+ g_param_spec_object ("internal-clock", "Internal Clock",
+ "Internal clock", GST_TYPE_CLOCK,
+ G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+ clock_class->get_internal_time = gst_ptp_clock_get_internal_time;
+}
+
+static void
+gst_ptp_clock_init (GstPtpClock * self)
+{
+ GstPtpClockPrivate *priv;
+
+ self->priv = priv = GST_PTP_CLOCK_GET_PRIVATE (self);
+
+ GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER);
+ GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC);
+
+ priv->domain = DEFAULT_DOMAIN;
+}
+
+static gboolean
+gst_ptp_clock_ensure_domain_clock (GstPtpClock * self)
+{
+ gboolean got_clock = TRUE;
+
+ if (G_UNLIKELY (!self->priv->domain_clock)) {
+ g_mutex_lock (&domain_clocks_lock);
+ if (!self->priv->domain_clock) {
+ GList *l;
+
+ got_clock = FALSE;
+
+ for (l = domain_clocks; l; l = l->next) {
+ PtpDomainData *clock_data = l->data;
+
+ if (clock_data->domain == self->priv->domain
+ && clock_data->last_ptp_time != 0) {
+ self->priv->domain_clock = clock_data->domain_clock;
+ got_clock = TRUE;
+ break;
+ }
+ }
+ }
+ g_mutex_unlock (&domain_clocks_lock);
+ if (got_clock) {
+ g_object_notify (G_OBJECT (self), "internal-clock");
+ gst_clock_set_synced (GST_CLOCK (self), TRUE);
+ }
+ }
+
+ return got_clock;
+}
+
+static gboolean
+gst_ptp_clock_stats_callback (guint8 domain, const GstStructure * stats,
+ gpointer user_data)
+{
+ GstPtpClock *self = user_data;
+
+ if (domain != self->priv->domain
+ || !gst_structure_has_name (stats, GST_PTP_STATISTICS_TIME_UPDATED))
+ return TRUE;
+
+ /* Let's set our internal clock */
+ if (!gst_ptp_clock_ensure_domain_clock (self))
+ return TRUE;
+
+ self->priv->domain_stats_id = 0;
+
+ return FALSE;
+}
+
+static void
+gst_ptp_clock_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstPtpClock *self = GST_PTP_CLOCK (object);
+
+ switch (prop_id) {
+ case PROP_DOMAIN:
+ self->priv->domain = g_value_get_uint (value);
+ gst_ptp_clock_ensure_domain_clock (self);
+ if (!self->priv->domain_clock)
+ self->priv->domain_stats_id =
+ gst_ptp_statistics_callback_add (gst_ptp_clock_stats_callback, self,
+ NULL);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_ptp_clock_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstPtpClock *self = GST_PTP_CLOCK (object);
+
+ switch (prop_id) {
+ case PROP_DOMAIN:
+ g_value_set_uint (value, self->priv->domain);
+ break;
+ case PROP_INTERNAL_CLOCK:
+ gst_ptp_clock_ensure_domain_clock (self);
+ g_value_set_object (value, self->priv->domain_clock);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_ptp_clock_finalize (GObject * object)
+{
+ GstPtpClock *self = GST_PTP_CLOCK (object);
+
+ if (self->priv->domain_stats_id)
+ gst_ptp_statistics_callback_remove (self->priv->domain_stats_id);
+
+ G_OBJECT_CLASS (gst_ptp_clock_parent_class)->finalize (object);
+}
+
+static GstClockTime
+gst_ptp_clock_get_internal_time (GstClock * clock)
+{
+ GstPtpClock *self = GST_PTP_CLOCK (clock);
+
+ gst_ptp_clock_ensure_domain_clock (self);
+
+ if (!self->priv->domain_clock) {
+ GST_ERROR_OBJECT (self, "Domain %u has no clock yet and is not synced",
+ self->priv->domain);
+ return GST_CLOCK_TIME_NONE;
+ }
+
+ return gst_clock_get_time (self->priv->domain_clock);
+}
+
+/**
+ * gst_ptp_clock_new:
+ * @name: Name of the clock
+ * @domain: PTP domain
+ *
+ * Creates a new PTP clock instance that exports the PTP time of the master
+ * clock in @domain. This clock can be slaved to other clocks as needed.
+ *
+ * If gst_ptp_init() was not called before, this will call gst_ptp_init() with
+ * default parameters.
+ *
+ *
+ * This clock only returns valid timestamps after it received the first
+ * times from the PTP master clock on the network. Once this happens the
+ * GstPtpClock::internal-clock property will become non-NULL. You can connect
+ * to the notify::internal-clock signal to get notified about this, or
+ * alternatively use gst_ptp_clock_wait_ready() to wait for this to happen.
+ *
+ * Since: 1.6
+ */
+GstClock *
+gst_ptp_clock_new (const gchar * name, guint domain)
+{
+ g_return_val_if_fail (name != NULL, NULL);
+ g_return_val_if_fail (domain <= G_MAXUINT8, NULL);
+
+ if (!initted && !gst_ptp_init (GST_PTP_CLOCK_ID_NONE, NULL)) {
+ GST_ERROR ("Failed to initialize PTP");
+ return NULL;
+ }
+
+ return g_object_new (GST_TYPE_PTP_CLOCK, "name", name, "domain", domain,
+ NULL);
+}
+
+typedef struct
+{
+ guint8 domain;
+ const GstStructure *stats;
+} DomainStatsMarshalData;
+
+static void
+domain_stats_marshaller (GHook * hook, DomainStatsMarshalData * data)
+{
+ GstPtpStatisticsCallback callback = (GstPtpStatisticsCallback) hook->func;
+
+ if (!callback (data->domain, data->stats, hook->data))
+ g_hook_destroy (&domain_stats_hooks, hook->hook_id);
+}
+
+static void
+emit_ptp_statistics (guint8 domain, const GstStructure * stats)
+{
+ DomainStatsMarshalData data = { domain, stats };
+
+ g_mutex_lock (&ptp_lock);
+ g_hook_list_marshal (&domain_stats_hooks, TRUE,
+ (GHookMarshaller) domain_stats_marshaller, &data);
+ g_mutex_unlock (&ptp_lock);
+}
+
+/**
+ * gst_ptp_statistics_callback_add:
+ * @callback: GstPtpStatisticsCallback to call
+ * @user_data: Data to pass to the callback
+ * @destroy_data: GDestroyNotify to destroy the data
+ *
+ * Installs a new statistics callback for gathering PTP statistics. See
+ * GstPtpStatisticsCallback for a list of statistics that are provided.
+ *
+ * Returns: Id for the callback that can be passed to
+ * gst_ptp_statistics_callback_remove()
+ *
+ * Since: 1.6
+ */
+gulong
+gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
+ gpointer user_data, GDestroyNotify destroy_data)
+{
+ GHook *hook;
+
+ g_mutex_lock (&ptp_lock);
+
+ if (!domain_stats_hooks_initted) {
+ g_hook_list_init (&domain_stats_hooks, sizeof (GHook));
+ domain_stats_hooks_initted = TRUE;
+ }
+
+ hook = g_hook_alloc (&domain_stats_hooks);
+ hook->func = callback;
+ hook->data = user_data;
+ hook->destroy = destroy_data;
+ g_hook_prepend (&domain_stats_hooks, hook);
+ g_atomic_int_add (&domain_stats_n_hooks, 1);
+
+ g_mutex_unlock (&ptp_lock);
+
+ return hook->hook_id;
+}
+
+/**
+ * gst_ptp_statistics_callback_remove:
+ * @id: Callback id to remove
+ *
+ * Removes a PTP statistics callback that was previously added with
+ * gst_ptp_statistics_callback_add().
+ *
+ * Since: 1.6
+ */
+void
+gst_ptp_statistics_callback_remove (gulong id)
+{
+ g_mutex_lock (&ptp_lock);
+ if (g_hook_destroy (&domain_stats_hooks, id))
+ g_atomic_int_add (&domain_stats_n_hooks, -1);
+ g_mutex_unlock (&ptp_lock);
+}
+
+#else /* HAVE_PTP */
+
+GType
+gst_ptp_clock_get_type (void)
+{
+ return G_TYPE_INVALID;
+}
+
+gboolean
+gst_ptp_is_supported (void)
+{
+ return FALSE;
+}
+
+gboolean
+gst_ptp_is_initialized (void)
+{
+ return FALSE;
+}
+
+gboolean
+gst_ptp_init (guint64 clock_id, gchar ** interfaces)
+{
+ return FALSE;
+}
+
+void
+gst_ptp_deinit (void)
+{
+}
+
+GstClock *
+gst_ptp_clock_new (const gchar * name, guint domain)
+{
+ return NULL;
+}
+
+gboolean
+gst_ptp_clock_wait_ready (GstPtpClock * self, GstClockTime timeout)
+{
+ return FALSE;
+}
+
+gulong
+gst_ptp_statistics_callback_add (GstPtpStatisticsCallback callback,
+ gpointer user_data, GDestroyNotify destroy_data)
+{
+ return 0;
+}
+
+void
+gst_ptp_statistics_callback_remove (gulong id)
+{
+ return;
+}
+
+#endif