8 "github.com/pkg/errors"
9 "golang.org/x/net/context"
10 "google.golang.org/grpc"
13 // Caller can invoke requests on the session
14 type Caller interface {
15 Context() context.Context
16 Supports(method string) bool
17 Conn() *grpc.ClientConn
25 supported map[string]struct{}
28 // Manager is a controller for accessing currently active sessions
30 sessions map[string]*client
32 updateCondition *sync.Cond
35 // NewManager returns a new Manager
36 func NewManager() (*Manager, error) {
38 sessions: make(map[string]*client),
40 sm.updateCondition = sync.NewCond(&sm.mu)
44 // HandleHTTPRequest hijacks an incoming HTTP upgrade request and registers it as a session keyed by its UUID header
45 func (sm *Manager) HandleHTTPRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
46 hijacker, ok := w.(http.Hijacker)
48 return errors.New("handler does not support hijack")
51 uuid := r.Header.Get(headerSessionUUID)
52 name := r.Header.Get(headerSessionName)
53 sharedKey := r.Header.Get(headerSessionSharedKey)
55 proto := r.Header.Get("Upgrade")
58 if _, ok := sm.sessions[uuid]; ok {
60 return errors.Errorf("session %s already exists", uuid)
65 return errors.New("no upgrade proto in request")
70 return errors.Errorf("protocol %s not supported", proto)
73 conn, _, err := hijacker.Hijack()
76 return errors.Wrap(err, "failed to hijack connection")
79 resp := &http.Response{
80 StatusCode: http.StatusSwitchingProtocols,
83 Header: http.Header{},
85 resp.Header.Set("Connection", "Upgrade")
86 resp.Header.Set("Upgrade", proto)
92 ctx, cancel := context.WithCancel(ctx)
95 ctx, cc, err := grpcClientConn(ctx, conn)
105 sharedKey: sharedKey,
108 done: make(chan struct{}),
111 supported: make(map[string]struct{}),
114 for _, m := range r.Header[headerSessionMethod] {
115 c.supported[strings.ToLower(m)] = struct{}{}
117 sm.sessions[uuid] = c
118 sm.updateCondition.Broadcast()
123 delete(sm.sessions, uuid)
134 // Get returns the session with the given UUID, waiting for it to become active until ctx is done
135 func (sm *Manager) Get(ctx context.Context, uuid string) (Caller, error) {
136 ctx, cancel := context.WithCancel(ctx)
142 sm.updateCondition.Broadcast()
153 return nil, errors.Wrapf(ctx.Err(), "no active session for %s", uuid)
157 c, ok = sm.sessions[uuid]
158 if !ok || c.closed() {
159 sm.updateCondition.Wait()
169 func (c *client) Context() context.Context {
173 func (c *client) Name() string {
177 func (c *client) SharedKey() string {
181 func (c *client) Supports(url string) bool {
182 _, ok := c.supported[strings.ToLower(url)]
185 func (c *client) Conn() *grpc.ClientConn {