16 "github.com/hashicorp/go-msgpack/codec"
17 "github.com/sean-/seed"
// pushPullScaleThreshold is the minimum number of nodes
// before we start scaling the push/pull timing. The scale
// effect is log2(nodes) - log2(pushPullScaleThreshold), meaning
// that the 33rd node will cause us to double the interval,
// while the 65th will triple it.
const pushPullScaleThreshold = 32
// LZW literal width: valid values are 2-8.
36 // Decode reverses the encode operation on a byte slice input
37 func decode(buf []byte, out interface{}) error {
38 r := bytes.NewReader(buf)
39 hd := codec.MsgpackHandle{}
40 dec := codec.NewDecoder(r, &hd)
41 return dec.Decode(out)
44 // Encode writes an encoded object to a new bytes buffer
45 func encode(msgType messageType, in interface{}) (*bytes.Buffer, error) {
46 buf := bytes.NewBuffer(nil)
47 buf.WriteByte(uint8(msgType))
48 hd := codec.MsgpackHandle{}
49 enc := codec.NewEncoder(buf, &hd)
// randomOffset returns a uniformly random offset in [0, n).
// It returns 0 when n == 0, which would otherwise panic with a
// modulo-by-zero in the expression below.
func randomOffset(n int) int {
	if n == 0 {
		return 0
	}
	return int(rand.Uint32() % uint32(n))
}
62 // suspicionTimeout computes the timeout that should be used when
63 // a node is suspected
64 func suspicionTimeout(suspicionMult, n int, interval time.Duration) time.Duration {
65 nodeScale := math.Max(1.0, math.Log10(math.Max(1.0, float64(n))))
66 // multiply by 1000 to keep some precision because time.Duration is an int64 type
67 timeout := time.Duration(suspicionMult) * time.Duration(nodeScale*1000) * interval / 1000
// retransmitLimit computes how many times a message should be
// retransmitted, scaling logarithmically with the cluster size.
func retransmitLimit(retransmitMult, n int) int {
	scale := int(math.Ceil(math.Log10(float64(n + 1))))
	return retransmitMult * scale
}
78 // shuffleNodes randomly shuffles the input nodes using the Fisher-Yates shuffle
79 func shuffleNodes(nodes []*nodeState) {
81 for i := n - 1; i > 0; i-- {
83 nodes[i], nodes[j] = nodes[j], nodes[i]
87 // pushPushScale is used to scale the time interval at which push/pull
88 // syncs take place. It is used to prevent network saturation as the
90 func pushPullScale(interval time.Duration, n int) time.Duration {
91 // Don't scale until we cross the threshold
92 if n <= pushPullScaleThreshold {
96 multiplier := math.Ceil(math.Log2(float64(n))-math.Log2(pushPullScaleThreshold)) + 1.0
97 return time.Duration(multiplier) * interval
100 // moveDeadNodes moves nodes that are dead and beyond the gossip to the dead interval
101 // to the end of the slice and returns the index of the first moved node.
102 func moveDeadNodes(nodes []*nodeState, gossipToTheDeadTime time.Duration) int {
105 for i := 0; i < n-numDead; i++ {
106 if nodes[i].State != stateDead {
110 // Respect the gossip to the dead interval
111 if time.Since(nodes[i].StateChange) <= gossipToTheDeadTime {
115 // Move this node to the end
116 nodes[i], nodes[n-numDead-1] = nodes[n-numDead-1], nodes[i]
123 // kRandomNodes is used to select up to k random nodes, excluding any nodes where
124 // the filter function returns true. It is possible that less than k nodes are
126 func kRandomNodes(k int, nodes []*nodeState, filterFn func(*nodeState) bool) []*nodeState {
128 kNodes := make([]*nodeState, 0, k)
130 // Probe up to 3*n times, with large n this is not necessary
131 // since k << n, but with small n we want search to be
133 for i := 0; i < 3*n && len(kNodes) < k; i++ {
135 idx := randomOffset(n)
138 // Give the filter a shot at it.
139 if filterFn != nil && filterFn(node) {
143 // Check if we have this node already
144 for j := 0; j < len(kNodes); j++ {
145 if node == kNodes[j] {
151 kNodes = append(kNodes, node)
156 // makeCompoundMessage takes a list of messages and generates
157 // a single compound message containing all of them
158 func makeCompoundMessage(msgs [][]byte) *bytes.Buffer {
159 // Create a local buffer
160 buf := bytes.NewBuffer(nil)
162 // Write out the type
163 buf.WriteByte(uint8(compoundMsg))
165 // Write out the number of message
166 buf.WriteByte(uint8(len(msgs)))
168 // Add the message lengths
169 for _, m := range msgs {
170 binary.Write(buf, binary.BigEndian, uint16(len(m)))
173 // Append the messages
174 for _, m := range msgs {
// decodeCompoundMessage splits a compound message and returns
// the slices of individual messages. Also returns the number
// of truncated messages and any potential error.
func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) {
	if len(buf) < 1 {
		err = fmt.Errorf("missing compound length byte")
		return
	}
	numParts := int(buf[0])
	buf = buf[1:]

	// Check we have enough bytes for the length headers. The
	// multiplication must happen in int space: numParts*2 done on a
	// uint8 overflows for numParts > 127, letting a malformed message
	// slip past this check and panic in the slicing below.
	if len(buf) < numParts*2 {
		err = fmt.Errorf("truncated len slice")
		return
	}

	// Decode the big-endian uint16 length of each part.
	lengths := make([]uint16, numParts)
	for i := 0; i < numParts; i++ {
		lengths[i] = binary.BigEndian.Uint16(buf[i*2 : i*2+2])
	}
	buf = buf[numParts*2:]

	// Split each message out; report how many were truncated.
	for idx, msgLen := range lengths {
		if len(buf) < int(msgLen) {
			trunc = numParts - idx
			break
		}

		// Extract the slice, seek past on the buffer
		slice := buf[:msgLen]
		buf = buf[msgLen:]
		parts = append(parts, slice)
	}
	return
}
// hasPort reports whether s includes a port. Accepted forms are
// "host", "host:port", "ipv6::addr" and "[ipv6::address]:port".
func hasPort(s string) bool {
	last := strings.LastIndex(s, ":")
	if last == -1 {
		// No colon anywhere: definitely no port.
		return false
	}
	if strings.HasPrefix(s, "[") {
		// Bracketed IPv6: a port exists iff the final colon directly
		// follows the closing bracket.
		return s[last-1] == ']'
	}
	// Unbracketed: exactly one colon means host:port; several colons
	// mean a bare IPv6 address with no port.
	return strings.Index(s, ":") == last
}
234 // compressPayload takes an opaque input buffer, compresses it
235 // and wraps it in a compress{} message that is encoded.
236 func compressPayload(inp []byte) (*bytes.Buffer, error) {
238 compressor := lzw.NewWriter(&buf, lzw.LSB, lzwLitWidth)
240 _, err := compressor.Write(inp)
245 // Ensure we flush everything out
246 if err := compressor.Close(); err != nil {
250 // Create a compressed message
255 return encode(compressMsg, &c)
258 // decompressPayload is used to unpack an encoded compress{}
259 // message and return its payload uncompressed
260 func decompressPayload(msg []byte) ([]byte, error) {
261 // Decode the message
263 if err := decode(msg, &c); err != nil {
266 return decompressBuffer(&c)
269 // decompressBuffer is used to decompress the buffer of
270 // a single compress message, handling multiple algorithms
271 func decompressBuffer(c *compress) ([]byte, error) {
272 // Verify the algorithm
273 if c.Algo != lzwAlgo {
274 return nil, fmt.Errorf("Cannot decompress unknown algorithm %d", c.Algo)
277 // Create a uncompressor
278 uncomp := lzw.NewReader(bytes.NewReader(c.Buf), lzw.LSB, lzwLitWidth)
283 _, err := io.Copy(&b, uncomp)
288 // Return the uncompressed bytes
289 return b.Bytes(), nil
// joinHostPort returns the host:port form of an address, bracketing
// IPv6 literals as required, for use with a transport.
func joinHostPort(host string, port uint16) string {
	return net.JoinHostPort(host, strconv.FormatUint(uint64(port), 10))
}