6 "golang.org/x/net/context"
8 "github.com/docker/swarmkit/api"
9 "github.com/docker/swarmkit/manager/state"
10 "github.com/docker/swarmkit/manager/state/store"
11 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
12 "google.golang.org/grpc"
13 "google.golang.org/grpc/credentials"
16 // dial returns a grpc client connection
17 func dial(addr string, protocol string, creds credentials.TransportCredentials, timeout time.Duration) (*grpc.ClientConn, error) {
18 grpcOptions := []grpc.DialOption{
19 grpc.WithBackoffMaxDelay(2 * time.Second),
20 grpc.WithTransportCredentials(creds),
21 grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
22 grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
26 grpcOptions = append(grpcOptions, grpc.WithTimeout(timeout))
29 return grpc.Dial(addr, grpcOptions...)
32 // Register registers the node raft server
33 func Register(server *grpc.Server, node *Node) {
34 api.RegisterRaftServer(server, node)
35 api.RegisterRaftMembershipServer(server, node)
38 // WaitForLeader waits until node observe some leader in cluster. It returns
39 // error if ctx was cancelled before leader appeared.
40 func WaitForLeader(ctx context.Context, n *Node) error {
45 ticker := time.NewTicker(50 * time.Millisecond)
58 // WaitForCluster waits until node observes that the cluster wide config is
59 // committed to raft. This ensures that we can see and serve informations
60 // related to the cluster.
61 func WaitForCluster(ctx context.Context, n *Node) (cluster *api.Cluster, err error) {
62 watch, cancel := state.Watch(n.MemoryStore().WatchQueue(), api.EventCreateCluster{})
65 var clusters []*api.Cluster
66 n.MemoryStore().View(func(readTx store.ReadTx) {
67 clusters, err = store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
74 if len(clusters) == 1 {
79 cluster = e.(api.EventCreateCluster).Cluster