Tizen_4.0 base
[platform/upstream/docker-engine.git] / vendor / github.com / docker / libnetwork / networkdb / networkdb.go
1 package networkdb
2
3 //go:generate protoc -I.:../Godeps/_workspace/src/github.com/gogo/protobuf  --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto
4
5 import (
6         "fmt"
7         "net"
8         "strings"
9         "sync"
10         "time"
11
12         "github.com/Sirupsen/logrus"
13         "github.com/armon/go-radix"
14         "github.com/docker/go-events"
15         "github.com/docker/libnetwork/types"
16         "github.com/hashicorp/memberlist"
17         "github.com/hashicorp/serf/serf"
18 )
19
20 const (
21         byTable int = 1 + iota
22         byNetwork
23 )
24
25 // NetworkDB instance drives the networkdb cluster and acts the broker
26 // for cluster-scoped and network-scoped gossip and watches.
27 type NetworkDB struct {
28         // The clocks MUST be the first things
29         // in this struct due to Golang issue #599.
30
31         // Global lamport clock for node network attach events.
32         networkClock serf.LamportClock
33
34         // Global lamport clock for table events.
35         tableClock serf.LamportClock
36
37         sync.RWMutex
38
39         // NetworkDB configuration.
40         config *Config
41
42         // All the tree index (byTable, byNetwork) that we maintain
43         // the db.
44         indexes map[int]*radix.Tree
45
46         // Memberlist we use to drive the cluster.
47         memberlist *memberlist.Memberlist
48
49         // List of all peer nodes in the cluster not-limited to any
50         // network.
51         nodes map[string]*node
52
53         // List of all peer nodes which have failed
54         failedNodes map[string]*node
55
56         // List of all peer nodes which have left
57         leftNodes map[string]*node
58
59         // A multi-dimensional map of network/node attachmemts. The
60         // first key is a node name and the second key is a network ID
61         // for the network that node is participating in.
62         networks map[string]map[string]*network
63
64         // A map of nodes which are participating in a given
65         // network. The key is a network ID.
66         networkNodes map[string][]string
67
68         // A table of ack channels for every node from which we are
69         // waiting for an ack.
70         bulkSyncAckTbl map[string]chan struct{}
71
72         // Broadcast queue for network event gossip.
73         networkBroadcasts *memberlist.TransmitLimitedQueue
74
75         // Broadcast queue for node event gossip.
76         nodeBroadcasts *memberlist.TransmitLimitedQueue
77
78         // A central stop channel to stop all go routines running on
79         // behalf of the NetworkDB instance.
80         stopCh chan struct{}
81
82         // A central broadcaster for all local watchers watching table
83         // events.
84         broadcaster *events.Broadcaster
85
86         // List of all tickers which needed to be stopped when
87         // cleaning up.
88         tickers []*time.Ticker
89
90         // Reference to the memberlist's keyring to add & remove keys
91         keyring *memberlist.Keyring
92
93         // bootStrapIP is the list of IPs that can be used to bootstrap
94         // the gossip.
95         bootStrapIP []net.IP
96 }
97
98 // PeerInfo represents the peer (gossip cluster) nodes of a network
99 type PeerInfo struct {
100         Name string
101         IP   string
102 }
103
104 type node struct {
105         memberlist.Node
106         ltime serf.LamportTime
107         // Number of hours left before the reaper removes the node
108         reapTime time.Duration
109 }
110
111 // network describes the node/network attachment.
112 type network struct {
113         // Network ID
114         id string
115
116         // Lamport time for the latest state of the entry.
117         ltime serf.LamportTime
118
119         // Node leave is in progress.
120         leaving bool
121
122         // Number of seconds still left before a deleted network entry gets
123         // removed from networkDB
124         reapTime time.Duration
125
126         // The broadcast queue for table event gossip. This is only
127         // initialized for this node's network attachment entries.
128         tableBroadcasts *memberlist.TransmitLimitedQueue
129 }
130
131 // Config represents the configuration of the networdb instance and
132 // can be passed by the caller.
133 type Config struct {
134         // NodeName is the cluster wide unique name for this node.
135         NodeName string
136
137         // BindAddr is the IP on which networkdb listens. It can be
138         // 0.0.0.0 to listen on all addresses on the host.
139         BindAddr string
140
141         // AdvertiseAddr is the node's IP address that we advertise for
142         // cluster communication.
143         AdvertiseAddr string
144
145         // BindPort is the local node's port to which we bind to for
146         // cluster communication.
147         BindPort int
148
149         // Keys to be added to the Keyring of the memberlist. Key at index
150         // 0 is the primary key
151         Keys [][]byte
152 }
153
154 // entry defines a table entry
155 type entry struct {
156         // node from which this entry was learned.
157         node string
158
159         // Lamport time for the most recent update to the entry
160         ltime serf.LamportTime
161
162         // Opaque value store in the entry
163         value []byte
164
165         // Deleting the entry is in progress. All entries linger in
166         // the cluster for certain amount of time after deletion.
167         deleting bool
168
169         // Number of seconds still left before a deleted table entry gets
170         // removed from networkDB
171         reapTime time.Duration
172 }
173
174 // New creates a new instance of NetworkDB using the Config passed by
175 // the caller.
176 func New(c *Config) (*NetworkDB, error) {
177         nDB := &NetworkDB{
178                 config:         c,
179                 indexes:        make(map[int]*radix.Tree),
180                 networks:       make(map[string]map[string]*network),
181                 nodes:          make(map[string]*node),
182                 failedNodes:    make(map[string]*node),
183                 leftNodes:      make(map[string]*node),
184                 networkNodes:   make(map[string][]string),
185                 bulkSyncAckTbl: make(map[string]chan struct{}),
186                 broadcaster:    events.NewBroadcaster(),
187         }
188
189         nDB.indexes[byTable] = radix.New()
190         nDB.indexes[byNetwork] = radix.New()
191
192         if err := nDB.clusterInit(); err != nil {
193                 return nil, err
194         }
195
196         return nDB, nil
197 }
198
199 // Join joins this NetworkDB instance with a list of peer NetworkDB
200 // instances passed by the caller in the form of addr:port
201 func (nDB *NetworkDB) Join(members []string) error {
202         nDB.Lock()
203         for _, m := range members {
204                 nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
205         }
206         nDB.Unlock()
207         return nDB.clusterJoin(members)
208 }
209
210 // Close destroys this NetworkDB instance by leave the cluster,
211 // stopping timers, canceling goroutines etc.
212 func (nDB *NetworkDB) Close() {
213         if err := nDB.clusterLeave(); err != nil {
214                 logrus.Errorf("Could not close DB %s: %v", nDB.config.NodeName, err)
215         }
216 }
217
218 // Peers returns the gossip peers for a given network.
219 func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
220         nDB.RLock()
221         defer nDB.RUnlock()
222         peers := make([]PeerInfo, 0, len(nDB.networkNodes[nid]))
223         for _, nodeName := range nDB.networkNodes[nid] {
224                 if node, ok := nDB.nodes[nodeName]; ok {
225                         peers = append(peers, PeerInfo{
226                                 Name: node.Name,
227                                 IP:   node.Addr.String(),
228                         })
229                 }
230         }
231         return peers
232 }
233
234 // GetEntry retrieves the value of a table entry in a given (network,
235 // table, key) tuple
236 func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
237         entry, err := nDB.getEntry(tname, nid, key)
238         if err != nil {
239                 return nil, err
240         }
241
242         return entry.value, nil
243 }
244
245 func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
246         nDB.RLock()
247         defer nDB.RUnlock()
248
249         e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
250         if !ok {
251                 return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
252         }
253
254         return e.(*entry), nil
255 }
256
257 // CreateEntry creates a table entry in NetworkDB for given (network,
258 // table, key) tuple and if the NetworkDB is part of the cluster
259 // propagates this event to the cluster. It is an error to create an
260 // entry for the same tuple for which there is already an existing
261 // entry unless the current entry is deleting state.
262 func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
263         oldEntry, err := nDB.getEntry(tname, nid, key)
264         if err != nil {
265                 if _, ok := err.(types.NotFoundError); !ok {
266                         return fmt.Errorf("cannot create entry in table %s with network id %s and key %s: %v", tname, nid, key, err)
267                 }
268         }
269         if oldEntry != nil && !oldEntry.deleting {
270                 return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
271         }
272
273         entry := &entry{
274                 ltime: nDB.tableClock.Increment(),
275                 node:  nDB.config.NodeName,
276                 value: value,
277         }
278
279         if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil {
280                 return fmt.Errorf("cannot send create event for table %s, %v", tname, err)
281         }
282
283         nDB.Lock()
284         nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
285         nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
286         nDB.Unlock()
287
288         return nil
289 }
290
291 // UpdateEntry updates a table entry in NetworkDB for given (network,
292 // table, key) tuple and if the NetworkDB is part of the cluster
293 // propagates this event to the cluster. It is an error to update a
294 // non-existent entry.
295 func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
296         if _, err := nDB.GetEntry(tname, nid, key); err != nil {
297                 return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
298         }
299
300         entry := &entry{
301                 ltime: nDB.tableClock.Increment(),
302                 node:  nDB.config.NodeName,
303                 value: value,
304         }
305
306         if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil {
307                 return fmt.Errorf("cannot send table update event: %v", err)
308         }
309
310         nDB.Lock()
311         nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
312         nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
313         nDB.Unlock()
314
315         return nil
316 }
317
318 // GetTableByNetwork walks the networkdb by the give table and network id and
319 // returns a map of keys and values
320 func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]interface{} {
321         entries := make(map[string]interface{})
322         nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s/%s", tname, nid), func(k string, v interface{}) bool {
323                 entry := v.(*entry)
324                 if entry.deleting {
325                         return false
326                 }
327                 key := k[strings.LastIndex(k, "/")+1:]
328                 entries[key] = entry.value
329                 return false
330         })
331         return entries
332 }
333
334 // DeleteEntry deletes a table entry in NetworkDB for given (network,
335 // table, key) tuple and if the NetworkDB is part of the cluster
336 // propagates this event to the cluster.
337 func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
338         value, err := nDB.GetEntry(tname, nid, key)
339         if err != nil {
340                 return fmt.Errorf("cannot delete entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
341         }
342
343         entry := &entry{
344                 ltime:    nDB.tableClock.Increment(),
345                 node:     nDB.config.NodeName,
346                 value:    value,
347                 deleting: true,
348                 reapTime: reapInterval,
349         }
350
351         if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
352                 return fmt.Errorf("cannot send table delete event: %v", err)
353         }
354
355         nDB.Lock()
356         nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
357         nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
358         nDB.Unlock()
359
360         return nil
361 }
362
363 func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
364         nDB.Lock()
365         for nid, nodes := range nDB.networkNodes {
366                 updatedNodes := make([]string, 0, len(nodes))
367                 for _, node := range nodes {
368                         if node == deletedNode {
369                                 continue
370                         }
371
372                         updatedNodes = append(updatedNodes, node)
373                 }
374
375                 nDB.networkNodes[nid] = updatedNodes
376         }
377
378         delete(nDB.networks, deletedNode)
379         nDB.Unlock()
380 }
381
382 func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
383         nDB.Lock()
384         nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid),
385                 func(path string, v interface{}) bool {
386                         oldEntry := v.(*entry)
387                         params := strings.Split(path[1:], "/")
388                         nid := params[0]
389                         tname := params[1]
390                         key := params[2]
391
392                         if oldEntry.node != node {
393                                 return false
394                         }
395
396                         entry := &entry{
397                                 ltime:    oldEntry.ltime,
398                                 node:     node,
399                                 value:    oldEntry.value,
400                                 deleting: true,
401                                 reapTime: reapInterval,
402                         }
403
404                         nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
405                         nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
406
407                         nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
408                         return false
409                 })
410         nDB.Unlock()
411 }
412
413 func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
414         nDB.Lock()
415         nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
416                 oldEntry := v.(*entry)
417                 if oldEntry.node != node {
418                         return false
419                 }
420
421                 params := strings.Split(path[1:], "/")
422                 tname := params[0]
423                 nid := params[1]
424                 key := params[2]
425
426                 entry := &entry{
427                         ltime:    oldEntry.ltime,
428                         node:     node,
429                         value:    oldEntry.value,
430                         deleting: true,
431                         reapTime: reapInterval,
432                 }
433
434                 nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
435                 nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
436
437                 nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
438                 return false
439         })
440         nDB.Unlock()
441 }
442
443 // WalkTable walks a single table in NetworkDB and invokes the passed
444 // function for each entry in the table passing the network, key,
445 // value. The walk stops if the passed function returns a true.
446 func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bool) error {
447         nDB.RLock()
448         values := make(map[string]interface{})
449         nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool {
450                 values[path] = v
451                 return false
452         })
453         nDB.RUnlock()
454
455         for k, v := range values {
456                 params := strings.Split(k[1:], "/")
457                 nid := params[1]
458                 key := params[2]
459                 if fn(nid, key, v.(*entry).value) {
460                         return nil
461                 }
462         }
463
464         return nil
465 }
466
467 // JoinNetwork joins this node to a given network and propagates this
468 // event across the cluster. This triggers this node joining the
469 // sub-cluster of this network and participates in the network-scoped
470 // gossip and bulk sync for this network.
471 func (nDB *NetworkDB) JoinNetwork(nid string) error {
472         ltime := nDB.networkClock.Increment()
473
474         nDB.Lock()
475         nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
476         if !ok {
477                 nodeNetworks = make(map[string]*network)
478                 nDB.networks[nDB.config.NodeName] = nodeNetworks
479         }
480         nodeNetworks[nid] = &network{id: nid, ltime: ltime}
481         nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
482                 NumNodes: func() int {
483                         nDB.RLock()
484                         num := len(nDB.networkNodes[nid])
485                         nDB.RUnlock()
486                         return num
487                 },
488                 RetransmitMult: 4,
489         }
490         nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nDB.config.NodeName)
491         networkNodes := nDB.networkNodes[nid]
492         nDB.Unlock()
493
494         if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
495                 return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
496         }
497
498         logrus.Debugf("%s: joined network %s", nDB.config.NodeName, nid)
499         if _, err := nDB.bulkSync(networkNodes, true); err != nil {
500                 logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
501         }
502
503         return nil
504 }
505
506 // LeaveNetwork leaves this node from a given network and propagates
507 // this event across the cluster. This triggers this node leaving the
508 // sub-cluster of this network and as a result will no longer
509 // participate in the network-scoped gossip and bulk sync for this
510 // network. Also remove all the table entries for this network from
511 // networkdb
512 func (nDB *NetworkDB) LeaveNetwork(nid string) error {
513         ltime := nDB.networkClock.Increment()
514         if err := nDB.sendNetworkEvent(nid, NetworkEventTypeLeave, ltime); err != nil {
515                 return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
516         }
517
518         nDB.Lock()
519         defer nDB.Unlock()
520         var (
521                 paths   []string
522                 entries []*entry
523         )
524
525         nwWalker := func(path string, v interface{}) bool {
526                 entry, ok := v.(*entry)
527                 if !ok {
528                         return false
529                 }
530                 paths = append(paths, path)
531                 entries = append(entries, entry)
532                 return false
533         }
534
535         nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), nwWalker)
536         for _, path := range paths {
537                 params := strings.Split(path[1:], "/")
538                 tname := params[1]
539                 key := params[2]
540
541                 if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
542                         logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
543                 }
544
545                 if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
546                         logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
547                 }
548         }
549
550         nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
551         if !ok {
552                 return fmt.Errorf("could not find self node for network %s while trying to leave", nid)
553         }
554
555         n, ok := nodeNetworks[nid]
556         if !ok {
557                 return fmt.Errorf("could not find network %s while trying to leave", nid)
558         }
559
560         n.ltime = ltime
561         n.leaving = true
562         return nil
563 }
564
565 // addNetworkNode adds the node to the list of nodes which participate
566 // in the passed network only if it is not already present. Caller
567 // should hold the NetworkDB lock while calling this
568 func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) {
569         nodes := nDB.networkNodes[nid]
570         for _, node := range nodes {
571                 if node == nodeName {
572                         return
573                 }
574         }
575
576         nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nodeName)
577 }
578
579 // Deletes the node from the list of nodes which participate in the
580 // passed network. Caller should hold the NetworkDB lock while calling
581 // this
582 func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) {
583         nodes := nDB.networkNodes[nid]
584         newNodes := make([]string, 0, len(nodes)-1)
585         for _, name := range nodes {
586                 if name == nodeName {
587                         continue
588                 }
589                 newNodes = append(newNodes, name)
590         }
591         nDB.networkNodes[nid] = newNodes
592 }
593
594 // findCommonnetworks find the networks that both this node and the
595 // passed node have joined.
596 func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
597         nDB.RLock()
598         defer nDB.RUnlock()
599
600         var networks []string
601         for nid := range nDB.networks[nDB.config.NodeName] {
602                 if n, ok := nDB.networks[nodeName][nid]; ok {
603                         if !n.leaving {
604                                 networks = append(networks, nid)
605                         }
606                 }
607         }
608
609         return networks
610 }
611
612 func (nDB *NetworkDB) updateLocalNetworkTime() {
613         nDB.Lock()
614         defer nDB.Unlock()
615
616         ltime := nDB.networkClock.Increment()
617         for _, n := range nDB.networks[nDB.config.NodeName] {
618                 n.ltime = ltime
619         }
620 }
621
622 func (nDB *NetworkDB) updateLocalTableTime() {
623         nDB.Lock()
624         defer nDB.Unlock()
625
626         ltime := nDB.tableClock.Increment()
627         nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
628                 entry := v.(*entry)
629                 if entry.node != nDB.config.NodeName {
630                         return false
631                 }
632
633                 params := strings.Split(path[1:], "/")
634                 tname := params[0]
635                 nid := params[1]
636                 key := params[2]
637                 entry.ltime = ltime
638
639                 nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
640                 nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
641
642                 return false
643         })
644 }