Tizen_4.0 base
[platform/upstream/docker-engine.git] / vendor / github.com / docker / libnetwork / hostdiscovery / hostdiscovery.go
1 package hostdiscovery
2
3 import (
4         "net"
5         "sync"
6
7         "github.com/Sirupsen/logrus"
8
9         mapset "github.com/deckarep/golang-set"
10         "github.com/docker/docker/pkg/discovery"
11         // Including KV
12         _ "github.com/docker/docker/pkg/discovery/kv"
13         "github.com/docker/libnetwork/types"
14 )
15
16 type hostDiscovery struct {
17         watcher  discovery.Watcher
18         nodes    mapset.Set
19         stopChan chan struct{}
20         sync.Mutex
21 }
22
23 // NewHostDiscovery function creates a host discovery object
24 func NewHostDiscovery(watcher discovery.Watcher) HostDiscovery {
25         return &hostDiscovery{watcher: watcher, nodes: mapset.NewSet(), stopChan: make(chan struct{})}
26 }
27
28 func (h *hostDiscovery) Watch(activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) error {
29         h.Lock()
30         d := h.watcher
31         h.Unlock()
32         if d == nil {
33                 return types.BadRequestErrorf("invalid discovery watcher")
34         }
35         discoveryCh, errCh := d.Watch(h.stopChan)
36         go h.monitorDiscovery(discoveryCh, errCh, activeCallback, joinCallback, leaveCallback)
37         return nil
38 }
39
40 func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error,
41         activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
42         for {
43                 select {
44                 case entries := <-ch:
45                         h.processCallback(entries, activeCallback, joinCallback, leaveCallback)
46                 case err := <-errCh:
47                         if err != nil {
48                                 logrus.Errorf("discovery error: %v", err)
49                         }
50                 case <-h.stopChan:
51                         return
52                 }
53         }
54 }
55
56 func (h *hostDiscovery) StopDiscovery() error {
57         h.Lock()
58         stopChan := h.stopChan
59         h.watcher = nil
60         h.Unlock()
61
62         close(stopChan)
63         return nil
64 }
65
66 func (h *hostDiscovery) processCallback(entries discovery.Entries,
67         activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
68         updated := hosts(entries)
69         h.Lock()
70         existing := h.nodes
71         added, removed := diff(existing, updated)
72         h.nodes = updated
73         h.Unlock()
74
75         activeCallback()
76         if len(added) > 0 {
77                 joinCallback(added)
78         }
79         if len(removed) > 0 {
80                 leaveCallback(removed)
81         }
82 }
83
84 func diff(existing mapset.Set, updated mapset.Set) (added []net.IP, removed []net.IP) {
85         addSlice := updated.Difference(existing).ToSlice()
86         removeSlice := existing.Difference(updated).ToSlice()
87         for _, ip := range addSlice {
88                 added = append(added, net.ParseIP(ip.(string)))
89         }
90         for _, ip := range removeSlice {
91                 removed = append(removed, net.ParseIP(ip.(string)))
92         }
93         return
94 }
95
96 func (h *hostDiscovery) Fetch() []net.IP {
97         h.Lock()
98         defer h.Unlock()
99         ips := []net.IP{}
100         for _, ipstr := range h.nodes.ToSlice() {
101                 ips = append(ips, net.ParseIP(ipstr.(string)))
102         }
103         return ips
104 }
105
106 func hosts(entries discovery.Entries) mapset.Set {
107         hosts := mapset.NewSet()
108         for _, entry := range entries {
109                 hosts.Add(entry.Host)
110         }
111         return hosts
112 }