Tizen_4.0 base
[platform/upstream/docker-engine.git] / vendor / github.com / hashicorp / memberlist / util.go
1 package memberlist
2
3 import (
4         "bytes"
5         "compress/lzw"
6         "encoding/binary"
7         "fmt"
8         "io"
9         "math"
10         "math/rand"
11         "net"
12         "strconv"
13         "strings"
14         "time"
15
16         "github.com/hashicorp/go-msgpack/codec"
17         "github.com/sean-/seed"
18 )
19
20 // pushPullScale is the minimum number of nodes
21 // before we start scaling the push/pull timing. The scale
22 // effect is the log2(Nodes) - log2(pushPullScale). This means
23 // that the 33rd node will cause us to double the interval,
24 // while the 65th will triple it.
25 const pushPullScaleThreshold = 32
26
27 const (
28         // Constant litWidth 2-8
29         lzwLitWidth = 8
30 )
31
32 func init() {
33         seed.Init()
34 }
35
36 // Decode reverses the encode operation on a byte slice input
37 func decode(buf []byte, out interface{}) error {
38         r := bytes.NewReader(buf)
39         hd := codec.MsgpackHandle{}
40         dec := codec.NewDecoder(r, &hd)
41         return dec.Decode(out)
42 }
43
44 // Encode writes an encoded object to a new bytes buffer
45 func encode(msgType messageType, in interface{}) (*bytes.Buffer, error) {
46         buf := bytes.NewBuffer(nil)
47         buf.WriteByte(uint8(msgType))
48         hd := codec.MsgpackHandle{}
49         enc := codec.NewEncoder(buf, &hd)
50         err := enc.Encode(in)
51         return buf, err
52 }
53
54 // Returns a random offset between 0 and n
55 func randomOffset(n int) int {
56         if n == 0 {
57                 return 0
58         }
59         return int(rand.Uint32() % uint32(n))
60 }
61
62 // suspicionTimeout computes the timeout that should be used when
63 // a node is suspected
64 func suspicionTimeout(suspicionMult, n int, interval time.Duration) time.Duration {
65         nodeScale := math.Max(1.0, math.Log10(math.Max(1.0, float64(n))))
66         // multiply by 1000 to keep some precision because time.Duration is an int64 type
67         timeout := time.Duration(suspicionMult) * time.Duration(nodeScale*1000) * interval / 1000
68         return timeout
69 }
70
71 // retransmitLimit computes the limit of retransmissions
72 func retransmitLimit(retransmitMult, n int) int {
73         nodeScale := math.Ceil(math.Log10(float64(n + 1)))
74         limit := retransmitMult * int(nodeScale)
75         return limit
76 }
77
78 // shuffleNodes randomly shuffles the input nodes using the Fisher-Yates shuffle
79 func shuffleNodes(nodes []*nodeState) {
80         n := len(nodes)
81         for i := n - 1; i > 0; i-- {
82                 j := rand.Intn(i + 1)
83                 nodes[i], nodes[j] = nodes[j], nodes[i]
84         }
85 }
86
87 // pushPushScale is used to scale the time interval at which push/pull
88 // syncs take place. It is used to prevent network saturation as the
89 // cluster size grows
90 func pushPullScale(interval time.Duration, n int) time.Duration {
91         // Don't scale until we cross the threshold
92         if n <= pushPullScaleThreshold {
93                 return interval
94         }
95
96         multiplier := math.Ceil(math.Log2(float64(n))-math.Log2(pushPullScaleThreshold)) + 1.0
97         return time.Duration(multiplier) * interval
98 }
99
100 // moveDeadNodes moves nodes that are dead and beyond the gossip to the dead interval
101 // to the end of the slice and returns the index of the first moved node.
102 func moveDeadNodes(nodes []*nodeState, gossipToTheDeadTime time.Duration) int {
103         numDead := 0
104         n := len(nodes)
105         for i := 0; i < n-numDead; i++ {
106                 if nodes[i].State != stateDead {
107                         continue
108                 }
109
110                 // Respect the gossip to the dead interval
111                 if time.Since(nodes[i].StateChange) <= gossipToTheDeadTime {
112                         continue
113                 }
114
115                 // Move this node to the end
116                 nodes[i], nodes[n-numDead-1] = nodes[n-numDead-1], nodes[i]
117                 numDead++
118                 i--
119         }
120         return n - numDead
121 }
122
123 // kRandomNodes is used to select up to k random nodes, excluding any nodes where
124 // the filter function returns true. It is possible that less than k nodes are
125 // returned.
126 func kRandomNodes(k int, nodes []*nodeState, filterFn func(*nodeState) bool) []*nodeState {
127         n := len(nodes)
128         kNodes := make([]*nodeState, 0, k)
129 OUTER:
130         // Probe up to 3*n times, with large n this is not necessary
131         // since k << n, but with small n we want search to be
132         // exhaustive
133         for i := 0; i < 3*n && len(kNodes) < k; i++ {
134                 // Get random node
135                 idx := randomOffset(n)
136                 node := nodes[idx]
137
138                 // Give the filter a shot at it.
139                 if filterFn != nil && filterFn(node) {
140                         continue OUTER
141                 }
142
143                 // Check if we have this node already
144                 for j := 0; j < len(kNodes); j++ {
145                         if node == kNodes[j] {
146                                 continue OUTER
147                         }
148                 }
149
150                 // Append the node
151                 kNodes = append(kNodes, node)
152         }
153         return kNodes
154 }
155
156 // makeCompoundMessage takes a list of messages and generates
157 // a single compound message containing all of them
158 func makeCompoundMessage(msgs [][]byte) *bytes.Buffer {
159         // Create a local buffer
160         buf := bytes.NewBuffer(nil)
161
162         // Write out the type
163         buf.WriteByte(uint8(compoundMsg))
164
165         // Write out the number of message
166         buf.WriteByte(uint8(len(msgs)))
167
168         // Add the message lengths
169         for _, m := range msgs {
170                 binary.Write(buf, binary.BigEndian, uint16(len(m)))
171         }
172
173         // Append the messages
174         for _, m := range msgs {
175                 buf.Write(m)
176         }
177
178         return buf
179 }
180
181 // decodeCompoundMessage splits a compound message and returns
182 // the slices of individual messages. Also returns the number
183 // of truncated messages and any potential error
184 func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) {
185         if len(buf) < 1 {
186                 err = fmt.Errorf("missing compound length byte")
187                 return
188         }
189         numParts := uint8(buf[0])
190         buf = buf[1:]
191
192         // Check we have enough bytes
193         if len(buf) < int(numParts*2) {
194                 err = fmt.Errorf("truncated len slice")
195                 return
196         }
197
198         // Decode the lengths
199         lengths := make([]uint16, numParts)
200         for i := 0; i < int(numParts); i++ {
201                 lengths[i] = binary.BigEndian.Uint16(buf[i*2 : i*2+2])
202         }
203         buf = buf[numParts*2:]
204
205         // Split each message
206         for idx, msgLen := range lengths {
207                 if len(buf) < int(msgLen) {
208                         trunc = int(numParts) - idx
209                         return
210                 }
211
212                 // Extract the slice, seek past on the buffer
213                 slice := buf[:msgLen]
214                 buf = buf[msgLen:]
215                 parts = append(parts, slice)
216         }
217         return
218 }
219
220 // Given a string of the form "host", "host:port",
221 // "ipv6::addr" or "[ipv6::address]:port",
222 // return true if the string includes a port.
223 func hasPort(s string) bool {
224         last := strings.LastIndex(s, ":")
225         if last == -1 {
226                 return false
227         }
228         if s[0] == '[' {
229                 return s[last-1] == ']'
230         }
231         return strings.Index(s, ":") == last
232 }
233
234 // compressPayload takes an opaque input buffer, compresses it
235 // and wraps it in a compress{} message that is encoded.
236 func compressPayload(inp []byte) (*bytes.Buffer, error) {
237         var buf bytes.Buffer
238         compressor := lzw.NewWriter(&buf, lzw.LSB, lzwLitWidth)
239
240         _, err := compressor.Write(inp)
241         if err != nil {
242                 return nil, err
243         }
244
245         // Ensure we flush everything out
246         if err := compressor.Close(); err != nil {
247                 return nil, err
248         }
249
250         // Create a compressed message
251         c := compress{
252                 Algo: lzwAlgo,
253                 Buf:  buf.Bytes(),
254         }
255         return encode(compressMsg, &c)
256 }
257
258 // decompressPayload is used to unpack an encoded compress{}
259 // message and return its payload uncompressed
260 func decompressPayload(msg []byte) ([]byte, error) {
261         // Decode the message
262         var c compress
263         if err := decode(msg, &c); err != nil {
264                 return nil, err
265         }
266         return decompressBuffer(&c)
267 }
268
269 // decompressBuffer is used to decompress the buffer of
270 // a single compress message, handling multiple algorithms
271 func decompressBuffer(c *compress) ([]byte, error) {
272         // Verify the algorithm
273         if c.Algo != lzwAlgo {
274                 return nil, fmt.Errorf("Cannot decompress unknown algorithm %d", c.Algo)
275         }
276
277         // Create a uncompressor
278         uncomp := lzw.NewReader(bytes.NewReader(c.Buf), lzw.LSB, lzwLitWidth)
279         defer uncomp.Close()
280
281         // Read all the data
282         var b bytes.Buffer
283         _, err := io.Copy(&b, uncomp)
284         if err != nil {
285                 return nil, err
286         }
287
288         // Return the uncompressed bytes
289         return b.Bytes(), nil
290 }
291
292 // joinHostPort returns the host:port form of an address, for use with a
293 // transport.
294 func joinHostPort(host string, port uint16) string {
295         return net.JoinHostPort(host, strconv.Itoa(int(port)))
296 }