Tizen_4.0 base
[platform/upstream/docker-engine.git] / client / session / manager.go
1 package session
2
3 import (
4         "net/http"
5         "strings"
6         "sync"
7
8         "github.com/pkg/errors"
9         "golang.org/x/net/context"
10         "google.golang.org/grpc"
11 )
12
13 // Caller can invoke requests on the session
14 type Caller interface {
15         Context() context.Context
16         Supports(method string) bool
17         Conn() *grpc.ClientConn
18         Name() string
19         SharedKey() string
20 }
21
22 type client struct {
23         Session
24         cc        *grpc.ClientConn
25         supported map[string]struct{}
26 }
27
28 // Manager is a controller for accessing currently active sessions
29 type Manager struct {
30         sessions        map[string]*client
31         mu              sync.Mutex
32         updateCondition *sync.Cond
33 }
34
35 // NewManager returns a new Manager
36 func NewManager() (*Manager, error) {
37         sm := &Manager{
38                 sessions: make(map[string]*client),
39         }
40         sm.updateCondition = sync.NewCond(&sm.mu)
41         return sm, nil
42 }
43
44 // HandleHTTPRequest handles an incoming HTTP request
45 func (sm *Manager) HandleHTTPRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
46         hijacker, ok := w.(http.Hijacker)
47         if !ok {
48                 return errors.New("handler does not support hijack")
49         }
50
51         uuid := r.Header.Get(headerSessionUUID)
52         name := r.Header.Get(headerSessionName)
53         sharedKey := r.Header.Get(headerSessionSharedKey)
54
55         proto := r.Header.Get("Upgrade")
56
57         sm.mu.Lock()
58         if _, ok := sm.sessions[uuid]; ok {
59                 sm.mu.Unlock()
60                 return errors.Errorf("session %s already exists", uuid)
61         }
62
63         if proto == "" {
64                 sm.mu.Unlock()
65                 return errors.New("no upgrade proto in request")
66         }
67
68         if proto != "h2c" {
69                 sm.mu.Unlock()
70                 return errors.Errorf("protocol %s not supported", proto)
71         }
72
73         conn, _, err := hijacker.Hijack()
74         if err != nil {
75                 sm.mu.Unlock()
76                 return errors.Wrap(err, "failed to hijack connection")
77         }
78
79         resp := &http.Response{
80                 StatusCode: http.StatusSwitchingProtocols,
81                 ProtoMajor: 1,
82                 ProtoMinor: 1,
83                 Header:     http.Header{},
84         }
85         resp.Header.Set("Connection", "Upgrade")
86         resp.Header.Set("Upgrade", proto)
87
88         // set raw mode
89         conn.Write([]byte{})
90         resp.Write(conn)
91
92         ctx, cancel := context.WithCancel(ctx)
93         defer cancel()
94
95         ctx, cc, err := grpcClientConn(ctx, conn)
96         if err != nil {
97                 sm.mu.Unlock()
98                 return err
99         }
100
101         c := &client{
102                 Session: Session{
103                         uuid:      uuid,
104                         name:      name,
105                         sharedKey: sharedKey,
106                         ctx:       ctx,
107                         cancelCtx: cancel,
108                         done:      make(chan struct{}),
109                 },
110                 cc:        cc,
111                 supported: make(map[string]struct{}),
112         }
113
114         for _, m := range r.Header[headerSessionMethod] {
115                 c.supported[strings.ToLower(m)] = struct{}{}
116         }
117         sm.sessions[uuid] = c
118         sm.updateCondition.Broadcast()
119         sm.mu.Unlock()
120
121         defer func() {
122                 sm.mu.Lock()
123                 delete(sm.sessions, uuid)
124                 sm.mu.Unlock()
125         }()
126
127         <-c.ctx.Done()
128         conn.Close()
129         close(c.done)
130
131         return nil
132 }
133
134 // Get returns a session by UUID
135 func (sm *Manager) Get(ctx context.Context, uuid string) (Caller, error) {
136         ctx, cancel := context.WithCancel(ctx)
137         defer cancel()
138
139         go func() {
140                 select {
141                 case <-ctx.Done():
142                         sm.updateCondition.Broadcast()
143                 }
144         }()
145
146         var c *client
147
148         sm.mu.Lock()
149         for {
150                 select {
151                 case <-ctx.Done():
152                         sm.mu.Unlock()
153                         return nil, errors.Wrapf(ctx.Err(), "no active session for %s", uuid)
154                 default:
155                 }
156                 var ok bool
157                 c, ok = sm.sessions[uuid]
158                 if !ok || c.closed() {
159                         sm.updateCondition.Wait()
160                         continue
161                 }
162                 sm.mu.Unlock()
163                 break
164         }
165
166         return c, nil
167 }
168
169 func (c *client) Context() context.Context {
170         return c.context()
171 }
172
173 func (c *client) Name() string {
174         return c.name
175 }
176
177 func (c *client) SharedKey() string {
178         return c.sharedKey
179 }
180
181 func (c *client) Supports(url string) bool {
182         _, ok := c.supported[strings.ToLower(url)]
183         return ok
184 }
185 func (c *client) Conn() *grpc.ClientConn {
186         return c.cc
187 }