Tizen_4.0 base
[platform/upstream/docker-engine.git] / vendor / github.com / hashicorp / serf / coordinate / client.go
1 package coordinate
2
3 import (
4         "fmt"
5         "math"
6         "sort"
7         "sync"
8         "time"
9 )
10
11 // Client manages the estimated network coordinate for a given node, and adjusts
12 // it as the node observes round trip times and estimated coordinates from other
13 // nodes. The core algorithm is based on Vivaldi, see the documentation for Config
14 // for more details.
15 type Client struct {
16         // coord is the current estimate of the client's network coordinate.
17         coord *Coordinate
18
19         // origin is a coordinate sitting at the origin.
20         origin *Coordinate
21
22         // config contains the tuning parameters that govern the performance of
23         // the algorithm.
24         config *Config
25
26         // adjustmentIndex is the current index into the adjustmentSamples slice.
27         adjustmentIndex uint
28
29         // adjustment is used to store samples for the adjustment calculation.
30         adjustmentSamples []float64
31
32         // latencyFilterSamples is used to store the last several RTT samples,
33         // keyed by node name. We will use the config's LatencyFilterSamples
34         // value to determine how many samples we keep, per node.
35         latencyFilterSamples map[string][]float64
36
37         // mutex enables safe concurrent access to the client.
38         mutex sync.RWMutex
39 }
40
41 // NewClient creates a new Client and verifies the configuration is valid.
42 func NewClient(config *Config) (*Client, error) {
43         if !(config.Dimensionality > 0) {
44                 return nil, fmt.Errorf("dimensionality must be >0")
45         }
46
47         return &Client{
48                 coord:                NewCoordinate(config),
49                 origin:               NewCoordinate(config),
50                 config:               config,
51                 adjustmentIndex:      0,
52                 adjustmentSamples:    make([]float64, config.AdjustmentWindowSize),
53                 latencyFilterSamples: make(map[string][]float64),
54         }, nil
55 }
56
57 // GetCoordinate returns a copy of the coordinate for this client.
58 func (c *Client) GetCoordinate() *Coordinate {
59         c.mutex.RLock()
60         defer c.mutex.RUnlock()
61
62         return c.coord.Clone()
63 }
64
65 // SetCoordinate forces the client's coordinate to a known state.
66 func (c *Client) SetCoordinate(coord *Coordinate) {
67         c.mutex.Lock()
68         defer c.mutex.Unlock()
69
70         c.coord = coord.Clone()
71 }
72
73 // ForgetNode removes any client state for the given node.
74 func (c *Client) ForgetNode(node string) {
75         c.mutex.Lock()
76         defer c.mutex.Unlock()
77
78         delete(c.latencyFilterSamples, node)
79 }
80
81 // latencyFilter applies a simple moving median filter with a new sample for
82 // a node. This assumes that the mutex has been locked already.
83 func (c *Client) latencyFilter(node string, rttSeconds float64) float64 {
84         samples, ok := c.latencyFilterSamples[node]
85         if !ok {
86                 samples = make([]float64, 0, c.config.LatencyFilterSize)
87         }
88
89         // Add the new sample and trim the list, if needed.
90         samples = append(samples, rttSeconds)
91         if len(samples) > int(c.config.LatencyFilterSize) {
92                 samples = samples[1:]
93         }
94         c.latencyFilterSamples[node] = samples
95
96         // Sort a copy of the samples and return the median.
97         sorted := make([]float64, len(samples))
98         copy(sorted, samples)
99         sort.Float64s(sorted)
100         return sorted[len(sorted)/2]
101 }
102
103 // updateVivialdi updates the Vivaldi portion of the client's coordinate. This
104 // assumes that the mutex has been locked already.
105 func (c *Client) updateVivaldi(other *Coordinate, rttSeconds float64) {
106         const zeroThreshold = 1.0e-6
107
108         dist := c.coord.DistanceTo(other).Seconds()
109         if rttSeconds < zeroThreshold {
110                 rttSeconds = zeroThreshold
111         }
112         wrongness := math.Abs(dist-rttSeconds) / rttSeconds
113
114         totalError := c.coord.Error + other.Error
115         if totalError < zeroThreshold {
116                 totalError = zeroThreshold
117         }
118         weight := c.coord.Error / totalError
119
120         c.coord.Error = c.config.VivaldiCE*weight*wrongness + c.coord.Error*(1.0-c.config.VivaldiCE*weight)
121         if c.coord.Error > c.config.VivaldiErrorMax {
122                 c.coord.Error = c.config.VivaldiErrorMax
123         }
124
125         delta := c.config.VivaldiCC * weight
126         force := delta * (rttSeconds - dist)
127         c.coord = c.coord.ApplyForce(c.config, force, other)
128 }
129
130 // updateAdjustment updates the adjustment portion of the client's coordinate, if
131 // the feature is enabled. This assumes that the mutex has been locked already.
132 func (c *Client) updateAdjustment(other *Coordinate, rttSeconds float64) {
133         if c.config.AdjustmentWindowSize == 0 {
134                 return
135         }
136
137         // Note that the existing adjustment factors don't figure in to this
138         // calculation so we use the raw distance here.
139         dist := c.coord.rawDistanceTo(other)
140         c.adjustmentSamples[c.adjustmentIndex] = rttSeconds - dist
141         c.adjustmentIndex = (c.adjustmentIndex + 1) % c.config.AdjustmentWindowSize
142
143         sum := 0.0
144         for _, sample := range c.adjustmentSamples {
145                 sum += sample
146         }
147         c.coord.Adjustment = sum / (2.0 * float64(c.config.AdjustmentWindowSize))
148 }
149
150 // updateGravity applies a small amount of gravity to pull coordinates towards
151 // the center of the coordinate system to combat drift. This assumes that the
152 // mutex is locked already.
153 func (c *Client) updateGravity() {
154         dist := c.origin.DistanceTo(c.coord).Seconds()
155         force := -1.0 * math.Pow(dist/c.config.GravityRho, 2.0)
156         c.coord = c.coord.ApplyForce(c.config, force, c.origin)
157 }
158
159 // Update takes other, a coordinate for another node, and rtt, a round trip
160 // time observation for a ping to that node, and updates the estimated position of
161 // the client's coordinate. Returns the updated coordinate.
162 func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) *Coordinate {
163         c.mutex.Lock()
164         defer c.mutex.Unlock()
165
166         rttSeconds := c.latencyFilter(node, rtt.Seconds())
167         c.updateVivaldi(other, rttSeconds)
168         c.updateAdjustment(other, rttSeconds)
169         c.updateGravity()
170         return c.coord.Clone()
171 }
172
173 // DistanceTo returns the estimated RTT from the client's coordinate to other, the
174 // coordinate for another node.
175 func (c *Client) DistanceTo(other *Coordinate) time.Duration {
176         c.mutex.RLock()
177         defer c.mutex.RUnlock()
178
179         return c.coord.DistanceTo(other)
180 }