Tizen_4.0 base
[platform/upstream/docker-engine.git] / vendor / github.com / docker / libnetwork / agent.go
1 package libnetwork
2
3 //go:generate protoc -I.:Godeps/_workspace/src/github.com/gogo/protobuf  --gogo_out=import_path=github.com/docker/libnetwork,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. agent.proto
4
5 import (
6         "encoding/json"
7         "fmt"
8         "net"
9         "os"
10         "sort"
11         "sync"
12
13         "github.com/Sirupsen/logrus"
14         "github.com/docker/docker/pkg/stringid"
15         "github.com/docker/go-events"
16         "github.com/docker/libnetwork/cluster"
17         "github.com/docker/libnetwork/datastore"
18         "github.com/docker/libnetwork/discoverapi"
19         "github.com/docker/libnetwork/driverapi"
20         "github.com/docker/libnetwork/networkdb"
21         "github.com/docker/libnetwork/types"
22         "github.com/gogo/protobuf/proto"
23 )
24
25 const (
26         subsysGossip = "networking:gossip"
27         subsysIPSec  = "networking:ipsec"
28         keyringSize  = 3
29 )
30
31 // ByTime implements sort.Interface for []*types.EncryptionKey based on
32 // the LamportTime field.
33 type ByTime []*types.EncryptionKey
34
35 func (b ByTime) Len() int           { return len(b) }
36 func (b ByTime) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
37 func (b ByTime) Less(i, j int) bool { return b[i].LamportTime < b[j].LamportTime }
38
39 type agent struct {
40         networkDB         *networkdb.NetworkDB
41         bindAddr          string
42         advertiseAddr     string
43         dataPathAddr      string
44         coreCancelFuncs   []func()
45         driverCancelFuncs map[string][]func()
46         sync.Mutex
47 }
48
49 func (a *agent) dataPathAddress() string {
50         a.Lock()
51         defer a.Unlock()
52         if a.dataPathAddr != "" {
53                 return a.dataPathAddr
54         }
55         return a.advertiseAddr
56 }
57
58 const libnetworkEPTable = "endpoint_table"
59
60 func getBindAddr(ifaceName string) (string, error) {
61         iface, err := net.InterfaceByName(ifaceName)
62         if err != nil {
63                 return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
64         }
65
66         addrs, err := iface.Addrs()
67         if err != nil {
68                 return "", fmt.Errorf("failed to get interface addresses: %v", err)
69         }
70
71         for _, a := range addrs {
72                 addr, ok := a.(*net.IPNet)
73                 if !ok {
74                         continue
75                 }
76                 addrIP := addr.IP
77
78                 if addrIP.IsLinkLocalUnicast() {
79                         continue
80                 }
81
82                 return addrIP.String(), nil
83         }
84
85         return "", fmt.Errorf("failed to get bind address")
86 }
87
88 func resolveAddr(addrOrInterface string) (string, error) {
89         // Try and see if this is a valid IP address
90         if net.ParseIP(addrOrInterface) != nil {
91                 return addrOrInterface, nil
92         }
93
94         addr, err := net.ResolveIPAddr("ip", addrOrInterface)
95         if err != nil {
96                 // If not a valid IP address, it should be a valid interface
97                 return getBindAddr(addrOrInterface)
98         }
99         return addr.String(), nil
100 }
101
102 func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
103         drvEnc := discoverapi.DriverEncryptionUpdate{}
104
105         a := c.getAgent()
106         if a == nil {
107                 logrus.Debug("Skipping key change as agent is nil")
108                 return nil
109         }
110
111         // Find the deleted key. If the deleted key was the primary key,
112         // a new primary key should be set before removing if from keyring.
113         c.Lock()
114         added := []byte{}
115         deleted := []byte{}
116         j := len(c.keys)
117         for i := 0; i < j; {
118                 same := false
119                 for _, key := range keys {
120                         if same = key.LamportTime == c.keys[i].LamportTime; same {
121                                 break
122                         }
123                 }
124                 if !same {
125                         cKey := c.keys[i]
126                         if cKey.Subsystem == subsysGossip {
127                                 deleted = cKey.Key
128                         }
129
130                         if cKey.Subsystem == subsysIPSec {
131                                 drvEnc.Prune = cKey.Key
132                                 drvEnc.PruneTag = cKey.LamportTime
133                         }
134                         c.keys[i], c.keys[j-1] = c.keys[j-1], c.keys[i]
135                         c.keys[j-1] = nil
136                         j--
137                 }
138                 i++
139         }
140         c.keys = c.keys[:j]
141
142         // Find the new key and add it to the key ring
143         for _, key := range keys {
144                 same := false
145                 for _, cKey := range c.keys {
146                         if same = cKey.LamportTime == key.LamportTime; same {
147                                 break
148                         }
149                 }
150                 if !same {
151                         c.keys = append(c.keys, key)
152                         if key.Subsystem == subsysGossip {
153                                 added = key.Key
154                         }
155
156                         if key.Subsystem == subsysIPSec {
157                                 drvEnc.Key = key.Key
158                                 drvEnc.Tag = key.LamportTime
159                         }
160                 }
161         }
162         c.Unlock()
163
164         if len(added) > 0 {
165                 a.networkDB.SetKey(added)
166         }
167
168         key, _, err := c.getPrimaryKeyTag(subsysGossip)
169         if err != nil {
170                 return err
171         }
172         a.networkDB.SetPrimaryKey(key)
173
174         key, tag, err := c.getPrimaryKeyTag(subsysIPSec)
175         if err != nil {
176                 return err
177         }
178         drvEnc.Primary = key
179         drvEnc.PrimaryTag = tag
180
181         if len(deleted) > 0 {
182                 a.networkDB.RemoveKey(deleted)
183         }
184
185         c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
186                 err := driver.DiscoverNew(discoverapi.EncryptionKeysUpdate, drvEnc)
187                 if err != nil {
188                         logrus.Warnf("Failed to update datapath keys in driver %s: %v", name, err)
189                 }
190                 return false
191         })
192
193         return nil
194 }
195
196 func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
197         agent := c.getAgent()
198
199         // If the agent is already present there is no need to try to initilize it again
200         if agent != nil {
201                 return nil
202         }
203
204         bindAddr := clusterProvider.GetLocalAddress()
205         advAddr := clusterProvider.GetAdvertiseAddress()
206         dataAddr := clusterProvider.GetDataPathAddress()
207         remoteList := clusterProvider.GetRemoteAddressList()
208         remoteAddrList := make([]string, 0, len(remoteList))
209         for _, remote := range remoteList {
210                 addr, _, _ := net.SplitHostPort(remote)
211                 remoteAddrList = append(remoteAddrList, addr)
212         }
213
214         listen := clusterProvider.GetListenAddress()
215         listenAddr, _, _ := net.SplitHostPort(listen)
216
217         logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v",
218                 listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList)
219         if advAddr != "" && agent == nil {
220                 if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
221                         logrus.Errorf("error in agentInit: %v", err)
222                         return err
223                 }
224                 c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
225                         if capability.ConnectivityScope == datastore.GlobalScope {
226                                 c.agentDriverNotify(driver)
227                         }
228                         return false
229                 })
230         }
231
232         if len(remoteAddrList) > 0 {
233                 if err := c.agentJoin(remoteAddrList); err != nil {
234                         logrus.Errorf("Error in joining gossip cluster : %v(join will be retried in background)", err)
235                 }
236         }
237
238         return nil
239 }
240
241 // For a given subsystem getKeys sorts the keys by lamport time and returns
242 // slice of keys and lamport time which can used as a unique tag for the keys
243 func (c *controller) getKeys(subsys string) ([][]byte, []uint64) {
244         c.Lock()
245         defer c.Unlock()
246
247         sort.Sort(ByTime(c.keys))
248
249         keys := [][]byte{}
250         tags := []uint64{}
251         for _, key := range c.keys {
252                 if key.Subsystem == subsys {
253                         keys = append(keys, key.Key)
254                         tags = append(tags, key.LamportTime)
255                 }
256         }
257
258         keys[0], keys[1] = keys[1], keys[0]
259         tags[0], tags[1] = tags[1], tags[0]
260         return keys, tags
261 }
262
263 // getPrimaryKeyTag returns the primary key for a given subsystem from the
264 // list of sorted key and the associated tag
265 func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
266         c.Lock()
267         defer c.Unlock()
268         sort.Sort(ByTime(c.keys))
269         keys := []*types.EncryptionKey{}
270         for _, key := range c.keys {
271                 if key.Subsystem == subsys {
272                         keys = append(keys, key)
273                 }
274         }
275         return keys[1].Key, keys[1].LamportTime, nil
276 }
277
278 func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
279         bindAddr, err := resolveAddr(bindAddrOrInterface)
280         if err != nil {
281                 return err
282         }
283
284         keys, _ := c.getKeys(subsysGossip)
285         hostname, _ := os.Hostname()
286         nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
287         logrus.Info("Gossip cluster hostname ", nodeName)
288
289         nDB, err := networkdb.New(&networkdb.Config{
290                 BindAddr:      listenAddr,
291                 AdvertiseAddr: advertiseAddr,
292                 NodeName:      nodeName,
293                 Keys:          keys,
294         })
295
296         if err != nil {
297                 return err
298         }
299
300         var cancelList []func()
301         ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
302         cancelList = append(cancelList, cancel)
303         nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
304         cancelList = append(cancelList, cancel)
305
306         c.Lock()
307         c.agent = &agent{
308                 networkDB:         nDB,
309                 bindAddr:          bindAddr,
310                 advertiseAddr:     advertiseAddr,
311                 dataPathAddr:      dataPathAddr,
312                 coreCancelFuncs:   cancelList,
313                 driverCancelFuncs: make(map[string][]func()),
314         }
315         c.Unlock()
316
317         go c.handleTableEvents(ch, c.handleEpTableEvent)
318         go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
319
320         drvEnc := discoverapi.DriverEncryptionConfig{}
321         keys, tags := c.getKeys(subsysIPSec)
322         drvEnc.Keys = keys
323         drvEnc.Tags = tags
324
325         c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
326                 err := driver.DiscoverNew(discoverapi.EncryptionKeysConfig, drvEnc)
327                 if err != nil {
328                         logrus.Warnf("Failed to set datapath keys in driver %s: %v", name, err)
329                 }
330                 return false
331         })
332
333         c.WalkNetworks(joinCluster)
334
335         return nil
336 }
337
338 func (c *controller) agentJoin(remoteAddrList []string) error {
339         agent := c.getAgent()
340         if agent == nil {
341                 return nil
342         }
343         return agent.networkDB.Join(remoteAddrList)
344 }
345
346 func (c *controller) agentDriverNotify(d driverapi.Driver) {
347         agent := c.getAgent()
348         if agent == nil {
349                 return
350         }
351
352         if err := d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
353                 Address:     agent.dataPathAddress(),
354                 BindAddress: agent.bindAddr,
355                 Self:        true,
356         }); err != nil {
357                 logrus.Warnf("Failed the node discovery in driver: %v", err)
358         }
359
360         drvEnc := discoverapi.DriverEncryptionConfig{}
361         keys, tags := c.getKeys(subsysIPSec)
362         drvEnc.Keys = keys
363         drvEnc.Tags = tags
364
365         if err := d.DiscoverNew(discoverapi.EncryptionKeysConfig, drvEnc); err != nil {
366                 logrus.Warnf("Failed to set datapath keys in driver: %v", err)
367         }
368 }
369
370 func (c *controller) agentClose() {
371         // Acquire current agent instance and reset its pointer
372         // then run closing functions
373         c.Lock()
374         agent := c.agent
375         c.agent = nil
376         c.Unlock()
377
378         if agent == nil {
379                 return
380         }
381
382         var cancelList []func()
383
384         agent.Lock()
385         for _, cancelFuncs := range agent.driverCancelFuncs {
386                 for _, cancel := range cancelFuncs {
387                         cancelList = append(cancelList, cancel)
388                 }
389         }
390
391         // Add also the cancel functions for the network db
392         for _, cancel := range agent.coreCancelFuncs {
393                 cancelList = append(cancelList, cancel)
394         }
395         agent.Unlock()
396
397         for _, cancel := range cancelList {
398                 cancel()
399         }
400
401         agent.networkDB.Close()
402 }
403
404 // Task has the backend container details
405 type Task struct {
406         Name       string
407         EndpointID string
408         EndpointIP string
409         Info       map[string]string
410 }
411
412 // ServiceInfo has service specific details along with the list of backend tasks
413 type ServiceInfo struct {
414         VIP          string
415         LocalLBIndex int
416         Tasks        []Task
417         Ports        []string
418 }
419
420 type epRecord struct {
421         ep      EndpointRecord
422         info    map[string]string
423         lbIndex int
424 }
425
426 func (n *network) Services() map[string]ServiceInfo {
427         eps := make(map[string]epRecord)
428
429         if !n.isClusterEligible() {
430                 return nil
431         }
432         agent := n.getController().getAgent()
433         if agent == nil {
434                 return nil
435         }
436
437         // Walk through libnetworkEPTable and fetch the driver agnostic endpoint info
438         entries := agent.networkDB.GetTableByNetwork(libnetworkEPTable, n.id)
439         for eid, value := range entries {
440                 var epRec EndpointRecord
441                 nid := n.ID()
442                 if err := proto.Unmarshal(value.([]byte), &epRec); err != nil {
443                         logrus.Errorf("Unmarshal of libnetworkEPTable failed for endpoint %s in network %s, %v", eid, nid, err)
444                         continue
445                 }
446                 i := n.getController().getLBIndex(epRec.ServiceID, nid, epRec.IngressPorts)
447                 eps[eid] = epRecord{
448                         ep:      epRec,
449                         lbIndex: i,
450                 }
451         }
452
453         // Walk through the driver's tables, have the driver decode the entries
454         // and return the tuple {ep ID, value}. value is a string that coveys
455         // relevant info about the endpoint.
456         d, err := n.driver(true)
457         if err != nil {
458                 logrus.Errorf("Could not resolve driver for network %s/%s while fetching services: %v", n.networkType, n.ID(), err)
459                 return nil
460         }
461         for _, table := range n.driverTables {
462                 if table.objType != driverapi.EndpointObject {
463                         continue
464                 }
465                 entries := agent.networkDB.GetTableByNetwork(table.name, n.id)
466                 for key, value := range entries {
467                         epID, info := d.DecodeTableEntry(table.name, key, value.([]byte))
468                         if ep, ok := eps[epID]; !ok {
469                                 logrus.Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
470                         } else {
471                                 ep.info = info
472                                 eps[epID] = ep
473                         }
474                 }
475         }
476
477         // group the endpoints into a map keyed by the service name
478         sinfo := make(map[string]ServiceInfo)
479         for ep, epr := range eps {
480                 var (
481                         s  ServiceInfo
482                         ok bool
483                 )
484                 if s, ok = sinfo[epr.ep.ServiceName]; !ok {
485                         s = ServiceInfo{
486                                 VIP:          epr.ep.VirtualIP,
487                                 LocalLBIndex: epr.lbIndex,
488                         }
489                 }
490                 ports := []string{}
491                 if s.Ports == nil {
492                         for _, port := range epr.ep.IngressPorts {
493                                 p := fmt.Sprintf("Target: %d, Publish: %d", port.TargetPort, port.PublishedPort)
494                                 ports = append(ports, p)
495                         }
496                         s.Ports = ports
497                 }
498                 s.Tasks = append(s.Tasks, Task{
499                         Name:       epr.ep.Name,
500                         EndpointID: ep,
501                         EndpointIP: epr.ep.EndpointIP,
502                         Info:       epr.info,
503                 })
504                 sinfo[epr.ep.ServiceName] = s
505         }
506         return sinfo
507 }
508
509 func (n *network) isClusterEligible() bool {
510         if n.scope != datastore.SwarmScope || !n.driverIsMultihost() {
511                 return false
512         }
513         return n.getController().getAgent() != nil
514 }
515
516 func (n *network) joinCluster() error {
517         if !n.isClusterEligible() {
518                 return nil
519         }
520
521         agent := n.getController().getAgent()
522         if agent == nil {
523                 return nil
524         }
525
526         return agent.networkDB.JoinNetwork(n.ID())
527 }
528
529 func (n *network) leaveCluster() error {
530         if !n.isClusterEligible() {
531                 return nil
532         }
533
534         agent := n.getController().getAgent()
535         if agent == nil {
536                 return nil
537         }
538
539         return agent.networkDB.LeaveNetwork(n.ID())
540 }
541
542 func (ep *endpoint) addDriverInfoToCluster() error {
543         n := ep.getNetwork()
544         if !n.isClusterEligible() {
545                 return nil
546         }
547         if ep.joinInfo == nil {
548                 return nil
549         }
550
551         agent := n.getController().getAgent()
552         if agent == nil {
553                 return nil
554         }
555
556         for _, te := range ep.joinInfo.driverTableEntries {
557                 if err := agent.networkDB.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
558                         return err
559                 }
560         }
561         return nil
562 }
563
564 func (ep *endpoint) deleteDriverInfoFromCluster() error {
565         n := ep.getNetwork()
566         if !n.isClusterEligible() {
567                 return nil
568         }
569         if ep.joinInfo == nil {
570                 return nil
571         }
572
573         agent := n.getController().getAgent()
574         if agent == nil {
575                 return nil
576         }
577
578         for _, te := range ep.joinInfo.driverTableEntries {
579                 if err := agent.networkDB.DeleteEntry(te.tableName, n.ID(), te.key); err != nil {
580                         return err
581                 }
582         }
583         return nil
584 }
585
586 func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
587         if ep.isAnonymous() && len(ep.myAliases) == 0 || ep.Iface().Address() == nil {
588                 return nil
589         }
590
591         n := ep.getNetwork()
592         if !n.isClusterEligible() {
593                 return nil
594         }
595
596         sb.Service.Lock()
597         defer sb.Service.Unlock()
598         logrus.Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())
599
600         // Check that the endpoint is still present on the sandbox before adding it to the service discovery.
601         // This is to handle a race between the EnableService and the sbLeave
602         // It is possible that the EnableService starts, fetches the list of the endpoints and
603         // by the time the addServiceInfoToCluster is called the endpoint got removed from the sandbox
604         // The risk is that the deleteServiceInfoToCluster happens before the addServiceInfoToCluster.
605         // This check under the Service lock of the sandbox ensure the correct behavior.
606         // If the addServiceInfoToCluster arrives first may find or not the endpoint and will proceed or exit
607         // but in any case the deleteServiceInfoToCluster will follow doing the cleanup if needed.
608         // In case the deleteServiceInfoToCluster arrives first, this one is happening after the endpoint is
609         // removed from the list, in this situation the delete will bail out not finding any data to cleanup
610         // and the add will bail out not finding the endpoint on the sandbox.
611         if e := sb.getEndpoint(ep.ID()); e == nil {
612                 logrus.Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
613                 return nil
614         }
615
616         c := n.getController()
617         agent := c.getAgent()
618
619         name := ep.Name()
620         if ep.isAnonymous() {
621                 name = ep.MyAliases()[0]
622         }
623
624         var ingressPorts []*PortConfig
625         if ep.svcID != "" {
626                 // This is a task part of a service
627                 // Gossip ingress ports only in ingress network.
628                 if n.ingress {
629                         ingressPorts = ep.ingressPorts
630                 }
631                 if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
632                         return err
633                 }
634         } else {
635                 // This is a container simply attached to an attachable network
636                 if err := c.addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
637                         return err
638                 }
639         }
640
641         buf, err := proto.Marshal(&EndpointRecord{
642                 Name:         name,
643                 ServiceName:  ep.svcName,
644                 ServiceID:    ep.svcID,
645                 VirtualIP:    ep.virtualIP.String(),
646                 IngressPorts: ingressPorts,
647                 Aliases:      ep.svcAliases,
648                 TaskAliases:  ep.myAliases,
649                 EndpointIP:   ep.Iface().Address().IP.String(),
650         })
651         if err != nil {
652                 return err
653         }
654
655         if agent != nil {
656                 if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
657                         logrus.Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err)
658                         return err
659                 }
660         }
661
662         logrus.Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())
663
664         return nil
665 }
666
667 func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) error {
668         if ep.isAnonymous() && len(ep.myAliases) == 0 {
669                 return nil
670         }
671
672         n := ep.getNetwork()
673         if !n.isClusterEligible() {
674                 return nil
675         }
676
677         sb.Service.Lock()
678         defer sb.Service.Unlock()
679         logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())
680
681         c := n.getController()
682         agent := c.getAgent()
683
684         name := ep.Name()
685         if ep.isAnonymous() {
686                 name = ep.MyAliases()[0]
687         }
688
689         if agent != nil {
690                 // First delete from networkDB then locally
691                 if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
692                         logrus.Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
693                 }
694         }
695
696         if ep.Iface().Address() != nil {
697                 if ep.svcID != "" {
698                         // This is a task part of a service
699                         var ingressPorts []*PortConfig
700                         if n.ingress {
701                                 ingressPorts = ep.ingressPorts
702                         }
703                         if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true); err != nil {
704                                 return err
705                         }
706                 } else {
707                         // This is a container simply attached to an attachable network
708                         if err := c.delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
709                                 return err
710                         }
711                 }
712         }
713
714         logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())
715
716         return nil
717 }
718
719 func (n *network) addDriverWatches() {
720         if !n.isClusterEligible() {
721                 return
722         }
723
724         c := n.getController()
725         agent := c.getAgent()
726         if agent == nil {
727                 return
728         }
729         for _, table := range n.driverTables {
730                 ch, cancel := agent.networkDB.Watch(table.name, n.ID(), "")
731                 agent.Lock()
732                 agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
733                 agent.Unlock()
734                 go c.handleTableEvents(ch, n.handleDriverTableEvent)
735                 d, err := n.driver(false)
736                 if err != nil {
737                         logrus.Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
738                         return
739                 }
740
741                 agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte) bool {
742                         if nid == n.ID() {
743                                 d.EventNotify(driverapi.Create, nid, table.name, key, value)
744                         }
745
746                         return false
747                 })
748         }
749 }
750
751 func (n *network) cancelDriverWatches() {
752         if !n.isClusterEligible() {
753                 return
754         }
755
756         agent := n.getController().getAgent()
757         if agent == nil {
758                 return
759         }
760
761         agent.Lock()
762         cancelFuncs := agent.driverCancelFuncs[n.ID()]
763         delete(agent.driverCancelFuncs, n.ID())
764         agent.Unlock()
765
766         for _, cancel := range cancelFuncs {
767                 cancel()
768         }
769 }
770
771 func (c *controller) handleTableEvents(ch *events.Channel, fn func(events.Event)) {
772         for {
773                 select {
774                 case ev := <-ch.C:
775                         fn(ev)
776                 case <-ch.Done():
777                         return
778                 }
779         }
780 }
781
782 func (n *network) handleDriverTableEvent(ev events.Event) {
783         d, err := n.driver(false)
784         if err != nil {
785                 logrus.Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
786                 return
787         }
788
789         var (
790                 etype driverapi.EventType
791                 tname string
792                 key   string
793                 value []byte
794         )
795
796         switch event := ev.(type) {
797         case networkdb.CreateEvent:
798                 tname = event.Table
799                 key = event.Key
800                 value = event.Value
801                 etype = driverapi.Create
802         case networkdb.DeleteEvent:
803                 tname = event.Table
804                 key = event.Key
805                 value = event.Value
806                 etype = driverapi.Delete
807         case networkdb.UpdateEvent:
808                 tname = event.Table
809                 key = event.Key
810                 value = event.Value
811                 etype = driverapi.Delete
812         }
813
814         d.EventNotify(etype, n.ID(), tname, key, value)
815 }
816
817 func (c *controller) handleNodeTableEvent(ev events.Event) {
818         var (
819                 value    []byte
820                 isAdd    bool
821                 nodeAddr networkdb.NodeAddr
822         )
823         switch event := ev.(type) {
824         case networkdb.CreateEvent:
825                 value = event.Value
826                 isAdd = true
827         case networkdb.DeleteEvent:
828                 value = event.Value
829         case networkdb.UpdateEvent:
830                 logrus.Errorf("Unexpected update node table event = %#v", event)
831         }
832
833         err := json.Unmarshal(value, &nodeAddr)
834         if err != nil {
835                 logrus.Errorf("Error unmarshalling node table event %v", err)
836                 return
837         }
838         c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
839
840 }
841
842 func (c *controller) handleEpTableEvent(ev events.Event) {
843         var (
844                 nid   string
845                 eid   string
846                 value []byte
847                 isAdd bool
848                 epRec EndpointRecord
849         )
850
851         switch event := ev.(type) {
852         case networkdb.CreateEvent:
853                 nid = event.NetworkID
854                 eid = event.Key
855                 value = event.Value
856                 isAdd = true
857         case networkdb.DeleteEvent:
858                 nid = event.NetworkID
859                 eid = event.Key
860                 value = event.Value
861         case networkdb.UpdateEvent:
862                 logrus.Errorf("Unexpected update service table event = %#v", event)
863                 return
864         }
865
866         err := proto.Unmarshal(value, &epRec)
867         if err != nil {
868                 logrus.Errorf("Failed to unmarshal service table value: %v", err)
869                 return
870         }
871
872         containerName := epRec.Name
873         svcName := epRec.ServiceName
874         svcID := epRec.ServiceID
875         vip := net.ParseIP(epRec.VirtualIP)
876         ip := net.ParseIP(epRec.EndpointIP)
877         ingressPorts := epRec.IngressPorts
878         serviceAliases := epRec.Aliases
879         taskAliases := epRec.TaskAliases
880
881         if containerName == "" || ip == nil {
882                 logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
883                 return
884         }
885
886         if isAdd {
887                 logrus.Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec)
888                 if svcID != "" {
889                         // This is a remote task part of a service
890                         if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
891                                 logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
892                                 return
893                         }
894                 } else {
895                         // This is a remote container simply attached to an attachable network
896                         if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
897                                 logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
898                         }
899                 }
900         } else {
901                 logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
902                 if svcID != "" {
903                         // This is a remote task part of a service
904                         if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true); err != nil {
905                                 logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err)
906                                 return
907                         }
908                 } else {
909                         // This is a remote container simply attached to an attachable network
910                         if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
911                                 logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
912                         }
913                 }
914         }
915 }