Tizen_4.0 base
[platform/upstream/docker-engine.git] / vendor / github.com / docker / swarmkit / manager / state / raft / util.go
1 package raft
2
3 import (
4         "time"
5
6         "golang.org/x/net/context"
7
8         "github.com/docker/swarmkit/api"
9         "github.com/docker/swarmkit/manager/state"
10         "github.com/docker/swarmkit/manager/state/store"
11         grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
12         "google.golang.org/grpc"
13         "google.golang.org/grpc/credentials"
14 )
15
16 // dial returns a grpc client connection
17 func dial(addr string, protocol string, creds credentials.TransportCredentials, timeout time.Duration) (*grpc.ClientConn, error) {
18         grpcOptions := []grpc.DialOption{
19                 grpc.WithBackoffMaxDelay(2 * time.Second),
20                 grpc.WithTransportCredentials(creds),
21                 grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
22                 grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
23         }
24
25         if timeout != 0 {
26                 grpcOptions = append(grpcOptions, grpc.WithTimeout(timeout))
27         }
28
29         return grpc.Dial(addr, grpcOptions...)
30 }
31
32 // Register registers the node raft server
33 func Register(server *grpc.Server, node *Node) {
34         api.RegisterRaftServer(server, node)
35         api.RegisterRaftMembershipServer(server, node)
36 }
37
38 // WaitForLeader waits until node observe some leader in cluster. It returns
39 // error if ctx was cancelled before leader appeared.
40 func WaitForLeader(ctx context.Context, n *Node) error {
41         _, err := n.Leader()
42         if err == nil {
43                 return nil
44         }
45         ticker := time.NewTicker(50 * time.Millisecond)
46         defer ticker.Stop()
47         for err != nil {
48                 select {
49                 case <-ticker.C:
50                 case <-ctx.Done():
51                         return ctx.Err()
52                 }
53                 _, err = n.Leader()
54         }
55         return nil
56 }
57
58 // WaitForCluster waits until node observes that the cluster wide config is
59 // committed to raft. This ensures that we can see and serve informations
60 // related to the cluster.
61 func WaitForCluster(ctx context.Context, n *Node) (cluster *api.Cluster, err error) {
62         watch, cancel := state.Watch(n.MemoryStore().WatchQueue(), api.EventCreateCluster{})
63         defer cancel()
64
65         var clusters []*api.Cluster
66         n.MemoryStore().View(func(readTx store.ReadTx) {
67                 clusters, err = store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
68         })
69
70         if err != nil {
71                 return nil, err
72         }
73
74         if len(clusters) == 1 {
75                 cluster = clusters[0]
76         } else {
77                 select {
78                 case e := <-watch:
79                         cluster = e.(api.EventCreateCluster).Cluster
80                 case <-ctx.Done():
81                         return nil, ctx.Err()
82                 }
83         }
84
85         return cluster, nil
86 }