3 //go:generate protoc -I.:Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. agent.proto
13 "github.com/Sirupsen/logrus"
14 "github.com/docker/docker/pkg/stringid"
15 "github.com/docker/go-events"
16 "github.com/docker/libnetwork/cluster"
17 "github.com/docker/libnetwork/datastore"
18 "github.com/docker/libnetwork/discoverapi"
19 "github.com/docker/libnetwork/driverapi"
20 "github.com/docker/libnetwork/networkdb"
21 "github.com/docker/libnetwork/types"
22 "github.com/gogo/protobuf/proto"
// Subsystem tags carried in types.EncryptionKey.Subsystem. Keys tagged
// subsysGossip are installed in NetworkDB (SetKey/SetPrimaryKey/RemoveKey);
// keys tagged subsysIPSec are handed to the network drivers as datapath
// encryption keys via DiscoverNew.
26 subsysGossip = "networking:gossip"
27 subsysIPSec = "networking:ipsec"
31 // ByTime implements sort.Interface for []*types.EncryptionKey based on
32 // the LamportTime field.
33 type ByTime []*types.EncryptionKey
// Len returns the number of keys in the slice.
35 func (b ByTime) Len() int { return len(b) }
// Swap exchanges the keys at positions i and j.
36 func (b ByTime) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
// Less orders keys by ascending LamportTime, so sorting puts the key with the
// smallest LamportTime first.
37 func (b ByTime) Less(i, j int) bool { return b[i].LamportTime < b[j].LamportTime }
// NOTE(review): only some fields of the agent struct are visible in this excerpt.
// networkDB is the NetworkDB instance agentInit creates with networkdb.New.
40 networkDB *networkdb.NetworkDB
// coreCancelFuncs cancel the watches agentInit registers on libnetworkEPTable
// and networkdb.NodeTable; agentClose invokes them.
44 coreCancelFuncs []func()
// driverCancelFuncs holds the driver-table watch cancel functions per network
// ID; addDriverWatches appends to it and cancelDriverWatches drains it.
45 driverCancelFuncs map[string][]func()
// dataPathAddress returns the address to advertise for data-path traffic.
// When dataPathAddr is configured it is preferred (the return for that branch
// is elided in this excerpt); otherwise advertiseAddr is returned.
49 func (a *agent) dataPathAddress() string {
52 if a.dataPathAddr != "" {
55 return a.advertiseAddr
// libnetworkEPTable is the NetworkDB table holding proto-marshalled
// EndpointRecord values, keyed by endpoint ID within each network.
58 const libnetworkEPTable = "endpoint_table"
// getBindAddr returns the first suitable IP address configured on the
// interface named ifaceName. It fails if the interface cannot be found, if its
// addresses cannot be listed, or if no suitable address exists.
// NOTE(review): the bodies of the non-*net.IPNet and link-local checks are
// elided here; presumably both skip the address and continue the loop.
60 func getBindAddr(ifaceName string) (string, error) {
61 iface, err := net.InterfaceByName(ifaceName)
63 return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
66 addrs, err := iface.Addrs()
68 return "", fmt.Errorf("failed to get interface addresses: %v", err)
71 for _, a := range addrs {
72 addr, ok := a.(*net.IPNet)
78 if addrIP.IsLinkLocalUnicast() {
82 return addrIP.String(), nil
85 return "", fmt.Errorf("failed to get bind address")
// resolveAddr turns addrOrInterface into an IP address string. A literal IP
// is returned unchanged. Otherwise the value is resolved as a host name with
// net.ResolveIPAddr. If that fails, the value is treated as an interface name
// and its bind address is looked up with getBindAddr.
88 func resolveAddr(addrOrInterface string) (string, error) {
89 // Try and see if this is a valid IP address
90 if net.ParseIP(addrOrInterface) != nil {
91 return addrOrInterface, nil
94 addr, err := net.ResolveIPAddr("ip", addrOrInterface)
96 // If not a valid IP address, it should be a valid interface
97 return getBindAddr(addrOrInterface)
99 return addr.String(), nil
// handleKeyChange reconciles the controller's key ring (c.keys) with the key
// list received from the cluster. Keys are matched by LamportTime: a key in
// c.keys with no match in keys is deleted, and a key in keys with no match in
// c.keys is appended.
// - Gossip-subsystem changes are applied to NetworkDB: SetKey for the added
//   key, SetPrimaryKey, then RemoveKey for the deleted key.
// - IPSec-subsystem changes are collected into a DriverEncryptionUpdate,
//   together with the new primary tag, and sent to every registered driver.
// Driver update failures are logged, not propagated.
102 func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
103 drvEnc := discoverapi.DriverEncryptionUpdate{}
107 logrus.Debug("Skipping key change as agent is nil")
111 // Find the deleted key. If the deleted key was the primary key,
112 // a new primary key should be set before removing it from keyring.
119 for _, key := range keys {
120 if same = key.LamportTime == c.keys[i].LamportTime; same {
126 if cKey.Subsystem == subsysGossip {
130 if cKey.Subsystem == subsysIPSec {
131 drvEnc.Prune = cKey.Key
132 drvEnc.PruneTag = cKey.LamportTime
// NOTE(review): moves the deleted key towards the tail of c.keys; j and the
// truncation of the slice are on elided lines.
134 c.keys[i], c.keys[j-1] = c.keys[j-1], c.keys[i]
142 // Find the new key and add it to the key ring
143 for _, key := range keys {
145 for _, cKey := range c.keys {
146 if same = cKey.LamportTime == key.LamportTime; same {
151 c.keys = append(c.keys, key)
152 if key.Subsystem == subsysGossip {
156 if key.Subsystem == subsysIPSec {
158 drvEnc.Tag = key.LamportTime
// Order matters for NetworkDB: install the new key, switch the primary key,
// and only then remove the deleted key.
165 a.networkDB.SetKey(added)
168 key, _, err := c.getPrimaryKeyTag(subsysGossip)
172 a.networkDB.SetPrimaryKey(key)
174 key, tag, err := c.getPrimaryKeyTag(subsysIPSec)
179 drvEnc.PrimaryTag = tag
181 if len(deleted) > 0 {
182 a.networkDB.RemoveKey(deleted)
185 c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
186 err := driver.DiscoverNew(discoverapi.EncryptionKeysUpdate, drvEnc)
188 logrus.Warnf("Failed to update datapath keys in driver %s: %v", name, err)
// agentSetup brings up the libnetwork agent using the addresses reported by
// clusterProvider.
// - The remote manager addresses and the listen address are stripped of
//   their ports with net.SplitHostPort.
// - When an advertise address is known and no agent exists yet, agentInit is
//   run (failures are logged) and every global-scope driver is notified via
//   agentDriverNotify.
// - When remote addresses are available, the agent joins the gossip cluster;
//   a join failure is only logged.
// NOTE(review): SplitHostPort errors are discarded, so a malformed entry
// yields an empty string in remoteAddrList (or an empty listenAddr).
196 func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
197 agent := c.getAgent()
199 // If the agent is already present there is no need to try to initialize it again
204 bindAddr := clusterProvider.GetLocalAddress()
205 advAddr := clusterProvider.GetAdvertiseAddress()
206 dataAddr := clusterProvider.GetDataPathAddress()
207 remoteList := clusterProvider.GetRemoteAddressList()
208 remoteAddrList := make([]string, 0, len(remoteList))
209 for _, remote := range remoteList {
210 addr, _, _ := net.SplitHostPort(remote)
211 remoteAddrList = append(remoteAddrList, addr)
214 listen := clusterProvider.GetListenAddress()
215 listenAddr, _, _ := net.SplitHostPort(listen)
217 logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v",
218 listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList)
219 if advAddr != "" && agent == nil {
220 if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
221 logrus.Errorf("error in agentInit: %v", err)
224 c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
225 if capability.ConnectivityScope == datastore.GlobalScope {
226 c.agentDriverNotify(driver)
232 if len(remoteAddrList) > 0 {
233 if err := c.agentJoin(remoteAddrList); err != nil {
234 logrus.Errorf("Error in joining gossip cluster : %v(join will be retried in background)", err)
241 // For a given subsystem getKeys sorts the keys by lamport time and returns
242 // a slice of keys and their lamport times, which can be used as unique tags for the keys.
// Note that c.keys itself is sorted in place. The first two entries are then
// swapped, so the key getPrimaryKeyTag treats as primary (sorted index 1)
// comes first.
243 func (c *controller) getKeys(subsys string) ([][]byte, []uint64) {
247 sort.Sort(ByTime(c.keys))
251 for _, key := range c.keys {
252 if key.Subsystem == subsys {
253 keys = append(keys, key.Key)
254 tags = append(tags, key.LamportTime)
// NOTE(review): presumably guarded by a len(keys) > 1 check on an elided line.
258 keys[0], keys[1] = keys[1], keys[0]
259 tags[0], tags[1] = tags[1], tags[0]
263 // getPrimaryKeyTag returns the primary key for a given subsystem from the
264 // sorted list of keys, together with its associated tag (its Lamport time).
// The primary key is the one at index 1 once the subsystem's keys are sorted
// by LamportTime. c.keys is sorted in place as a side effect.
// NOTE(review): no length check is visible before keys[1]. If the subsystem
// has fewer than two keys this indexes out of range and panics. Confirm that
// callers guarantee at least two keys, or add a guard that returns an error.
265 func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
268 sort.Sort(ByTime(c.keys))
269 keys := []*types.EncryptionKey{}
270 for _, key := range c.keys {
271 if key.Subsystem == subsys {
272 keys = append(keys, key)
275 return keys[1].Key, keys[1].LamportTime, nil
// agentInit creates the controller's agent. In order, it:
// 1. resolves bindAddrOrInterface (IP, host name or interface name) with
//    resolveAddr and fetches the gossip-subsystem keys;
// 2. builds a NetworkDB instance named "<hostname>-<random id>";
// 3. watches libnetworkEPTable and the NetworkDB node table, each drained by
//    its own handleTableEvents goroutine;
// 4. stores the agent with the watch cancel functions;
// 5. hands the IPSec keys to every registered driver;
// 6. joins every existing network to the cluster.
// NOTE(review): how the gossip keys reach networkdb.Config is on elided lines.
278 func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
279 bindAddr, err := resolveAddr(bindAddrOrInterface)
284 keys, _ := c.getKeys(subsysGossip)
// NOTE(review): the os.Hostname error is ignored; on failure nodeName is just
// "-<id>".
285 hostname, _ := os.Hostname()
286 nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
287 logrus.Info("Gossip cluster hostname ", nodeName)
289 nDB, err := networkdb.New(&networkdb.Config{
// NetworkDB binds on listenAddr, not on the resolved bindAddr. The latter is
// what agentDriverNotify reports as agent.bindAddr (its assignment is elided).
290 BindAddr: listenAddr,
291 AdvertiseAddr: advertiseAddr,
300 var cancelList []func()
301 ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
302 cancelList = append(cancelList, cancel)
303 nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
304 cancelList = append(cancelList, cancel)
310 advertiseAddr: advertiseAddr,
311 dataPathAddr: dataPathAddr,
312 coreCancelFuncs: cancelList,
313 driverCancelFuncs: make(map[string][]func()),
317 go c.handleTableEvents(ch, c.handleEpTableEvent)
318 go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
320 drvEnc := discoverapi.DriverEncryptionConfig{}
321 keys, tags := c.getKeys(subsysIPSec)
325 c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
326 err := driver.DiscoverNew(discoverapi.EncryptionKeysConfig, drvEnc)
328 logrus.Warnf("Failed to set datapath keys in driver %s: %v", name, err)
333 c.WalkNetworks(joinCluster)
// agentJoin joins the NetworkDB gossip cluster through the given remote
// addresses and returns NetworkDB's Join error.
// NOTE(review): lines 340-342 are elided. Presumably they return early when
// there is no agent, since agent is dereferenced below.
338 func (c *controller) agentJoin(remoteAddrList []string) error {
339 agent := c.getAgent()
343 return agent.networkDB.Join(remoteAddrList)
// agentDriverNotify brings driver d up to date with the agent in two steps:
// 1. a NodeDiscovery event carrying the agent's data-path address and bind
//    address;
// 2. an EncryptionKeysConfig event with the current IPSec keys and tags.
// Failures of either step are logged, not returned.
346 func (c *controller) agentDriverNotify(d driverapi.Driver) {
347 agent := c.getAgent()
352 if err := d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
353 Address: agent.dataPathAddress(),
354 BindAddress: agent.bindAddr,
357 logrus.Warnf("Failed the node discovery in driver: %v", err)
360 drvEnc := discoverapi.DriverEncryptionConfig{}
361 keys, tags := c.getKeys(subsysIPSec)
365 if err := d.DiscoverNew(discoverapi.EncryptionKeysConfig, drvEnc); err != nil {
366 logrus.Warnf("Failed to set datapath keys in driver: %v", err)
// agentClose tears down the agent. It runs every per-network driver watch
// cancel function and the core NetworkDB watch cancel functions, then closes
// NetworkDB.
370 func (c *controller) agentClose() {
371 // Acquire current agent instance and reset its pointer
372 // then run closing functions
382 var cancelList []func()
385 for _, cancelFuncs := range agent.driverCancelFuncs {
386 for _, cancel := range cancelFuncs {
387 cancelList = append(cancelList, cancel)
391 // Also add the cancel functions for the network db
392 for _, cancel := range agent.coreCancelFuncs {
393 cancelList = append(cancelList, cancel)
397 for _, cancel := range cancelList {
401 agent.networkDB.Close()
404 // Task has the backend container details
// Info holds extra key/value details about the task (presumably the
// driver-decoded epRecord.info when built by network.Services).
409 Info map[string]string
412 // ServiceInfo has service specific details along with the list of backend tasks
// It is the value type of the map returned by network.Services, which is keyed
// by service name.
413 type ServiceInfo struct {
// epRecord is the intermediate per-endpoint value used by network.Services. It
// pairs an EndpointRecord (ep) with its load-balancer index (lbIndex) and the
// info decoded by the network driver.
420 type epRecord struct {
422 info map[string]string
// Services returns the services that have endpoints on this network, keyed by
// service name. It:
// - reads the EndpointRecords in libnetworkEPTable for this network and looks
//   up each one's load-balancer index;
// - merges in the per-endpoint info the driver decodes from its
//   endpoint-object tables;
// - groups the endpoints into ServiceInfo values.
// Ingress ports are rendered as "Target: <t>, Publish: <p>" strings.
// Unmarshal failures and driver/libnetwork inconsistencies are logged.
// NOTE(review): the return taken for networks that are not cluster eligible
// is elided here.
426 func (n *network) Services() map[string]ServiceInfo {
427 eps := make(map[string]epRecord)
429 if !n.isClusterEligible() {
432 agent := n.getController().getAgent()
437 // Walk through libnetworkEPTable and fetch the driver agnostic endpoint info
438 entries := agent.networkDB.GetTableByNetwork(libnetworkEPTable, n.id)
439 for eid, value := range entries {
440 var epRec EndpointRecord
442 if err := proto.Unmarshal(value.([]byte), &epRec); err != nil {
443 logrus.Errorf("Unmarshal of libnetworkEPTable failed for endpoint %s in network %s, %v", eid, nid, err)
446 i := n.getController().getLBIndex(epRec.ServiceID, nid, epRec.IngressPorts)
453 // Walk through the driver's tables, have the driver decode the entries
454 // and return the tuple {ep ID, info}, where info conveys
455 // relevant details about the endpoint.
456 d, err := n.driver(true)
458 logrus.Errorf("Could not resolve driver for network %s/%s while fetching services: %v", n.networkType, n.ID(), err)
461 for _, table := range n.driverTables {
462 if table.objType != driverapi.EndpointObject {
465 entries := agent.networkDB.GetTableByNetwork(table.name, n.id)
466 for key, value := range entries {
467 epID, info := d.DecodeTableEntry(table.name, key, value.([]byte))
468 if ep, ok := eps[epID]; !ok {
469 logrus.Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
477 // group the endpoints into a map keyed by the service name
478 sinfo := make(map[string]ServiceInfo)
479 for ep, epr := range eps {
484 if s, ok = sinfo[epr.ep.ServiceName]; !ok {
486 VIP: epr.ep.VirtualIP,
487 LocalLBIndex: epr.lbIndex,
492 for _, port := range epr.ep.IngressPorts {
493 p := fmt.Sprintf("Target: %d, Publish: %d", port.TargetPort, port.PublishedPort)
494 ports = append(ports, p)
498 s.Tasks = append(s.Tasks, Task{
501 EndpointIP: epr.ep.EndpointIP,
504 sinfo[epr.ep.ServiceName] = s
// isClusterEligible reports whether the network takes part in cluster gossip.
// That requires a swarm-scoped network, a multihost driver, and a controller
// that currently has an agent.
509 func (n *network) isClusterEligible() bool {
510 if n.scope != datastore.SwarmScope || !n.driverIsMultihost() {
513 return n.getController().getAgent() != nil
// joinCluster joins this network (by ID) in NetworkDB. Networks that are not
// cluster eligible are skipped (that early return is elided here).
516 func (n *network) joinCluster() error {
517 if !n.isClusterEligible() {
521 agent := n.getController().getAgent()
526 return agent.networkDB.JoinNetwork(n.ID())
// leaveCluster leaves this network (by ID) in NetworkDB; it is the inverse of
// joinCluster. Networks that are not cluster eligible are skipped (that early
// return is elided here).
529 func (n *network) leaveCluster() error {
530 if !n.isClusterEligible() {
534 agent := n.getController().getAgent()
539 return agent.networkDB.LeaveNetwork(n.ID())
// addDriverInfoToCluster publishes the endpoint's driver table entries
// (ep.joinInfo.driverTableEntries) into NetworkDB under this network's ID.
// Presumably it returns early for non-eligible networks and for endpoints
// without join info; those bodies are elided here.
// NOTE(review): n is presumably ep's network, set on an elided line.
542 func (ep *endpoint) addDriverInfoToCluster() error {
544 if !n.isClusterEligible() {
547 if ep.joinInfo == nil {
551 agent := n.getController().getAgent()
556 for _, te := range ep.joinInfo.driverTableEntries {
557 if err := agent.networkDB.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
// deleteDriverInfoFromCluster removes the endpoint's driver table entries from
// NetworkDB, mirroring addDriverInfoToCluster. Presumably it returns early for
// non-eligible networks and for endpoints without join info (bodies elided).
564 func (ep *endpoint) deleteDriverInfoFromCluster() error {
566 if !n.isClusterEligible() {
569 if ep.joinInfo == nil {
573 agent := n.getController().getAgent()
578 for _, te := range ep.joinInfo.driverTableEntries {
579 if err := agent.networkDB.DeleteEntry(te.tableName, n.ID(), te.key); err != nil {
// addServiceInfoToCluster registers the endpoint for service discovery on this
// node and advertises it to the cluster.
// - Anonymous endpoints without aliases, and endpoints without an interface
//   address, are skipped.
// - Under the sandbox's Service lock it re-checks that the endpoint is still
//   in the sandbox.
// - It then adds either a service binding (task of a service) or container
//   name resolution (container on an attachable network). Anonymous endpoints
//   are named by their first alias.
// - Finally it writes a marshalled EndpointRecord into libnetworkEPTable; a
//   CreateEntry failure there is logged.
586 func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
587 if ep.isAnonymous() && len(ep.myAliases) == 0 || ep.Iface().Address() == nil {
592 if !n.isClusterEligible() {
597 defer sb.Service.Unlock()
598 logrus.Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())
600 // Check that the endpoint is still present on the sandbox before adding it to the service discovery.
601 // This is to handle a race between the EnableService and the sbLeave.
602 // It is possible that the EnableService starts, fetches the list of the endpoints and
603 // by the time the addServiceInfoToCluster is called the endpoint got removed from the sandbox.
604 // The risk is that the deleteServiceInfoFromCluster happens before the addServiceInfoToCluster.
605 // This check under the Service lock of the sandbox ensures the correct behavior.
606 // If addServiceInfoToCluster arrives first, it may or may not find the endpoint and will proceed or exit,
607 // but in any case deleteServiceInfoFromCluster will follow, doing the cleanup if needed.
608 // In case deleteServiceInfoFromCluster arrives first, this one is happening after the endpoint is
609 // removed from the list; in this situation the delete will bail out not finding any data to clean up
610 // and the add will bail out not finding the endpoint on the sandbox.
611 if e := sb.getEndpoint(ep.ID()); e == nil {
612 logrus.Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
616 c := n.getController()
617 agent := c.getAgent()
620 if ep.isAnonymous() {
621 name = ep.MyAliases()[0]
624 var ingressPorts []*PortConfig
626 // This is a task part of a service
627 // Gossip ingress ports only in ingress network.
629 ingressPorts = ep.ingressPorts
631 if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
635 // This is a container simply attached to an attachable network
636 if err := c.addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
641 buf, err := proto.Marshal(&EndpointRecord{
643 ServiceName: ep.svcName,
645 VirtualIP: ep.virtualIP.String(),
646 IngressPorts: ingressPorts,
647 Aliases: ep.svcAliases,
648 TaskAliases: ep.myAliases,
649 EndpointIP: ep.Iface().Address().IP.String(),
656 if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
657 logrus.Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err)
662 logrus.Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())
// deleteServiceInfoFromCluster undoes addServiceInfoToCluster. method names
// the caller and is only used in debug logs.
// - Under the sandbox's Service lock it first deletes the endpoint's
//   libnetworkEPTable entry from NetworkDB (a failure is logged).
// - Then, if the endpoint still has an interface address, it removes either
//   the service binding or the container name resolution locally.
// Anonymous endpoints without aliases are skipped; anonymous endpoints are
// named by their first alias.
667 func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) error {
668 if ep.isAnonymous() && len(ep.myAliases) == 0 {
673 if !n.isClusterEligible() {
678 defer sb.Service.Unlock()
679 logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())
681 c := n.getController()
682 agent := c.getAgent()
685 if ep.isAnonymous() {
686 name = ep.MyAliases()[0]
690 // First delete from networkDB then locally
691 if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
692 logrus.Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
696 if ep.Iface().Address() != nil {
698 // This is a task part of a service
699 var ingressPorts []*PortConfig
701 ingressPorts = ep.ingressPorts
703 if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true); err != nil {
707 // This is a container simply attached to an attachable network
708 if err := c.delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
714 logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())
// addDriverWatches runs the following for each of the network's driver tables:
// - subscribes to NetworkDB changes for this network and feeds them to
//   handleDriverTableEvent in a goroutine;
// - records the watch's cancel function under the network ID in
//   agent.driverCancelFuncs;
// - replays entries already present in the table to the driver as
//   driverapi.Create events.
// NOTE(review): the filter applied inside the WalkTable callback is elided here.
719 func (n *network) addDriverWatches() {
720 if !n.isClusterEligible() {
724 c := n.getController()
725 agent := c.getAgent()
729 for _, table := range n.driverTables {
730 ch, cancel := agent.networkDB.Watch(table.name, n.ID(), "")
732 agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
734 go c.handleTableEvents(ch, n.handleDriverTableEvent)
735 d, err := n.driver(false)
737 logrus.Errorf("Could not resolve driver %s while walking driver table: %v", n.networkType, err)
741 agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte) bool {
743 d.EventNotify(driverapi.Create, nid, table.name, key, value)
// cancelDriverWatches stops the driver-table watches that addDriverWatches
// created for this network. It takes the network's cancel functions, removes
// the network's entry from agent.driverCancelFuncs, and invokes each function.
751 func (n *network) cancelDriverWatches() {
752 if !n.isClusterEligible() {
756 agent := n.getController().getAgent()
762 cancelFuncs := agent.driverCancelFuncs[n.ID()]
763 delete(agent.driverCancelFuncs, n.ID())
766 for _, cancel := range cancelFuncs {
// handleTableEvents consumes events from the NetworkDB watch channel ch and
// passes each one to fn. It is run as a goroutine per watch (see agentInit and
// addDriverWatches).
// NOTE(review): the body is elided in this excerpt, so the loop's exit
// condition is not visible here.
771 func (c *controller) handleTableEvents(ch *events.Channel, fn func(events.Event)) {
// handleDriverTableEvent forwards a NetworkDB event from one of the network's
// driver tables to the network's driver via EventNotify. The event kind maps
// onto the matching driverapi.EventType:
//   networkdb.CreateEvent -> driverapi.Create
//   networkdb.DeleteEvent -> driverapi.Delete
//   networkdb.UpdateEvent -> driverapi.Update
// (tname, key and value are presumably filled from the event on elided lines.)
782 func (n *network) handleDriverTableEvent(ev events.Event) {
783 d, err := n.driver(false)
785 logrus.Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
790 etype driverapi.EventType
796 switch event := ev.(type) {
797 case networkdb.CreateEvent:
801 etype = driverapi.Create
802 case networkdb.DeleteEvent:
806 etype = driverapi.Delete
807 case networkdb.UpdateEvent:
// Report an updated entry as an update. Mapping it to driverapi.Delete told
// the driver the entry was gone even though it still exists in NetworkDB.
811 etype = driverapi.Update
814 d.EventNotify(etype, n.ID(), tname, key, value)
// handleNodeTableEvent decodes the JSON-encoded networkdb.NodeAddr carried by
// a NetworkDB node-table event and reports that address to
// processNodeDiscovery. isAdd is presumably set to true for CreateEvent and
// false for DeleteEvent on elided lines. UpdateEvents are unexpected and are
// only logged; decode failures are logged too.
817 func (c *controller) handleNodeTableEvent(ev events.Event) {
821 nodeAddr networkdb.NodeAddr
823 switch event := ev.(type) {
824 case networkdb.CreateEvent:
827 case networkdb.DeleteEvent:
829 case networkdb.UpdateEvent:
830 logrus.Errorf("Unexpected update node table event = %#v", event)
833 err := json.Unmarshal(value, &nodeAddr)
835 logrus.Errorf("Error unmarshalling node table event %v", err)
838 c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
842 func (c *controller) handleEpTableEvent(ev events.Event) {
851 switch event := ev.(type) {
852 case networkdb.CreateEvent:
853 nid = event.NetworkID
857 case networkdb.DeleteEvent:
858 nid = event.NetworkID
861 case networkdb.UpdateEvent:
862 logrus.Errorf("Unexpected update service table event = %#v", event)
866 err := proto.Unmarshal(value, &epRec)
868 logrus.Errorf("Failed to unmarshal service table value: %v", err)
872 containerName := epRec.Name
873 svcName := epRec.ServiceName
874 svcID := epRec.ServiceID
875 vip := net.ParseIP(epRec.VirtualIP)
876 ip := net.ParseIP(epRec.EndpointIP)
877 ingressPorts := epRec.IngressPorts
878 serviceAliases := epRec.Aliases
879 taskAliases := epRec.TaskAliases
881 if containerName == "" || ip == nil {
882 logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
887 logrus.Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec)
889 // This is a remote task part of a service
890 if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
891 logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
895 // This is a remote container simply attached to an attachable network
896 if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
897 logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
901 logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
903 // This is a remote task part of a service
904 if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true); err != nil {
905 logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err)
909 // This is a remote container simply attached to an attachable network
910 if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
911 logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)