3 //go:generate protoc -I.:../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto
12 "github.com/Sirupsen/logrus"
13 "github.com/armon/go-radix"
14 "github.com/docker/go-events"
15 "github.com/docker/libnetwork/types"
16 "github.com/hashicorp/memberlist"
17 "github.com/hashicorp/serf/serf"
// Index selectors for NetworkDB.indexes. byTable keys entries as
// /<table>/<network>/<key>; byNetwork keys the same entries as
// /<network>/<table>/<key>.
21 byTable int = 1 + iota
25 // NetworkDB instance drives the networkdb cluster and acts as the broker
26 // for cluster-scoped and network-scoped gossip and watches.
27 type NetworkDB struct {
28 // The clocks MUST be the first things
29 // in this struct due to Golang issue #599.
31 // Global lamport clock for node network attach events.
32 networkClock serf.LamportClock
34 // Global lamport clock for table events.
35 tableClock serf.LamportClock
39 // NetworkDB configuration.
42 // All the tree indexes (byTable, byNetwork) that we maintain
44 indexes map[int]*radix.Tree
46 // Memberlist we use to drive the cluster.
47 memberlist *memberlist.Memberlist
49 // List of all peer nodes in the cluster not-limited to any
51 nodes map[string]*node
53 // List of all peer nodes which have failed
54 failedNodes map[string]*node
56 // List of all peer nodes which have left
57 leftNodes map[string]*node
59 // A multi-dimensional map of network/node attachments. The
60 // first key is a node name and the second key is a network ID
61 // for the network that node is participating in.
62 networks map[string]map[string]*network
64 // A map of nodes which are participating in a given
65 // network. The key is a network ID.
66 networkNodes map[string][]string
68 // A table of ack channels for every node from which we are
69 // waiting for an ack.
70 bulkSyncAckTbl map[string]chan struct{}
72 // Broadcast queue for network event gossip.
73 networkBroadcasts *memberlist.TransmitLimitedQueue
75 // Broadcast queue for node event gossip.
76 nodeBroadcasts *memberlist.TransmitLimitedQueue
78 // A central stop channel to stop all goroutines running on
79 // behalf of the NetworkDB instance.
82 // A central broadcaster for all local watchers watching table
84 broadcaster *events.Broadcaster
86 // List of all tickers which need to be stopped when
88 tickers []*time.Ticker
90 // Reference to the memberlist's keyring to add & remove keys
91 keyring *memberlist.Keyring
93 // bootStrapIP is the list of IPs that can be used to bootstrap
98 // PeerInfo represents the peer (gossip cluster) nodes of a network
99 type PeerInfo struct {
106 ltime serf.LamportTime
107 // Time left before the reaper removes the node
108 reapTime time.Duration
111 // network describes the node/network attachment.
112 type network struct {
116 // Lamport time for the latest state of the entry.
117 ltime serf.LamportTime
119 // Node leave is in progress.
122 // Time still left before a deleted network entry gets
123 // removed from networkDB
124 reapTime time.Duration
126 // The broadcast queue for table event gossip. This is only
127 // initialized for this node's network attachment entries.
128 tableBroadcasts *memberlist.TransmitLimitedQueue
131 // Config represents the configuration of the networkdb instance and
132 // can be passed by the caller.
134 // NodeName is the cluster wide unique name for this node.
137 // BindAddr is the IP on which networkdb listens. It can be
138 // 0.0.0.0 to listen on all addresses on the host.
141 // AdvertiseAddr is the node's IP address that we advertise for
142 // cluster communication.
145 // BindPort is the local node's port to which we bind for
146 // cluster communication.
149 // Keys to be added to the Keyring of the memberlist. Key at index
150 // 0 is the primary key.
154 // entry defines a table entry
156 // node from which this entry was learned.
159 // Lamport time for the most recent update to the entry
160 ltime serf.LamportTime
162 // Opaque value stored in the entry
165 // Deleting the entry is in progress. All entries linger in
166 // the cluster for a certain amount of time after deletion.
169 // Time still left before a deleted table entry gets
170 // removed from networkDB
171 reapTime time.Duration
174 // New creates a new instance of NetworkDB using the Config passed by
176 func New(c *Config) (*NetworkDB, error) {
179 indexes: make(map[int]*radix.Tree),
180 networks: make(map[string]map[string]*network),
181 nodes: make(map[string]*node),
182 failedNodes: make(map[string]*node),
183 leftNodes: make(map[string]*node),
184 networkNodes: make(map[string][]string),
185 bulkSyncAckTbl: make(map[string]chan struct{}),
186 broadcaster: events.NewBroadcaster(),
// Two radix trees hold the same *entry values under different key
// orders: table-first (byTable) and network-first (byNetwork).
189 nDB.indexes[byTable] = radix.New()
190 nDB.indexes[byNetwork] = radix.New()
// Cluster initialisation runs only after both indexes exist.
192 if err := nDB.clusterInit(); err != nil {
199 // Join joins this NetworkDB instance with a list of peer NetworkDB
200 // instances passed by the caller in the form of addr:port
201 func (nDB *NetworkDB) Join(members []string) error {
203 for _, m := range members {
// NOTE(review): members are documented as addr:port, but net.ParseIP
// returns nil for any string that includes a port, so bootStrapIP may
// collect nil entries. Consider net.SplitHostPort before parsing —
// confirm what callers actually pass.
204 nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
207 return nDB.clusterJoin(members)
210 // Close destroys this NetworkDB instance by leaving the cluster,
211 // stopping timers, canceling goroutines etc.
212 func (nDB *NetworkDB) Close() {
// A failure to leave the cluster is only logged; Close reports no error.
213 if err := nDB.clusterLeave(); err != nil {
214 logrus.Errorf("Could not close DB %s: %v", nDB.config.NodeName, err)
218 // Peers returns the gossip peers for a given network.
219 func (nDB *NetworkDB) Peers(nid string) []PeerInfo {
222 peers := make([]PeerInfo, 0, len(nDB.networkNodes[nid]))
223 for _, nodeName := range nDB.networkNodes[nid] {
// Participants with no entry in nDB.nodes are skipped silently.
224 if node, ok := nDB.nodes[nodeName]; ok {
225 peers = append(peers, PeerInfo{
227 IP: node.Addr.String(),
234 // GetEntry retrieves the value of a table entry in a given (network,
236 func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
// Delegates the lookup to getEntry and exposes only the stored value.
237 entry, err := nDB.getEntry(tname, nid, key)
242 return entry.value, nil
// getEntry looks up the entry stored under /<table>/<network>/<key> in the
// byTable index and returns a types.NotFoundError when no such key exists.
245 func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
249 e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
251 return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
254 return e.(*entry), nil
257 // CreateEntry creates a table entry in NetworkDB for given (network,
258 // table, key) tuple and if the NetworkDB is part of the cluster
259 // propagates this event to the cluster. It is an error to create an
260 // entry for the same tuple for which there is already an existing
261 // entry unless the current entry is in deleting state.
262 func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
263 oldEntry, err := nDB.getEntry(tname, nid, key)
// Any lookup error other than "not found" aborts the create.
265 if _, ok := err.(types.NotFoundError); !ok {
266 return fmt.Errorf("cannot create entry in table %s with network id %s and key %s: %v", tname, nid, key, err)
// An entry that is being deleted may be re-created.
269 if oldEntry != nil && !oldEntry.deleting {
270 return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key)
274 ltime: nDB.tableClock.Increment(),
275 node: nDB.config.NodeName,
// The event is sent before the local insert, so a send failure leaves
// both indexes untouched.
279 if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil {
280 return fmt.Errorf("cannot send create event for table %s, %v", tname, err)
// Both indexes are updated with the same *entry to keep them in sync.
284 nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
285 nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
291 // UpdateEntry updates a table entry in NetworkDB for given (network,
292 // table, key) tuple and if the NetworkDB is part of the cluster
293 // propagates this event to the cluster. It is an error to update a
294 // non-existent entry.
295 func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
// NOTE(review): the lookup error is discarded and every failure is
// reported as "does not exist"; consider wrapping err instead.
296 if _, err := nDB.GetEntry(tname, nid, key); err != nil {
297 return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
301 ltime: nDB.tableClock.Increment(),
302 node: nDB.config.NodeName,
// The event is sent before the local update, so a send failure leaves
// both indexes untouched.
306 if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil {
307 return fmt.Errorf("cannot send table update event: %v", err)
311 nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
312 nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
318 // GetTableByNetwork walks the networkdb by the given table and network id and
319 // returns a map of keys and values
320 func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]interface{} {
321 entries := make(map[string]interface{})
// NOTE(review): the prefix "/<table>/<nid>" has no trailing slash, so it
// also matches networks whose ID merely starts with nid (nid "ab" matches
// "/t/abc/k"). A prefix of "/%s/%s/" would restrict the walk to nid.
322 nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s/%s", tname, nid), func(k string, v interface{}) bool {
// The key is the last path component of /<table>/<network>/<key>.
327 key := k[strings.LastIndex(k, "/")+1:]
328 entries[key] = entry.value
334 // DeleteEntry deletes a table entry in NetworkDB for given (network,
335 // table, key) tuple and if the NetworkDB is part of the cluster
336 // propagates this event to the cluster.
337 func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
338 value, err := nDB.GetEntry(tname, nid, key)
340 return fmt.Errorf("cannot delete entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
344 ltime: nDB.tableClock.Increment(),
345 node: nDB.config.NodeName,
// The entry is not removed here. It is re-inserted with a reap timer so it
// lingers for reapInterval before being reaped.
348 reapTime: reapInterval,
351 if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
352 return fmt.Errorf("cannot send table delete event: %v", err)
356 nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
357 nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
// deleteNetworkEntriesForNode rebuilds every network's participant list
// without deletedNode and drops deletedNode's per-node network map.
363 func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
365 for nid, nodes := range nDB.networkNodes {
366 updatedNodes := make([]string, 0, len(nodes))
367 for _, node := range nodes {
368 if node == deletedNode {
372 updatedNodes = append(updatedNodes, node)
375 nDB.networkNodes[nid] = updatedNodes
378 delete(nDB.networks, deletedNode)
// deleteNodeNetworkEntries re-inserts every entry of network nid that was
// learned from node. Each copy carries the old ltime and value and a
// reapInterval reap timer, and an opDelete event is written to local
// watchers for it.
382 func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
// NOTE(review): "/<nid>" without a trailing slash may also visit entries of
// networks whose ID starts with nid; consider "/%s/".
384 nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid),
385 func(path string, v interface{}) bool {
386 oldEntry := v.(*entry)
387 params := strings.Split(path[1:], "/")
392 if oldEntry.node != node {
397 ltime: oldEntry.ltime,
399 value: oldEntry.value,
401 reapTime: reapInterval,
404 nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
405 nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
407 nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
// deleteNodeTableEntries walks every table and re-inserts each entry
// learned from node. Each copy carries the old ltime and value and a
// reapInterval reap timer, and an opDelete event is written to local
// watchers for it.
413 func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
415 nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
416 oldEntry := v.(*entry)
417 if oldEntry.node != node {
// The path has the form /<table>/<network>/<key>.
421 params := strings.Split(path[1:], "/")
427 ltime: oldEntry.ltime,
429 value: oldEntry.value,
431 reapTime: reapInterval,
434 nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
435 nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
437 nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
443 // WalkTable walks a single table in NetworkDB and invokes the passed
444 // function for each entry in the table passing the network, key,
445 // value. The walk stops if the passed function returns true.
446 func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bool) error {
// Matches are collected into a map first, and fn runs only after the index
// walk has finished. Because the loop ranges over a map, fn sees entries
// in random order.
448 values := make(map[string]interface{})
// NOTE(review): "/<tname>" without a trailing slash also matches tables
// whose name starts with tname ("/foo" matches "/foobar/..."); consider "/%s/".
449 nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool {
455 for k, v := range values {
456 params := strings.Split(k[1:], "/")
459 if fn(nid, key, v.(*entry).value) {
467 // JoinNetwork joins this node to a given network and propagates this
468 // event across the cluster. This triggers this node joining the
469 // sub-cluster of this network and participates in the network-scoped
470 // gossip and bulk sync for this network.
471 func (nDB *NetworkDB) JoinNetwork(nid string) error {
472 ltime := nDB.networkClock.Increment()
475 nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
477 nodeNetworks = make(map[string]*network)
478 nDB.networks[nDB.config.NodeName] = nodeNetworks
480 nodeNetworks[nid] = &network{id: nid, ltime: ltime}
481 nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
482 NumNodes: func() int {
484 num := len(nDB.networkNodes[nid])
// NOTE(review): unlike addNetworkNode, this appends without a duplicate
// check, so repeated joins of the same nid may list this node twice.
490 nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nDB.config.NodeName)
// Snapshot of the participant list, handed to bulkSync further down.
491 networkNodes := nDB.networkNodes[nid]
// NOTE(review): this error text says "leave" although this is the join
// path; it should probably read "join network event".
494 if err := nDB.sendNetworkEvent(nid, NetworkEventTypeJoin, ltime); err != nil {
495 return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
498 logrus.Debugf("%s: joined network %s", nDB.config.NodeName, nid)
// A bulk sync failure is logged but does not fail the join.
499 if _, err := nDB.bulkSync(networkNodes, true); err != nil {
500 logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
506 // LeaveNetwork leaves this node from a given network and propagates
507 // this event across the cluster. This triggers this node leaving the
508 // sub-cluster of this network and as a result will no longer
509 // participate in the network-scoped gossip and bulk sync for this
510 // network. Also removes all the table entries for this network from
512 func (nDB *NetworkDB) LeaveNetwork(nid string) error {
513 ltime := nDB.networkClock.Increment()
514 if err := nDB.sendNetworkEvent(nid, NetworkEventTypeLeave, ltime); err != nil {
515 return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
// Matching paths and entries are collected during the walk and deleted
// afterwards, presumably to avoid mutating the tree while walking it.
525 nwWalker := func(path string, v interface{}) bool {
526 entry, ok := v.(*entry)
530 paths = append(paths, path)
531 entries = append(entries, entry)
// NOTE(review): "/<nid>" without a trailing slash may also collect entries
// of networks whose ID starts with nid; consider "/%s/".
535 nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), nwWalker)
536 for _, path := range paths {
537 params := strings.Split(path[1:], "/")
// Each entry is removed from both indexes. A missing key is only logged.
541 if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
542 logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
545 if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
546 logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
550 nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
552 return fmt.Errorf("could not find self node for network %s while trying to leave", nid)
555 n, ok := nodeNetworks[nid]
557 return fmt.Errorf("could not find network %s while trying to leave", nid)
565 // addNetworkNode adds the node to the list of nodes which participate
566 // in the passed network only if it is not already present. Caller
567 // should hold the NetworkDB lock while calling this.
568 func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) {
// Participants are kept in a plain slice, so the duplicate check is a
// linear scan.
569 nodes := nDB.networkNodes[nid]
570 for _, node := range nodes {
571 if node == nodeName {
576 nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nodeName)
579 // Deletes the node from the list of nodes which participate in the
580 // passed network. Caller should hold the NetworkDB lock while calling
582 func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) {
583 nodes := nDB.networkNodes[nid]
584 newNodes := make([]string, 0, len(nodes)-1)
585 for _, name := range nodes {
586 if name == nodeName {
589 newNodes = append(newNodes, name)
591 nDB.networkNodes[nid] = newNodes
594 // findCommonNetworks finds the networks that both this node and the
595 // passed node have joined.
596 func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
600 var networks []string
// Iterates over a map, so the order of the returned IDs is not stable.
601 for nid := range nDB.networks[nDB.config.NodeName] {
602 if n, ok := nDB.networks[nodeName][nid]; ok {
604 networks = append(networks, nid)
// updateLocalNetworkTime increments networkClock once and iterates over
// this node's network attachments, presumably to stamp each one with the
// new ltime — confirm against the loop body.
612 func (nDB *NetworkDB) updateLocalNetworkTime() {
616 ltime := nDB.networkClock.Increment()
617 for _, n := range nDB.networks[nDB.config.NodeName] {
622 func (nDB *NetworkDB) updateLocalTableTime() {
626 ltime := nDB.tableClock.Increment()
627 nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
629 if entry.node != nDB.config.NodeName {
633 params := strings.Split(path[1:], "/")
639 nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
640 nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)