7 "github.com/Sirupsen/logrus"
9 mapset "github.com/deckarep/golang-set"
10 "github.com/docker/docker/pkg/discovery"
12 _ "github.com/docker/docker/pkg/discovery/kv"
13 "github.com/docker/libnetwork/types"
// hostDiscovery carries the state shared by the host discovery methods in
// this file.
// NOTE(review): NewHostDiscovery and Fetch also use a `nodes` field that is
// not visible among these lines — confirm the full field list in the file.
16 type hostDiscovery struct {
// watcher is the discovery.Watcher supplied to NewHostDiscovery.
17 watcher discovery.Watcher
// stopChan is created by NewHostDiscovery, passed as the argument of the
// watcher's Watch call in Watch, and read by StopDiscovery.
19 stopChan chan struct{}
23 // NewHostDiscovery function creates a host discovery object
// around the given watcher. The returned value is a *hostDiscovery whose node
// set starts out empty (mapset.NewSet()) and whose stop channel is a freshly
// made, unbuffered chan struct{}.
// The watcher is stored as-is; no nil check is performed here.
24 func NewHostDiscovery(watcher discovery.Watcher) HostDiscovery {
25 return &hostDiscovery{watcher: watcher, nodes: mapset.NewSet(), stopChan: make(chan struct{})}
// Watch starts discovery monitoring. It calls Watch on a discovery watcher
// `d`, handing it h.stopChan, and then launches monitorDiscovery in a new
// goroutine with the two channels returned by that call and the three
// callbacks supplied by the caller.
//
// On its failure path it returns a bad-request error with the message
// "invalid discovery watcher".
// NOTE(review): the statements that define `d` and guard the error return
// are not visible here — presumably d is h.watcher and the error is returned
// when it is nil; confirm. The success return value is likewise not visible.
28 func (h *hostDiscovery) Watch(activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) error {
33 return types.BadRequestErrorf("invalid discovery watcher")
// The stop channel is given to the watcher so the same channel is shared
// between the watcher and this object.
35 discoveryCh, errCh := d.Watch(h.stopChan)
// Monitoring runs asynchronously; Watch does not wait for it.
36 go h.monitorDiscovery(discoveryCh, errCh, activeCallback, joinCallback, leaveCallback)
// monitorDiscovery is the goroutine body started by Watch. It takes a
// receive-only channel of discovery.Entries, a receive-only error channel,
// and the three callbacks.
// NOTE(review): the surrounding loop/select is not visible here — presumably
// `entries` and `err` are values received from ch and errCh, and the function
// exits on some stop condition; confirm against the full file.
40 func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error,
41 activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
// Each batch of entries is handed to processCallback together with all
// three callbacks.
45 h.processCallback(entries, activeCallback, joinCallback, leaveCallback)
// Errors are only logged; they are not propagated to any caller.
48 logrus.Errorf("discovery error: %v", err)
// StopDiscovery begins shutdown of discovery by taking a local copy of
// h.stopChan.
// NOTE(review): what is done with the copied channel, any locking around the
// read, and the returned error value are not visible here — presumably the
// channel is closed to signal the watcher and monitor goroutine; confirm.
56 func (h *hostDiscovery) StopDiscovery() error {
58 stopChan := h.stopChan
// processCallback handles one batch of discovery entries: it converts the
// entries to a host set with hosts, computes the added and removed addresses
// relative to `existing` with diff, and reports removed addresses through
// leaveCallback.
// NOTE(review): the definition of `existing`, any update of h.nodes, the
// invocations of activeCallback and joinCallback, and the condition guarding
// leaveCallback are not visible here — presumably existing is the previous
// h.nodes and callbacks fire only for non-empty changes; confirm.
66 func (h *hostDiscovery) processCallback(entries discovery.Entries,
67 activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
// updated is the set of hosts present in this batch.
68 updated := hosts(entries)
// added: in updated but not in existing; removed: in existing but not in
// updated (see diff).
71 added, removed := diff(existing, updated)
80 leaveCallback(removed)
// diff compares two host sets and reports the change between them as IP
// slices: added holds the members of updated that are absent from existing,
// removed holds the members of existing that are absent from updated.
//
// Set members are type-asserted to string (a non-string member would panic)
// and parsed with net.ParseIP, which yields a nil net.IP for text that is not
// a valid IP address; such nil values are appended as-is.
// Both results are named return values and stay nil when nothing is appended.
84 func diff(existing mapset.Set, updated mapset.Set) (added []net.IP, removed []net.IP) {
// Set differences in each direction, flattened to slices for iteration.
85 addSlice := updated.Difference(existing).ToSlice()
86 removeSlice := existing.Difference(updated).ToSlice()
87 for _, ip := range addSlice {
88 added = append(added, net.ParseIP(ip.(string)))
90 for _, ip := range removeSlice {
91 removed = append(removed, net.ParseIP(ip.(string)))
// Fetch converts the current contents of h.nodes into a slice of net.IP.
// Every member is type-asserted to string and parsed with net.ParseIP, so a
// member that is not a valid textual IP contributes a nil net.IP.
// NOTE(review): the declaration of `ips`, any locking around the read of
// h.nodes, and the return statement are not visible here — presumably ips is
// returned; confirm.
96 func (h *hostDiscovery) Fetch() []net.IP {
100 for _, ipstr := range h.nodes.ToSlice() {
101 ips = append(ips, net.ParseIP(ipstr.(string)))
// hosts builds a set from discovery entries by adding the Host field of each
// entry to a new mapset.Set; duplicate hosts therefore collapse to one member.
// NOTE(review): the return statement is not visible here — presumably the
// populated set is returned; confirm.
106 func hosts(entries discovery.Entries) mapset.Set {
// The local deliberately shares the function's name; inside the body `hosts`
// refers to the set, not the function.
107 hosts := mapset.NewSet()
108 for _, entry := range entries {
109 hosts.Add(entry.Host)