11 // Client manages the estimated network coordinate for a given node, and adjusts
12 // it as the node observes round trip times and estimated coordinates from other
13 // nodes. The core algorithm is based on Vivaldi, see the documentation for Config
16 // coord is the current estimate of the client's network coordinate.
19 // origin is a coordinate sitting at the origin.
22 // config contains the tuning parameters that govern the performance of
26 // adjustmentIndex is the current index into the adjustmentSamples slice.
29 // adjustmentSamples is used to store samples for the adjustment calculation.
30 adjustmentSamples []float64
32 // latencyFilterSamples is used to store the last several RTT samples,
33 // keyed by node name. We will use the config's LatencyFilterSize
34 // value to determine how many samples we keep, per node.
35 latencyFilterSamples map[string][]float64
37 // mutex enables safe concurrent access to the client.
41 // NewClient creates a new Client and verifies the configuration is valid.
42 func NewClient(config *Config) (*Client, error) {
43 if !(config.Dimensionality > 0) {
44 return nil, fmt.Errorf("dimensionality must be >0")
48 coord: NewCoordinate(config),
49 origin: NewCoordinate(config),
52 adjustmentSamples: make([]float64, config.AdjustmentWindowSize),
53 latencyFilterSamples: make(map[string][]float64),
57 // GetCoordinate returns a copy of the coordinate for this client.
58 func (c *Client) GetCoordinate() *Coordinate {
60 defer c.mutex.RUnlock()
62 return c.coord.Clone()
65 // SetCoordinate forces the client's coordinate to a known state.
66 func (c *Client) SetCoordinate(coord *Coordinate) {
68 defer c.mutex.Unlock()
70 c.coord = coord.Clone()
73 // ForgetNode removes any client state for the given node.
74 func (c *Client) ForgetNode(node string) {
76 defer c.mutex.Unlock()
78 delete(c.latencyFilterSamples, node)
81 // latencyFilter applies a simple moving median filter with a new sample for
82 // a node. This assumes that the mutex has been locked already.
83 func (c *Client) latencyFilter(node string, rttSeconds float64) float64 {
84 samples, ok := c.latencyFilterSamples[node]
86 samples = make([]float64, 0, c.config.LatencyFilterSize)
89 // Add the new sample and trim the list, if needed.
90 samples = append(samples, rttSeconds)
91 if len(samples) > int(c.config.LatencyFilterSize) {
94 c.latencyFilterSamples[node] = samples
96 // Sort a copy of the samples and return the median.
97 sorted := make([]float64, len(samples))
100 return sorted[len(sorted)/2]
103 // updateVivaldi updates the Vivaldi portion of the client's coordinate. This
104 // assumes that the mutex has been locked already.
105 func (c *Client) updateVivaldi(other *Coordinate, rttSeconds float64) {
106 const zeroThreshold = 1.0e-6
108 dist := c.coord.DistanceTo(other).Seconds()
109 if rttSeconds < zeroThreshold {
110 rttSeconds = zeroThreshold
112 wrongness := math.Abs(dist-rttSeconds) / rttSeconds
114 totalError := c.coord.Error + other.Error
115 if totalError < zeroThreshold {
116 totalError = zeroThreshold
118 weight := c.coord.Error / totalError
120 c.coord.Error = c.config.VivaldiCE*weight*wrongness + c.coord.Error*(1.0-c.config.VivaldiCE*weight)
121 if c.coord.Error > c.config.VivaldiErrorMax {
122 c.coord.Error = c.config.VivaldiErrorMax
125 delta := c.config.VivaldiCC * weight
126 force := delta * (rttSeconds - dist)
127 c.coord = c.coord.ApplyForce(c.config, force, other)
130 // updateAdjustment updates the adjustment portion of the client's coordinate, if
131 // the feature is enabled. This assumes that the mutex has been locked already.
132 func (c *Client) updateAdjustment(other *Coordinate, rttSeconds float64) {
133 if c.config.AdjustmentWindowSize == 0 {
137 // Note that the existing adjustment factors don't figure in to this
138 // calculation so we use the raw distance here.
139 dist := c.coord.rawDistanceTo(other)
140 c.adjustmentSamples[c.adjustmentIndex] = rttSeconds - dist
141 c.adjustmentIndex = (c.adjustmentIndex + 1) % c.config.AdjustmentWindowSize
144 for _, sample := range c.adjustmentSamples {
147 c.coord.Adjustment = sum / (2.0 * float64(c.config.AdjustmentWindowSize))
150 // updateGravity applies a small amount of gravity to pull coordinates towards
151 // the center of the coordinate system to combat drift. This assumes that the
152 // mutex is locked already.
153 func (c *Client) updateGravity() {
154 dist := c.origin.DistanceTo(c.coord).Seconds()
155 force := -1.0 * math.Pow(dist/c.config.GravityRho, 2.0)
156 c.coord = c.coord.ApplyForce(c.config, force, c.origin)
159 // Update takes node, the name of another node, other, that node's coordinate, and
160 // rtt, a round trip time observation for a ping to that node, and updates the
161 // estimated position of the client's coordinate. Returns the updated coordinate.
162 func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) *Coordinate {
164 defer c.mutex.Unlock()
166 rttSeconds := c.latencyFilter(node, rtt.Seconds())
167 c.updateVivaldi(other, rttSeconds)
168 c.updateAdjustment(other, rttSeconds)
170 return c.coord.Clone()
173 // DistanceTo returns the estimated RTT from the client's coordinate to other, the
174 // coordinate for another node.
175 func (c *Client) DistanceTo(other *Coordinate) time.Duration {
177 defer c.mutex.RUnlock()
179 return c.coord.DistanceTo(other)