From 33dcec7b748aee42518586c78656e060cc584412 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 4 Jan 2016 16:31:23 +0200 Subject: [PATCH] netclientclock: Only ever run one clock against a specific server If multiple net/NTP clocks are created for the same server, reuse the same internal clock for all of them. This makes sure that we don't flood the server with too many requests and also possibly allows faster synchronization if there already was an earlier synchronized clock when creating a new one. --- libs/gst/net/gstnetclientclock.c | 922 ++++++++++++++++++++++++++------------- 1 file changed, 608 insertions(+), 314 deletions(-) diff --git a/libs/gst/net/gstnetclientclock.c b/libs/gst/net/gstnetclientclock.c index a228465..873b739 100644 --- a/libs/gst/net/gstnetclientclock.c +++ b/libs/gst/net/gstnetclientclock.c @@ -68,6 +68,32 @@ GST_DEBUG_CATEGORY_STATIC (ncc_debug); #define GST_CAT_DEFAULT (ncc_debug) +typedef struct +{ + GstClock *clock; /* GstNetClientInternalClock */ + + GList *clocks; /* GstNetClientClocks */ +} ClockCache; + +G_LOCK_DEFINE_STATIC (clocks_lock); +static GList *clocks = NULL; + +#define GST_TYPE_NET_CLIENT_INTERNAL_CLOCK \ + (gst_net_client_internal_clock_get_type()) +#define GST_NET_CLIENT_INTERNAL_CLOCK(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK,GstNetClientInternalClock)) +#define GST_NET_CLIENT_INTERNAL_CLOCK_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK,GstNetClientInternalClockClass)) +#define GST_IS_NET_CLIENT_INTERNAL_CLOCK(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK)) +#define GST_IS_NET_CLIENT_INTERNAL_CLOCK_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_NET_CLIENT_INTERNAL_CLOCK)) + +typedef struct _GstNetClientInternalClock GstNetClientInternalClock; +typedef struct _GstNetClientInternalClockClass GstNetClientInternalClockClass; + +G_GNUC_INTERNAL GType gst_net_client_internal_clock_get_type (void); + #define DEFAULT_ADDRESS "127.0.0.1" #define DEFAULT_PORT 5637 #define DEFAULT_TIMEOUT GST_SECOND @@ -91,15 +117,13 @@ enum PROP_MINIMUM_UPDATE_INTERVAL, PROP_BUS, PROP_BASE_TIME, - PROP_INTERNAL_CLOCK + PROP_INTERNAL_CLOCK, + PROP_IS_NTP }; -#define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj) \ - (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_NET_CLIENT_CLOCK, GstNetClientClockPrivate)) - -struct _GstNetClientClockPrivate +struct _GstNetClientInternalClock { - GstClock *internal_clock; + GstSystemClock clock; GThread *thread; @@ -117,49 +141,48 @@ struct _GstNetClientClockPrivate GstClockTime last_rtts[MEDIAN_PRE_FILTERING_WINDOW]; gint last_rtts_missing; - GstClockTime base_time; - gchar *address; gint port; + gboolean is_ntp; - GstBus *bus; + /* Protected by OBJECT_LOCK */ + GList *busses; +}; - gboolean is_ntp; +struct _GstNetClientInternalClockClass +{ + GstSystemClockClass parent_class; }; #define _do_init \ GST_DEBUG_CATEGORY_INIT (ncc_debug, "netclock", 0, "Network client clock"); -#define gst_net_client_clock_parent_class parent_class -G_DEFINE_TYPE_WITH_CODE (GstNetClientClock, gst_net_client_clock, - GST_TYPE_SYSTEM_CLOCK, _do_init); +G_DEFINE_TYPE_WITH_CODE (GstNetClientInternalClock, + gst_net_client_internal_clock, GST_TYPE_SYSTEM_CLOCK, _do_init); -static void gst_net_client_clock_finalize (GObject * object); -static void gst_net_client_clock_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec); -static void gst_net_client_clock_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec); -static void gst_net_client_clock_constructed (GObject * object); - -static gboolean gst_net_client_clock_start (GstNetClientClock * self); -static void gst_net_client_clock_stop (GstNetClientClock * self); +static void gst_net_client_internal_clock_finalize (GObject * object); +static void gst_net_client_internal_clock_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec); +static void gst_net_client_internal_clock_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec); +static void gst_net_client_internal_clock_constructed (GObject * object); -static GstClockTime gst_net_client_clock_get_internal_time (GstClock * clock); +static gboolean gst_net_client_internal_clock_start (GstNetClientInternalClock * + self); +static void gst_net_client_internal_clock_stop (GstNetClientInternalClock * + self); static void -gst_net_client_clock_class_init (GstNetClientClockClass * klass) +gst_net_client_internal_clock_class_init (GstNetClientInternalClockClass * + klass) { GObjectClass *gobject_class; - GstClockClass *clock_class; gobject_class = G_OBJECT_CLASS (klass); - clock_class = GST_CLOCK_CLASS (klass); - - g_type_class_add_private (klass, sizeof (GstNetClientClockPrivate)); - gobject_class->finalize = gst_net_client_clock_finalize; - gobject_class->get_property = gst_net_client_clock_get_property; - gobject_class->set_property = gst_net_client_clock_set_property; - gobject_class->constructed = gst_net_client_clock_constructed; + gobject_class->finalize = gst_net_client_internal_clock_finalize; + gobject_class->get_property = gst_net_client_internal_clock_get_property; + gobject_class->set_property = gst_net_client_internal_clock_set_property; + gobject_class->constructed = gst_net_client_internal_clock_constructed; g_object_class_install_property (gobject_class, PROP_ADDRESS, g_param_spec_string ("address", "address", @@ -171,156 +194,88 @@ gst_net_client_clock_class_init (GstNetClientClockClass * klass) "The port on which the remote server is listening", 0, G_MAXUINT16, DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BUS, - g_param_spec_object ("bus", "bus", - "A GstBus on which to send clock status information", GST_TYPE_BUS, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - /** - * GstNetClientClock::round-trip-limit: - * - * Maximum allowed round-trip for packets. If this property is set to a nonzero - * value, all packets with a round-trip interval larger than this limit will be - * ignored. This is useful for networks with severe and fluctuating transport - * delays. Filtering out these packets increases stability of the synchronization. - * On the other hand, the lower the limit, the higher the amount of filtered - * packets. Empirical tests are typically necessary to estimate a good value - * for the limit. - * If the property is set to zero, the limit is disabled. - * - * Since: 1.4 - */ - g_object_class_install_property (gobject_class, PROP_ROUNDTRIP_LIMIT, - g_param_spec_uint64 ("round-trip-limit", "round-trip limit", - "Maximum tolerable round-trip interval for packets, in nanoseconds " - "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_ROUNDTRIP_LIMIT, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_MINIMUM_UPDATE_INTERVAL, - g_param_spec_uint64 ("minimum-update-interval", "minimum update interval", - "Minimum polling interval for packets, in nanoseconds" - "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_MINIMUM_UPDATE_INTERVAL, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_BASE_TIME, - g_param_spec_uint64 ("base-time", "Base Time", - "Initial time that is reported before synchronization", 0, - G_MAXUINT64, DEFAULT_BASE_TIME, - G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK, - g_param_spec_object ("internal-clock", "Internal Clock", - "Internal clock that directly slaved to the remote clock", - GST_TYPE_CLOCK, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - - clock_class->get_internal_time = gst_net_client_clock_get_internal_time; + g_object_class_install_property (gobject_class, PROP_IS_NTP, + g_param_spec_boolean ("is-ntp", "Is NTP", + "The clock is using the NTPv4 protocol", FALSE, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS)); } static void -gst_net_client_clock_init (GstNetClientClock * self) +gst_net_client_internal_clock_init (GstNetClientInternalClock * self) { - GstNetClientClockPrivate *priv; - - self->priv = priv = GST_NET_CLIENT_CLOCK_GET_PRIVATE (self); - - GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER); GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC); - priv->internal_clock = g_object_new (GST_TYPE_SYSTEM_CLOCK, NULL); + self->port = DEFAULT_PORT; + self->address = g_strdup (DEFAULT_ADDRESS); + self->is_ntp = FALSE; - priv->port = DEFAULT_PORT; - priv->address = g_strdup (DEFAULT_ADDRESS); - - gst_clock_set_timeout (priv->internal_clock, DEFAULT_TIMEOUT); + gst_clock_set_timeout (GST_CLOCK (self), DEFAULT_TIMEOUT); - priv->thread = NULL; + self->thread = NULL; - priv->servaddr = NULL; - priv->rtt_avg = GST_CLOCK_TIME_NONE; - priv->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT; - priv->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL; - priv->last_remote_poll_interval = GST_CLOCK_TIME_NONE; - priv->skipped_updates = 0; - priv->last_rtts_missing = MEDIAN_PRE_FILTERING_WINDOW; - priv->base_time = DEFAULT_BASE_TIME; + self->servaddr = NULL; + self->rtt_avg = GST_CLOCK_TIME_NONE; + self->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT; + self->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL; + self->last_remote_poll_interval = GST_CLOCK_TIME_NONE; + self->skipped_updates = 0; + self->last_rtts_missing = MEDIAN_PRE_FILTERING_WINDOW; } static void -gst_net_client_clock_finalize (GObject * object) +gst_net_client_internal_clock_finalize (GObject * object) { - GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object); + GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object); - if (self->priv->thread) { - gst_net_client_clock_stop (self); + if (self->thread) { + gst_net_client_internal_clock_stop (self); } - g_free (self->priv->address); - self->priv->address = NULL; - - if (self->priv->servaddr != NULL) { - g_object_unref (self->priv->servaddr); - self->priv->servaddr = NULL; - } + g_free (self->address); + self->address = NULL; - if (self->priv->socket != NULL) { - g_socket_close (self->priv->socket, NULL); - g_object_unref (self->priv->socket); - self->priv->socket = NULL; + if (self->servaddr != NULL) { + g_object_unref (self->servaddr); + self->servaddr = NULL; } - if (self->priv->bus != NULL) { - gst_object_unref (self->priv->bus); - self->priv->bus = NULL; + if (self->socket != NULL) { + g_socket_close (self->socket, NULL); + g_object_unref (self->socket); + self->socket = NULL; } - if (self->priv->internal_clock != NULL) { - gst_object_unref (self->priv->internal_clock); - self->priv->internal_clock = NULL; - } + g_warn_if_fail (self->busses == NULL); - G_OBJECT_CLASS (parent_class)->finalize (object); + G_OBJECT_CLASS (gst_net_client_internal_clock_parent_class)->finalize + (object); } static void -gst_net_client_clock_set_property (GObject * object, guint prop_id, +gst_net_client_internal_clock_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { - GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object); + GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object); switch (prop_id) { case PROP_ADDRESS: GST_OBJECT_LOCK (self); - g_free (self->priv->address); - self->priv->address = g_value_dup_string (value); - if (self->priv->address == NULL) - self->priv->address = g_strdup (DEFAULT_ADDRESS); + g_free (self->address); + self->address = g_value_dup_string (value); + if (self->address == NULL) + self->address = g_strdup (DEFAULT_ADDRESS); GST_OBJECT_UNLOCK (self); break; case PROP_PORT: GST_OBJECT_LOCK (self); - self->priv->port = g_value_get_int (value); - GST_OBJECT_UNLOCK (self); - break; - case PROP_ROUNDTRIP_LIMIT: - GST_OBJECT_LOCK (self); - self->priv->roundtrip_limit = g_value_get_uint64 (value); + self->port = g_value_get_int (value); GST_OBJECT_UNLOCK (self); break; - case PROP_MINIMUM_UPDATE_INTERVAL: + case PROP_IS_NTP: GST_OBJECT_LOCK (self); - self->priv->minimum_update_interval = g_value_get_uint64 (value); + self->is_ntp = g_value_get_boolean (value); GST_OBJECT_UNLOCK (self); break; - case PROP_BUS: - GST_OBJECT_LOCK (self); - if (self->priv->bus) - gst_object_unref (self->priv->bus); - self->priv->bus = g_value_dup_object (value); - GST_OBJECT_UNLOCK (self); - break; - case PROP_BASE_TIME: - self->priv->base_time = g_value_get_uint64 (value); - break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -328,40 +283,22 @@ gst_net_client_clock_set_property (GObject * object, guint prop_id, } static void -gst_net_client_clock_get_property (GObject * object, guint prop_id, +gst_net_client_internal_clock_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { - GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object); + GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object); switch (prop_id) { case PROP_ADDRESS: GST_OBJECT_LOCK (self); - g_value_set_string (value, self->priv->address); + g_value_set_string (value, self->address); GST_OBJECT_UNLOCK (self); break; case PROP_PORT: - g_value_set_int (value, self->priv->port); - break; - case PROP_ROUNDTRIP_LIMIT: - GST_OBJECT_LOCK (self); - g_value_set_uint64 (value, self->priv->roundtrip_limit); - GST_OBJECT_UNLOCK (self); + g_value_set_int (value, self->port); break; - case PROP_MINIMUM_UPDATE_INTERVAL: - GST_OBJECT_LOCK (self); - g_value_set_uint64 (value, self->priv->minimum_update_interval); - GST_OBJECT_UNLOCK (self); - break; - case PROP_BUS: - GST_OBJECT_LOCK (self); - g_value_set_object (value, self->priv->bus); - GST_OBJECT_UNLOCK (self); - break; - case PROP_BASE_TIME: - g_value_set_uint64 (value, self->priv->base_time); - break; - case PROP_INTERNAL_CLOCK: - g_value_set_object (value, self->priv->internal_clock); + case PROP_IS_NTP: + g_value_set_boolean (value, self->is_ntp); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -370,50 +307,20 @@ gst_net_client_clock_get_property (GObject * object, guint prop_id, } static void -gst_net_client_clock_constructed (GObject * object) +gst_net_client_internal_clock_constructed (GObject * object) { - GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object); - GstClock *internal_clock = self->priv->internal_clock; - GstClockTime internal; + GstNetClientInternalClock *self = GST_NET_CLIENT_INTERNAL_CLOCK (object); - G_OBJECT_CLASS (parent_class)->constructed (object); + G_OBJECT_CLASS (gst_net_client_internal_clock_parent_class)->constructed + (object); - /* gst_clock_get_time() values are guaranteed to be increasing. because no one - * has called get_time on this clock yet we are free to adjust to any value - * without worrying about worrying about MAX() issues with the clock's - * internal time. - */ - - /* update our internal time so get_time() give something around base_time. - assume that the rate is 1 in the beginning. */ - internal = gst_clock_get_internal_time (internal_clock); - gst_clock_set_calibration (internal_clock, internal, - self->priv->base_time, 1, 1); - - { - GstClockTime now = gst_clock_get_time (internal_clock); - - if (GST_CLOCK_DIFF (now, self->priv->base_time) > 0 || - GST_CLOCK_DIFF (now, self->priv->base_time + GST_SECOND) < 0) { - g_warning ("unable to set the base time, expect sync problems!"); - } - } - - if (!gst_net_client_clock_start (self)) { + if (!gst_net_client_internal_clock_start (self)) { g_warning ("failed to start clock '%s'", GST_OBJECT_NAME (self)); } /* all systems go, cap'n */ } -static GstClockTime -gst_net_client_clock_get_internal_time (GstClock * clock) -{ - GstNetClientClock *self = GST_NET_CLIENT_CLOCK (clock); - - return gst_clock_get_time (self->priv->internal_clock); -} - static gint compare_clock_time (const GstClockTime * a, const GstClockTime * b) { @@ -425,18 +332,15 @@ compare_clock_time (const GstClockTime * a, const GstClockTime * b) } static void -gst_net_client_clock_observe_times (GstNetClientClock * self, +gst_net_client_internal_clock_observe_times (GstNetClientInternalClock * self, GstClockTime local_1, GstClockTime remote_1, GstClockTime remote_2, GstClockTime local_2) { - GstNetClientClockPrivate *priv = self->priv; - GstClock *internal_clock = priv->internal_clock; GstClockTime current_timeout = 0; GstClockTime local_avg, remote_avg; gdouble r_squared; GstClock *clock; GstClockTime rtt, rtt_limit, min_update_interval; - GstBus *bus = NULL; /* Use for discont tracking */ GstClockTime time_before = 0; GstClockTime min_guess = 0; @@ -451,19 +355,15 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, gint i; GST_OBJECT_LOCK (self); - rtt_limit = self->priv->roundtrip_limit; + rtt_limit = self->roundtrip_limit; /* If the server told us a poll interval and it's bigger than the * one configured via the property, use the server's */ - if (self->priv->last_remote_poll_interval != GST_CLOCK_TIME_NONE && - self->priv->last_remote_poll_interval > - self->priv->minimum_update_interval) - min_update_interval = self->priv->last_remote_poll_interval; + if (self->last_remote_poll_interval != GST_CLOCK_TIME_NONE && + self->last_remote_poll_interval > self->minimum_update_interval) + min_update_interval = self->last_remote_poll_interval; else - min_update_interval = self->priv->minimum_update_interval; - - if (self->priv->bus) - bus = gst_object_ref (self->priv->bus); + min_update_interval = self->minimum_update_interval; GST_OBJECT_UNLOCK (self); if (local_2 < local_1) { @@ -495,13 +395,13 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, } for (i = 1; i < MEDIAN_PRE_FILTERING_WINDOW; i++) - self->priv->last_rtts[i - 1] = self->priv->last_rtts[i]; - self->priv->last_rtts[i - 1] = rtt; + self->last_rtts[i - 1] = self->last_rtts[i]; + self->last_rtts[i - 1] = rtt; - if (self->priv->last_rtts_missing) { - self->priv->last_rtts_missing--; + if (self->last_rtts_missing) { + self->last_rtts_missing--; } else { - memcpy (&last_rtts, &self->priv->last_rtts, sizeof (last_rtts)); + memcpy (&last_rtts, &self->last_rtts, sizeof (last_rtts)); g_qsort_with_data (&last_rtts, MEDIAN_PRE_FILTERING_WINDOW, sizeof (GstClockTime), (GCompareDataFunc) compare_clock_time, NULL); @@ -524,17 +424,17 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, /* Track an average round trip time, for a bit of smoothing */ /* Always update before discarding a sample, so genuine changes in * the network get picked up, eventually */ - if (priv->rtt_avg == GST_CLOCK_TIME_NONE) - priv->rtt_avg = rtt; - else if (rtt < priv->rtt_avg) /* Shorter RTTs carry more weight than longer */ - priv->rtt_avg = (3 * priv->rtt_avg + rtt) / 4; + if (self->rtt_avg == GST_CLOCK_TIME_NONE) + self->rtt_avg = rtt; + else if (rtt < self->rtt_avg) /* Shorter RTTs carry more weight than longer */ + self->rtt_avg = (3 * self->rtt_avg + rtt) / 4; else - priv->rtt_avg = (15 * priv->rtt_avg + rtt) / 16; + self->rtt_avg = (15 * self->rtt_avg + rtt) / 16; - if (rtt > 2 * priv->rtt_avg) { + if (rtt > 2 * self->rtt_avg) { GST_LOG_OBJECT (self, "Dropping observation, long RTT %" GST_TIME_FORMAT " > 2 * avg %" - GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (priv->rtt_avg)); + GST_TIME_FORMAT, GST_TIME_ARGS (rtt), GST_TIME_ARGS (self->rtt_avg)); goto bogus_observation; } @@ -582,7 +482,7 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, clock = GST_CLOCK_CAST (self); /* Store what the clock produced as 'now' before this update */ - gst_clock_get_calibration (internal_clock, &orig_internal_time, + gst_clock_get_calibration (GST_CLOCK_CAST (self), &orig_internal_time, &orig_external_time, &orig_rate_num, &orig_rate_den); internal_time = orig_internal_time; external_time = orig_external_time; @@ -590,15 +490,15 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, rate_den = orig_rate_den; min_guess = - gst_clock_adjust_with_calibration (internal_clock, local_1, + gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_1, internal_time, external_time, rate_num, rate_den); time_before = - gst_clock_adjust_with_calibration (internal_clock, local_2, + gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_2, internal_time, external_time, rate_num, rate_den); /* Maximum discontinuity, when we're synched with the master. Could make this a property, * but this value seems to work fine */ - max_discont = priv->rtt_avg / 4; + max_discont = self->rtt_avg / 4; /* If the remote observation was within a max_discont window around our min/max estimates, we're synched */ synched = @@ -606,14 +506,14 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, && GST_CLOCK_DIFF (time_before, remote_avg) < (GstClockTimeDiff) (max_discont)); - if (gst_clock_add_observation_unapplied (internal_clock, + if (gst_clock_add_observation_unapplied (GST_CLOCK_CAST (self), local_avg, remote_avg, &r_squared, &internal_time, &external_time, &rate_num, &rate_den)) { /* Now compare the difference (discont) in the clock * after this observation */ time_discont = GST_CLOCK_DIFF (time_before, - gst_clock_adjust_with_calibration (internal_clock, local_2, + gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_2, internal_time, external_time, rate_num, rate_den)); /* If we were in sync with the remote clock, clamp the allowed @@ -641,24 +541,24 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, /* Check if the new clock params would have made our observation within range */ now_synched = (GST_CLOCK_DIFF (remote_avg, - gst_clock_adjust_with_calibration (internal_clock, + gst_clock_adjust_with_calibration (GST_CLOCK_CAST (self), local_1, internal_time, external_time, rate_num, rate_den)) < (GstClockTimeDiff) (max_discont)) && (GST_CLOCK_DIFF (gst_clock_adjust_with_calibration - (internal_clock, local_2, internal_time, external_time, + (GST_CLOCK_CAST (self), local_2, internal_time, external_time, rate_num, rate_den), remote_avg) < (GstClockTimeDiff) (max_discont)); /* Only update the clock if we had synch or just gained it */ - if (synched || now_synched || priv->skipped_updates > MAX_SKIPPED_UPDATES) { - gst_clock_set_calibration (internal_clock, internal_time, + if (synched || now_synched || self->skipped_updates > MAX_SKIPPED_UPDATES) { + gst_clock_set_calibration (GST_CLOCK_CAST (self), internal_time, external_time, rate_num, rate_den); /* ghetto formula - shorter timeout for bad correlations */ current_timeout = (1e-3 / (1 - MIN (r_squared, 0.99999))) * GST_SECOND; current_timeout = - MIN (current_timeout, gst_clock_get_timeout (internal_clock)); - priv->skipped_updates = 0; + MIN (current_timeout, gst_clock_get_timeout (GST_CLOCK_CAST (self))); + self->skipped_updates = 0; /* FIXME: When do we consider the clock absolutely not synced anymore? */ gst_clock_set_synced (GST_CLOCK (self), TRUE); @@ -669,7 +569,7 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, rate_num = orig_rate_num; rate_den = orig_rate_den; time_discont = 0; - priv->skipped_updates++; + self->skipped_updates++; } } @@ -677,15 +577,17 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, if (rtt < min_update_interval) current_timeout = MAX (min_update_interval - rtt, current_timeout); - if (bus) { + GST_OBJECT_LOCK (self); + if (self->busses) { GstStructure *s; GstMessage *msg; + GList *l; /* Output a stats message, whether we updated the clock or not */ s = gst_structure_new ("gst-netclock-statistics", "synchronised", G_TYPE_BOOLEAN, synched, "rtt", G_TYPE_UINT64, rtt, - "rtt-average", G_TYPE_UINT64, priv->rtt_avg, + "rtt-average", G_TYPE_UINT64, self->rtt_avg, "local", G_TYPE_UINT64, local_avg, "remote", G_TYPE_UINT64, remote_avg, "discontinuity", G_TYPE_INT64, time_discont, @@ -702,30 +604,29 @@ gst_net_client_clock_observe_times (GstNetClientClock * self, (gdouble) (rate_num) / rate_den, "local-clock-offset", G_TYPE_INT64, GST_CLOCK_DIFF (internal_time, external_time), NULL); msg = gst_message_new_element (GST_OBJECT (self), s); - gst_bus_post (bus, msg); + + for (l = self->busses; l; l = l->next) + gst_bus_post (l->data, gst_message_ref (msg)); + gst_message_unref (msg); } + GST_OBJECT_UNLOCK (self); GST_INFO ("next timeout: %" GST_TIME_FORMAT, GST_TIME_ARGS (current_timeout)); - self->priv->timeout_expiration = gst_util_get_timestamp () + current_timeout; + self->timeout_expiration = gst_util_get_timestamp () + current_timeout; - if (bus) - gst_object_unref (bus); return; bogus_observation: - if (bus) - gst_object_unref (bus); /* Schedule a new packet again soon */ - self->priv->timeout_expiration = gst_util_get_timestamp () + (GST_SECOND / 4); + self->timeout_expiration = gst_util_get_timestamp () + (GST_SECOND / 4); return; } static gpointer -gst_net_client_clock_thread (gpointer data) +gst_net_client_internal_clock_thread (gpointer data) { - GstNetClientClock *self = data; - GstClock *internal_clock = self->priv->internal_clock; - GSocket *socket = self->priv->socket; + GstNetClientInternalClock *self = data; + GSocket *socket = self->socket; GError *err = NULL; GST_INFO_OBJECT (self, "net client clock thread running, socket=%p", socket); @@ -733,8 +634,8 @@ gst_net_client_clock_thread (gpointer data) g_socket_set_blocking (socket, TRUE); g_socket_set_timeout (socket, 0); - while (!g_cancellable_is_cancelled (self->priv->cancel)) { - GstClockTime expiration_time = self->priv->timeout_expiration; + while (!g_cancellable_is_cancelled (self->cancel)) { + GstClockTime expiration_time = self->timeout_expiration; GstClockTime now = gst_util_get_timestamp (); gint64 socket_timeout; @@ -747,7 +648,7 @@ gst_net_client_clock_thread (gpointer data) GST_TRACE_OBJECT (self, "timeout: %" G_GINT64_FORMAT "us", socket_timeout); if (!g_socket_condition_timed_wait (socket, G_IO_IN, socket_timeout, - self->priv->cancel, &err)) { + self->cancel, &err)) { /* cancelled, timeout or error */ if (err->code == G_IO_ERROR_CANCELLED) { GST_INFO_OBJECT (self, "cancelled"); @@ -757,19 +658,19 @@ gst_net_client_clock_thread (gpointer data) /* timed out, let's send another packet */ GST_DEBUG_OBJECT (self, "timed out"); - if (self->priv->is_ntp) { + if (self->is_ntp) { GstNtpPacket *packet; packet = gst_ntp_packet_new (NULL, NULL); - packet->transmit_time = gst_clock_get_internal_time (internal_clock); + packet->transmit_time = + gst_clock_get_internal_time (GST_CLOCK_CAST (self)); GST_DEBUG_OBJECT (self, "sending packet, local time = %" GST_TIME_FORMAT, GST_TIME_ARGS (packet->transmit_time)); - gst_ntp_packet_send (packet, self->priv->socket, - self->priv->servaddr, NULL); + gst_ntp_packet_send (packet, self->socket, self->servaddr, NULL); g_free (packet); } else { @@ -777,21 +678,22 @@ gst_net_client_clock_thread (gpointer data) packet = gst_net_time_packet_new (NULL); - packet->local_time = gst_clock_get_internal_time (internal_clock); + packet->local_time = + gst_clock_get_internal_time (GST_CLOCK_CAST (self)); GST_DEBUG_OBJECT (self, "sending packet, local time = %" GST_TIME_FORMAT, GST_TIME_ARGS (packet->local_time)); - gst_net_time_packet_send (packet, self->priv->socket, - self->priv->servaddr, NULL); + gst_net_time_packet_send (packet, self->socket, self->servaddr, NULL); g_free (packet); } /* reset timeout (but are expecting a response sooner anyway) */ - self->priv->timeout_expiration = - gst_util_get_timestamp () + gst_clock_get_timeout (internal_clock); + self->timeout_expiration = + gst_util_get_timestamp () + + gst_clock_get_timeout (GST_CLOCK_CAST (self)); } else { GST_DEBUG_OBJECT (self, "socket error: %s", err->message); g_usleep (G_USEC_PER_SEC / 10); /* throttle */ @@ -802,9 +704,9 @@ gst_net_client_clock_thread (gpointer data) /* got packet */ - new_local = gst_clock_get_internal_time (internal_clock); + new_local = gst_clock_get_internal_time (GST_CLOCK_CAST (self)); - if (self->priv->is_ntp) { + if (self->is_ntp) { GstNtpPacket *packet; packet = gst_ntp_packet_receive (socket, NULL, &err); @@ -824,11 +726,12 @@ gst_net_client_clock_thread (gpointer data) /* Remember the last poll interval we ever got from the server */ if (packet->poll_interval != GST_CLOCK_TIME_NONE) - self->priv->last_remote_poll_interval = packet->poll_interval; + self->last_remote_poll_interval = packet->poll_interval; /* observe_times will reset the timeout */ - gst_net_client_clock_observe_times (self, packet->origin_time, - packet->receive_time, packet->transmit_time, new_local); + gst_net_client_internal_clock_observe_times (self, + packet->origin_time, packet->receive_time, packet->transmit_time, + new_local); g_free (packet); } else if (err != NULL) { @@ -844,14 +747,14 @@ gst_net_client_clock_thread (gpointer data) * our minimum poll interval. Otherwise we assume that the server * already told us something sensible and that this error here * was just a spurious error */ - if (self->priv->last_remote_poll_interval == GST_CLOCK_TIME_NONE) - self->priv->minimum_update_interval *= 2; + if (self->last_remote_poll_interval == GST_CLOCK_TIME_NONE) + self->minimum_update_interval *= 2; /* And wait a bit before we send the next packet instead of * sending it immediately */ - self->priv->timeout_expiration = + self->timeout_expiration = gst_util_get_timestamp () + - gst_clock_get_timeout (internal_clock); + gst_clock_get_timeout (GST_CLOCK_CAST (self)); } else { GST_WARNING_OBJECT (self, "receive error: %s", err->message); } @@ -872,7 +775,7 @@ gst_net_client_clock_thread (gpointer data) GST_TIME_ARGS (new_local)); /* observe_times will reset the timeout */ - gst_net_client_clock_observe_times (self, packet->local_time, + gst_net_client_internal_clock_observe_times (self, packet->local_time, packet->remote_time, packet->remote_time, new_local); g_free (packet); @@ -888,7 +791,7 @@ gst_net_client_clock_thread (gpointer data) } static gboolean -gst_net_client_clock_start (GstNetClientClock * self) +gst_net_client_internal_clock_start (GstNetClientInternalClock * self) { GSocketAddress *servaddr; GSocketAddress *myaddr; @@ -901,18 +804,17 @@ gst_net_client_clock_start (GstNetClientClock * self) GResolver *resolver = NULL; GError *err = NULL; - g_return_val_if_fail (self->priv->address != NULL, FALSE); - g_return_val_if_fail (self->priv->servaddr == NULL, FALSE); + g_return_val_if_fail (self->address != NULL, FALSE); + g_return_val_if_fail (self->servaddr == NULL, FALSE); /* create target address */ - inetaddr = g_inet_address_new_from_string (self->priv->address); + inetaddr = g_inet_address_new_from_string (self->address); if (inetaddr == NULL) { GList *results; resolver = g_resolver_get_default (); - results = - g_resolver_lookup_by_name (resolver, self->priv->address, NULL, &err); + results = g_resolver_lookup_by_name (resolver, self->address, NULL, &err); if (!results) goto failed_to_resolve; @@ -923,13 +825,13 @@ gst_net_client_clock_start (GstNetClientClock * self) family = g_inet_address_get_family (inetaddr); - servaddr = g_inet_socket_address_new (inetaddr, self->priv->port); + servaddr = g_inet_socket_address_new (inetaddr, self->port); g_object_unref (inetaddr); g_assert (servaddr != NULL); - GST_DEBUG_OBJECT (self, "will communicate with %s:%d", self->priv->address, - self->priv->port); + GST_DEBUG_OBJECT (self, "will communicate with %s:%d", self->address, + self->port); socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &error); @@ -958,15 +860,15 @@ gst_net_client_clock_start (GstNetClientClock * self) g_object_unref (myaddr); - self->priv->cancel = g_cancellable_new (); - self->priv->made_cancel_fd = - g_cancellable_make_pollfd (self->priv->cancel, &dummy_pollfd); + self->cancel = g_cancellable_new (); + self->made_cancel_fd = + g_cancellable_make_pollfd (self->cancel, &dummy_pollfd); - self->priv->socket = socket; - self->priv->servaddr = G_SOCKET_ADDRESS (servaddr); + self->socket = socket; + self->servaddr = G_SOCKET_ADDRESS (servaddr); - self->priv->thread = g_thread_try_new ("GstNetClientClock", - gst_net_client_clock_thread, self, &error); + self->thread = g_thread_try_new ("GstNetClientInternalClock", + gst_net_client_internal_clock_thread, self, &error); if (error != NULL) goto no_thread; @@ -997,7 +899,7 @@ getsockname_error: failed_to_resolve: { GST_ERROR_OBJECT (self, "resolving '%s' failed: %s", - self->priv->address, err->message); + self->address, err->message); g_clear_error (&err); g_object_unref (resolver); return FALSE; @@ -1005,42 +907,434 @@ failed_to_resolve: no_thread: { GST_ERROR_OBJECT (self, "could not create thread: %s", error->message); - g_object_unref (self->priv->servaddr); - self->priv->servaddr = NULL; - g_object_unref (self->priv->socket); - self->priv->socket = NULL; + g_object_unref (self->servaddr); + self->servaddr = NULL; + g_object_unref (self->socket); + self->socket = NULL; g_error_free (error); return FALSE; } } static void -gst_net_client_clock_stop (GstNetClientClock * self) +gst_net_client_internal_clock_stop (GstNetClientInternalClock * self) { - if (self->priv->thread == NULL) + if (self->thread == NULL) return; GST_INFO_OBJECT (self, "stopping..."); - g_cancellable_cancel (self->priv->cancel); + g_cancellable_cancel (self->cancel); - g_thread_join (self->priv->thread); - self->priv->thread = NULL; + g_thread_join (self->thread); + self->thread = NULL; - if (self->priv->made_cancel_fd) - g_cancellable_release_fd (self->priv->cancel); + if (self->made_cancel_fd) + g_cancellable_release_fd (self->cancel); - g_object_unref (self->priv->cancel); - self->priv->cancel = NULL; + g_object_unref (self->cancel); + self->cancel = NULL; - g_object_unref (self->priv->servaddr); - self->priv->servaddr = NULL; + g_object_unref (self->servaddr); + self->servaddr = NULL; - g_object_unref (self->priv->socket); - self->priv->socket = NULL; + g_object_unref (self->socket); + self->socket = NULL; GST_INFO_OBJECT (self, "stopped"); } +#define GST_NET_CLIENT_CLOCK_GET_PRIVATE(obj) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_NET_CLIENT_CLOCK, GstNetClientClockPrivate)) + +struct _GstNetClientClockPrivate +{ + GstClock *internal_clock; + + GstClockTime roundtrip_limit; + GstClockTime minimum_update_interval; + + GstClockTime base_time; + + gchar *address; + gint port; + + GstBus *bus; + + gboolean is_ntp; + + gulong synced_id; +}; + +G_DEFINE_TYPE (GstNetClientClock, gst_net_client_clock, GST_TYPE_SYSTEM_CLOCK); + +static void gst_net_client_clock_finalize (GObject * object); +static void gst_net_client_clock_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_net_client_clock_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); +static void gst_net_client_clock_constructed (GObject * object); + +static GstClockTime gst_net_client_clock_get_internal_time (GstClock * clock); + +static void +gst_net_client_clock_class_init (GstNetClientClockClass * klass) +{ + GObjectClass *gobject_class; + GstClockClass *clock_class; + + gobject_class = G_OBJECT_CLASS (klass); + clock_class = GST_CLOCK_CLASS (klass); + + g_type_class_add_private (klass, sizeof (GstNetClientClockPrivate)); + + gobject_class->finalize = gst_net_client_clock_finalize; + gobject_class->get_property = gst_net_client_clock_get_property; + gobject_class->set_property = gst_net_client_clock_set_property; + gobject_class->constructed = gst_net_client_clock_constructed; + + g_object_class_install_property (gobject_class, PROP_ADDRESS, + g_param_spec_string ("address", "address", + "The IP address of the machine providing a time server", + DEFAULT_ADDRESS, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_PORT, + g_param_spec_int ("port", "port", + "The port on which the remote server is listening", 0, G_MAXUINT16, + DEFAULT_PORT, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_BUS, + g_param_spec_object ("bus", "bus", + "A GstBus on which to send clock status information", GST_TYPE_BUS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * GstNetClientInternalClock::round-trip-limit: + * + * Maximum allowed round-trip for packets. If this property is set to a nonzero + * value, all packets with a round-trip interval larger than this limit will be + * ignored. This is useful for networks with severe and fluctuating transport + * delays. Filtering out these packets increases stability of the synchronization. + * On the other hand, the lower the limit, the higher the amount of filtered + * packets. Empirical tests are typically necessary to estimate a good value + * for the limit. + * If the property is set to zero, the limit is disabled. + * + * Since: 1.4 + */ + g_object_class_install_property (gobject_class, PROP_ROUNDTRIP_LIMIT, + g_param_spec_uint64 ("round-trip-limit", "round-trip limit", + "Maximum tolerable round-trip interval for packets, in nanoseconds " + "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_ROUNDTRIP_LIMIT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MINIMUM_UPDATE_INTERVAL, + g_param_spec_uint64 ("minimum-update-interval", "minimum update interval", + "Minimum polling interval for packets, in nanoseconds" + "(0 = no limit)", 0, G_MAXUINT64, DEFAULT_MINIMUM_UPDATE_INTERVAL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_BASE_TIME, + g_param_spec_uint64 ("base-time", "Base Time", + "Initial time that is reported before synchronization", 0, + G_MAXUINT64, DEFAULT_BASE_TIME, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_INTERNAL_CLOCK, + g_param_spec_object ("internal-clock", "Internal Clock", + "Internal clock that directly slaved to the remote clock", + GST_TYPE_CLOCK, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + clock_class->get_internal_time = gst_net_client_clock_get_internal_time; +} + +static void +gst_net_client_clock_init (GstNetClientClock * self) +{ + GstNetClientClockPrivate *priv; + + self->priv = priv = GST_NET_CLIENT_CLOCK_GET_PRIVATE (self); + + GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_CAN_SET_MASTER); + GST_OBJECT_FLAG_SET (self, GST_CLOCK_FLAG_NEEDS_STARTUP_SYNC); + + priv->port = DEFAULT_PORT; + priv->address = g_strdup (DEFAULT_ADDRESS); + + priv->roundtrip_limit = DEFAULT_ROUNDTRIP_LIMIT; + priv->minimum_update_interval = DEFAULT_MINIMUM_UPDATE_INTERVAL; + priv->base_time = DEFAULT_BASE_TIME; +} + +/* Must be called with clocks_lock */ +static void +update_clock_cache (ClockCache * cache) +{ + GstClockTime roundtrip_limit = 0, minimum_update_interval = 0; + GList *l, *busses = NULL; + + GST_OBJECT_LOCK (cache->clock); + g_list_free_full (GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->busses, + (GDestroyNotify) gst_object_unref); + + for (l = cache->clocks; l; l = l->next) { + GstNetClientClock *clock = l->data; + + if (clock->priv->bus) + busses = g_list_prepend (busses, gst_object_ref (clock->priv->bus)); + + if (roundtrip_limit == 0) + roundtrip_limit = clock->priv->roundtrip_limit; + else + roundtrip_limit = MAX (roundtrip_limit, clock->priv->roundtrip_limit); + + if (minimum_update_interval == 0) + minimum_update_interval = clock->priv->minimum_update_interval; + else + minimum_update_interval = + MIN (minimum_update_interval, clock->priv->minimum_update_interval); + } + GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->busses = busses; + GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->roundtrip_limit = + roundtrip_limit; + GST_NET_CLIENT_INTERNAL_CLOCK (cache->clock)->minimum_update_interval = + minimum_update_interval; + + GST_OBJECT_UNLOCK (cache->clock); +} + +static void +gst_net_client_clock_finalize (GObject * object) +{ + GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object); + GList *l; + + if (self->priv->synced_id) + g_signal_handler_disconnect (self, self->priv->synced_id); + self->priv->synced_id = 0; + + G_LOCK (clocks_lock); + for (l = clocks; l; l = l->next) { + ClockCache *cache = l->data; + + if (cache->clock == self->priv->internal_clock) { + cache->clocks = g_list_remove (cache->clocks, self); + + if (cache->clocks) { + update_clock_cache (cache); + } else { + gst_object_unref (cache->clock); + g_free (cache); + clocks = g_list_remove (clocks, cache); + } + break; + } + } + G_UNLOCK (clocks_lock); + + g_free (self->priv->address); + self->priv->address = NULL; + + if (self->priv->bus != NULL) { + gst_object_unref (self->priv->bus); + self->priv->bus = NULL; + } + + G_OBJECT_CLASS (gst_net_client_clock_parent_class)->finalize (object); +} + +static void +gst_net_client_clock_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object); + gboolean update = FALSE; + + switch (prop_id) { + case PROP_ADDRESS: + GST_OBJECT_LOCK (self); + g_free (self->priv->address); + self->priv->address = g_value_dup_string (value); + if (self->priv->address == NULL) + self->priv->address = g_strdup (DEFAULT_ADDRESS); + GST_OBJECT_UNLOCK (self); + break; + case PROP_PORT: + GST_OBJECT_LOCK (self); + self->priv->port = g_value_get_int (value); + GST_OBJECT_UNLOCK (self); + break; + case PROP_ROUNDTRIP_LIMIT: + GST_OBJECT_LOCK (self); + self->priv->roundtrip_limit = g_value_get_uint64 (value); + GST_OBJECT_UNLOCK (self); + update = TRUE; + break; + case PROP_MINIMUM_UPDATE_INTERVAL: + GST_OBJECT_LOCK (self); + self->priv->minimum_update_interval = g_value_get_uint64 (value); + GST_OBJECT_UNLOCK (self); + update = TRUE; + break; + case PROP_BUS: + GST_OBJECT_LOCK (self); + if (self->priv->bus) + gst_object_unref (self->priv->bus); + self->priv->bus = g_value_dup_object (value); + GST_OBJECT_UNLOCK (self); + update = TRUE; + break; + case PROP_BASE_TIME: + self->priv->base_time = g_value_get_uint64 (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } + + if (update && self->priv->internal_clock) { + GList *l; + + G_LOCK (clocks_lock); + for (l = clocks; l; l = l->next) { + ClockCache *cache = l->data; + + if (cache->clock == self->priv->internal_clock) { + update_clock_cache (cache); + } + } + G_UNLOCK (clocks_lock); + } +} + +static void +gst_net_client_clock_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object); + + switch (prop_id) { + case PROP_ADDRESS: + GST_OBJECT_LOCK (self); + g_value_set_string (value, self->priv->address); + GST_OBJECT_UNLOCK (self); + break; + case PROP_PORT: + g_value_set_int (value, self->priv->port); + break; + case PROP_ROUNDTRIP_LIMIT: + GST_OBJECT_LOCK (self); + g_value_set_uint64 (value, self->priv->roundtrip_limit); + GST_OBJECT_UNLOCK (self); + break; + case PROP_MINIMUM_UPDATE_INTERVAL: + GST_OBJECT_LOCK (self); + g_value_set_uint64 (value, self->priv->minimum_update_interval); + GST_OBJECT_UNLOCK (self); + break; + case PROP_BUS: + GST_OBJECT_LOCK (self); + g_value_set_object (value, self->priv->bus); + GST_OBJECT_UNLOCK (self); + break; + case PROP_BASE_TIME: + g_value_set_uint64 (value, self->priv->base_time); + break; + case PROP_INTERNAL_CLOCK: + g_value_set_object (value, self->priv->internal_clock); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_net_client_clocked_synced_cb (GstClock * internal_clock, gboolean synced, + GstClock * self) +{ + gst_clock_set_synced (self, synced); +} + +static void +gst_net_client_clock_constructed (GObject * object) +{ + GstNetClientClock *self = GST_NET_CLIENT_CLOCK (object); + GstClock *internal_clock; + GstClockTime internal; + GList *l; + ClockCache *cache = NULL; + + G_OBJECT_CLASS (gst_net_client_clock_parent_class)->constructed (object); + + G_LOCK (clocks_lock); + for (l = clocks; l; l = l->next) { + ClockCache *tmp = l->data; + GstNetClientInternalClock *internal_clock = + GST_NET_CLIENT_INTERNAL_CLOCK (tmp->clock); + + if (strcmp (internal_clock->address, self->priv->address) == 0 && + internal_clock->port == self->priv->port) { + cache = tmp; + break; + } + } + + if (!cache) { + cache = g_new0 (ClockCache, 1); + + cache->clock = + g_object_new (GST_TYPE_NET_CLIENT_INTERNAL_CLOCK, "address", + self->priv->address, "port", self->priv->port, "is-ntp", + self->priv->is_ntp, NULL); + clocks = g_list_prepend (clocks, cache); + } + + cache->clocks = g_list_prepend (cache->clocks, self); + + GST_OBJECT_LOCK (cache->clock); + if (gst_clock_is_synced (cache->clock)) + gst_clock_set_synced (GST_CLOCK (self), TRUE); + self->priv->synced_id = + g_signal_connect (cache->clock, "synced", + G_CALLBACK (gst_net_client_clocked_synced_cb), self); + GST_OBJECT_UNLOCK (cache->clock); + + G_UNLOCK (clocks_lock); + + self->priv->internal_clock = internal_clock = cache->clock; + + /* gst_clock_get_time() values are guaranteed to be increasing. because no one + * has called get_time on this clock yet we are free to adjust to any value + * without worrying about worrying about MAX() issues with the clock's + * internal time. + */ + + /* update our internal time so get_time() give something around base_time. + assume that the rate is 1 in the beginning. */ + internal = gst_clock_get_internal_time (internal_clock); + gst_clock_set_calibration (internal_clock, internal, + self->priv->base_time, 1, 1); + + { + GstClockTime now = gst_clock_get_time (internal_clock); + + if (GST_CLOCK_DIFF (now, self->priv->base_time) > 0 || + GST_CLOCK_DIFF (now, self->priv->base_time + GST_SECOND) < 0) { + g_warning ("unable to set the base time, expect sync problems!"); + } + } + + /* all systems go, cap'n */ +} + +static GstClockTime +gst_net_client_clock_get_internal_time (GstClock * clock) +{ + GstNetClientClock *self = GST_NET_CLIENT_CLOCK (clock); + + return gst_clock_get_time (self->priv->internal_clock); +} + /** * gst_net_client_clock_new: * @name: a name for the clock @@ -1048,7 +1342,7 @@ gst_net_client_clock_stop (GstNetClientClock * self) * @remote_port: the port of the remote clock provider * @base_time: initial time of the clock * - * Create a new #GstNetClientClock that will report the time + * Create a new #GstNetClientInternalClock that will report the time * provided by the #GstNetTimeProvider on @remote_address and * @remote_port. * @@ -1059,7 +1353,7 @@ GstClock * gst_net_client_clock_new (const gchar * name, const gchar * remote_address, gint remote_port, GstClockTime base_time) { - GstNetClientClock *ret; + GstClock *ret; g_return_val_if_fail (remote_address != NULL, NULL); g_return_val_if_fail (remote_port > 0, NULL); @@ -1069,10 +1363,10 @@ gst_net_client_clock_new (const gchar * name, const gchar * remote_address, ret = g_object_new (GST_TYPE_NET_CLIENT_CLOCK, "address", remote_address, "port", remote_port, "base-time", base_time, NULL); - return (GstClock *) ret; + return ret; } -G_DEFINE_TYPE (GstNtpClock, gst_ntp_clock, GST_TYPE_NET_CLIENT_CLOCK); +G_DEFINE_TYPE (GstNtpClock, gst_ntp_clock, GST_TYPE_SYSTEM_CLOCK); static void gst_ntp_clock_class_init (GstNtpClockClass * klass) @@ -1104,7 +1398,7 @@ GstClock * gst_ntp_clock_new (const gchar * name, const gchar * remote_address, gint remote_port, GstClockTime base_time) { - GstNetClientClock *ret; + GstClock *ret; g_return_val_if_fail (remote_address != NULL, NULL); g_return_val_if_fail (remote_port > 0, NULL); @@ -1114,5 +1408,5 @@ gst_ntp_clock_new (const gchar * name, const gchar * remote_address, ret = g_object_new (GST_TYPE_NTP_CLOCK, "address", remote_address, "port", remote_port, "base-time", base_time, NULL); - return (GstClock *) ret; + return ret; } -- 2.7.4