1 // Package connectionbroker is a layer on top of remotes that returns
2 // a gRPC connection to a manager. The connection may be a local connection
3 // using a local socket such as a UNIX socket.
4 package connectionbroker
9 "github.com/docker/swarmkit/api"
10 "github.com/docker/swarmkit/remotes"
11 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
12 "google.golang.org/grpc"
15 // Broker is a simple connection broker. It can either return a fresh
16 // connection to a remote manager selected with weighted randomization, or a
17 // local gRPC connection to the local manager.
20 remotes remotes.Remotes
21 localConn *grpc.ClientConn
24 // New creates a new connection broker.
25 func New(remotes remotes.Remotes) *Broker {
31 // SetLocalConn changes the local gRPC connection used by the connection broker.
32 func (b *Broker) SetLocalConn(localConn *grpc.ClientConn) {
36 b.localConn = localConn
39 // Select a manager from the set of available managers, and return a connection.
40 func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) {
42 localConn := b.localConn
47 ClientConn: localConn,
52 return b.SelectRemote(dialOpts...)
55 // SelectRemote chooses a manager from the remotes, and returns a TCP
57 func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) {
58 peer, err := b.remotes.Select()
63 dialOpts = append(dialOpts,
64 grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
65 grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor))
67 cc, err := grpc.Dial(peer.Addr, dialOpts...)
69 b.remotes.ObserveIfExists(peer, -remotes.DefaultObservationWeight)
80 // Remotes returns the remotes interface used by the broker, so the caller
81 // can make observations or see weights directly.
82 func (b *Broker) Remotes() remotes.Remotes {
86 // Conn is a wrapper around a gRPC client connection.
90 remotes remotes.Remotes
94 // Close closes the client connection if it is a remote connection. It also
95 // records a positive experience with the remote peer if success is true,
96 // otherwise it records a negative experience. If a local connection is in use,
98 func (c *Conn) Close(success bool) error {
104 c.remotes.ObserveIfExists(c.peer, remotes.DefaultObservationWeight)
106 c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight)
109 return c.ClientConn.Close()