Tizen_4.0 base
[platform/upstream/docker-engine.git] / vendor / google.golang.org / grpc / transport / handler_server.go
1 /*
2  * Copyright 2016, Google Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  * copyright notice, this list of conditions and the following disclaimer
13  * in the documentation and/or other materials provided with the
14  * distribution.
15  *     * Neither the name of Google Inc. nor the names of its
16  * contributors may be used to endorse or promote products derived from
17  * this software without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30  *
31  */
32
33 // This file is the implementation of a gRPC server using HTTP/2 which
34 // uses the standard Go http2 Server implementation (via the
35 // http.Handler interface), rather than speaking low-level HTTP/2
36 // frames itself. It is the implementation of *grpc.Server.ServeHTTP.
37
38 package transport
39
40 import (
41         "errors"
42         "fmt"
43         "io"
44         "net"
45         "net/http"
46         "strings"
47         "sync"
48         "time"
49
50         "golang.org/x/net/context"
51         "golang.org/x/net/http2"
52         "google.golang.org/grpc/codes"
53         "google.golang.org/grpc/credentials"
54         "google.golang.org/grpc/metadata"
55         "google.golang.org/grpc/peer"
56         "google.golang.org/grpc/status"
57 )
58
59 // NewServerHandlerTransport returns a ServerTransport handling gRPC
60 // from inside an http.Handler. It requires that the http Server
61 // supports HTTP/2.
62 func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTransport, error) {
63         if r.ProtoMajor != 2 {
64                 return nil, errors.New("gRPC requires HTTP/2")
65         }
66         if r.Method != "POST" {
67                 return nil, errors.New("invalid gRPC request method")
68         }
69         if !validContentType(r.Header.Get("Content-Type")) {
70                 return nil, errors.New("invalid gRPC request content-type")
71         }
72         if _, ok := w.(http.Flusher); !ok {
73                 return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
74         }
75         if _, ok := w.(http.CloseNotifier); !ok {
76                 return nil, errors.New("gRPC requires a ResponseWriter supporting http.CloseNotifier")
77         }
78
79         st := &serverHandlerTransport{
80                 rw:       w,
81                 req:      r,
82                 closedCh: make(chan struct{}),
83                 writes:   make(chan func()),
84         }
85
86         if v := r.Header.Get("grpc-timeout"); v != "" {
87                 to, err := decodeTimeout(v)
88                 if err != nil {
89                         return nil, streamErrorf(codes.Internal, "malformed time-out: %v", err)
90                 }
91                 st.timeoutSet = true
92                 st.timeout = to
93         }
94
95         var metakv []string
96         if r.Host != "" {
97                 metakv = append(metakv, ":authority", r.Host)
98         }
99         for k, vv := range r.Header {
100                 k = strings.ToLower(k)
101                 if isReservedHeader(k) && !isWhitelistedPseudoHeader(k) {
102                         continue
103                 }
104                 for _, v := range vv {
105                         if k == "user-agent" {
106                                 // user-agent is special. Copying logic of http_util.go.
107                                 if i := strings.LastIndex(v, " "); i == -1 {
108                                         // There is no application user agent string being set
109                                         continue
110                                 } else {
111                                         v = v[:i]
112                                 }
113                         }
114                         v, err := decodeMetadataHeader(k, v)
115                         if err != nil {
116                                 return nil, streamErrorf(codes.InvalidArgument, "malformed binary metadata: %v", err)
117                         }
118                         metakv = append(metakv, k, v)
119                 }
120         }
121         st.headerMD = metadata.Pairs(metakv...)
122
123         return st, nil
124 }
125
126 // serverHandlerTransport is an implementation of ServerTransport
127 // which replies to exactly one gRPC request (exactly one HTTP request),
128 // using the net/http.Handler interface. This http.Handler is guaranteed
129 // at this point to be speaking over HTTP/2, so it's able to speak valid
130 // gRPC.
131 type serverHandlerTransport struct {
132         rw               http.ResponseWriter
133         req              *http.Request
134         timeoutSet       bool
135         timeout          time.Duration
136         didCommonHeaders bool
137
138         headerMD metadata.MD
139
140         closeOnce sync.Once
141         closedCh  chan struct{} // closed on Close
142
143         // writes is a channel of code to run serialized in the
144         // ServeHTTP (HandleStreams) goroutine. The channel is closed
145         // when WriteStatus is called.
146         writes chan func()
147 }
148
149 func (ht *serverHandlerTransport) Close() error {
150         ht.closeOnce.Do(ht.closeCloseChanOnce)
151         return nil
152 }
153
154 func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) }
155
156 func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
157
158 // strAddr is a net.Addr backed by either a TCP "ip:port" string, or
159 // the empty string if unknown.
160 type strAddr string
161
162 func (a strAddr) Network() string {
163         if a != "" {
164                 // Per the documentation on net/http.Request.RemoteAddr, if this is
165                 // set, it's set to the IP:port of the peer (hence, TCP):
166                 // https://golang.org/pkg/net/http/#Request
167                 //
168                 // If we want to support Unix sockets later, we can
169                 // add our own grpc-specific convention within the
170                 // grpc codebase to set RemoteAddr to a different
171                 // format, or probably better: we can attach it to the
172                 // context and use that from serverHandlerTransport.RemoteAddr.
173                 return "tcp"
174         }
175         return ""
176 }
177
178 func (a strAddr) String() string { return string(a) }
179
180 // do runs fn in the ServeHTTP goroutine.
181 func (ht *serverHandlerTransport) do(fn func()) error {
182         select {
183         case ht.writes <- fn:
184                 return nil
185         case <-ht.closedCh:
186                 return ErrConnClosing
187         }
188 }
189
190 func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
191         err := ht.do(func() {
192                 ht.writeCommonHeaders(s)
193
194                 // And flush, in case no header or body has been sent yet.
195                 // This forces a separation of headers and trailers if this is the
196                 // first call (for example, in end2end tests's TestNoService).
197                 ht.rw.(http.Flusher).Flush()
198
199                 h := ht.rw.Header()
200                 h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
201                 if m := st.Message(); m != "" {
202                         h.Set("Grpc-Message", encodeGrpcMessage(m))
203                 }
204
205                 // TODO: Support Grpc-Status-Details-Bin
206
207                 if md := s.Trailer(); len(md) > 0 {
208                         for k, vv := range md {
209                                 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
210                                 if isReservedHeader(k) {
211                                         continue
212                                 }
213                                 for _, v := range vv {
214                                         // http2 ResponseWriter mechanism to send undeclared Trailers after
215                                         // the headers have possibly been written.
216                                         h.Add(http2.TrailerPrefix+k, encodeMetadataHeader(k, v))
217                                 }
218                         }
219                 }
220         })
221         close(ht.writes)
222         return err
223 }
224
225 // writeCommonHeaders sets common headers on the first write
226 // call (Write, WriteHeader, or WriteStatus).
227 func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
228         if ht.didCommonHeaders {
229                 return
230         }
231         ht.didCommonHeaders = true
232
233         h := ht.rw.Header()
234         h["Date"] = nil // suppress Date to make tests happy; TODO: restore
235         h.Set("Content-Type", "application/grpc")
236
237         // Predeclare trailers we'll set later in WriteStatus (after the body).
238         // This is a SHOULD in the HTTP RFC, and the way you add (known)
239         // Trailers per the net/http.ResponseWriter contract.
240         // See https://golang.org/pkg/net/http/#ResponseWriter
241         // and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
242         h.Add("Trailer", "Grpc-Status")
243         h.Add("Trailer", "Grpc-Message")
244         // TODO: Support Grpc-Status-Details-Bin
245
246         if s.sendCompress != "" {
247                 h.Set("Grpc-Encoding", s.sendCompress)
248         }
249 }
250
251 func (ht *serverHandlerTransport) Write(s *Stream, data []byte, opts *Options) error {
252         return ht.do(func() {
253                 ht.writeCommonHeaders(s)
254                 ht.rw.Write(data)
255                 if !opts.Delay {
256                         ht.rw.(http.Flusher).Flush()
257                 }
258         })
259 }
260
261 func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
262         return ht.do(func() {
263                 ht.writeCommonHeaders(s)
264                 h := ht.rw.Header()
265                 for k, vv := range md {
266                         // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
267                         if isReservedHeader(k) {
268                                 continue
269                         }
270                         for _, v := range vv {
271                                 v = encodeMetadataHeader(k, v)
272                                 h.Add(k, v)
273                         }
274                 }
275                 ht.rw.WriteHeader(200)
276                 ht.rw.(http.Flusher).Flush()
277         })
278 }
279
280 func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
281         // With this transport type there will be exactly 1 stream: this HTTP request.
282
283         var ctx context.Context
284         var cancel context.CancelFunc
285         if ht.timeoutSet {
286                 ctx, cancel = context.WithTimeout(context.Background(), ht.timeout)
287         } else {
288                 ctx, cancel = context.WithCancel(context.Background())
289         }
290
291         // requestOver is closed when either the request's context is done
292         // or the status has been written via WriteStatus.
293         requestOver := make(chan struct{})
294
295         // clientGone receives a single value if peer is gone, either
296         // because the underlying connection is dead or because the
297         // peer sends an http2 RST_STREAM.
298         clientGone := ht.rw.(http.CloseNotifier).CloseNotify()
299         go func() {
300                 select {
301                 case <-requestOver:
302                         return
303                 case <-ht.closedCh:
304                 case <-clientGone:
305                 }
306                 cancel()
307         }()
308
309         req := ht.req
310
311         s := &Stream{
312                 id:            0,            // irrelevant
313                 windowHandler: func(int) {}, // nothing
314                 cancel:        cancel,
315                 buf:           newRecvBuffer(),
316                 st:            ht,
317                 method:        req.URL.Path,
318                 recvCompress:  req.Header.Get("grpc-encoding"),
319         }
320         pr := &peer.Peer{
321                 Addr: ht.RemoteAddr(),
322         }
323         if req.TLS != nil {
324                 pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
325         }
326         ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
327         ctx = peer.NewContext(ctx, pr)
328         s.ctx = newContextWithStream(ctx, s)
329         s.dec = &recvBufferReader{ctx: s.ctx, recv: s.buf}
330
331         // readerDone is closed when the Body.Read-ing goroutine exits.
332         readerDone := make(chan struct{})
333         go func() {
334                 defer close(readerDone)
335
336                 // TODO: minimize garbage, optimize recvBuffer code/ownership
337                 const readSize = 8196
338                 for buf := make([]byte, readSize); ; {
339                         n, err := req.Body.Read(buf)
340                         if n > 0 {
341                                 s.buf.put(&recvMsg{data: buf[:n:n]})
342                                 buf = buf[n:]
343                         }
344                         if err != nil {
345                                 s.buf.put(&recvMsg{err: mapRecvMsgError(err)})
346                                 return
347                         }
348                         if len(buf) == 0 {
349                                 buf = make([]byte, readSize)
350                         }
351                 }
352         }()
353
354         // startStream is provided by the *grpc.Server's serveStreams.
355         // It starts a goroutine serving s and exits immediately.
356         // The goroutine that is started is the one that then calls
357         // into ht, calling WriteHeader, Write, WriteStatus, Close, etc.
358         startStream(s)
359
360         ht.runStream()
361         close(requestOver)
362
363         // Wait for reading goroutine to finish.
364         req.Body.Close()
365         <-readerDone
366 }
367
368 func (ht *serverHandlerTransport) runStream() {
369         for {
370                 select {
371                 case fn, ok := <-ht.writes:
372                         if !ok {
373                                 return
374                         }
375                         fn()
376                 case <-ht.closedCh:
377                         return
378                 }
379         }
380 }
381
382 func (ht *serverHandlerTransport) Drain() {
383         panic("Drain() is not implemented")
384 }
385
386 // mapRecvMsgError returns the non-nil err into the appropriate
387 // error value as expected by callers of *grpc.parser.recvMsg.
388 // In particular, in can only be:
389 //   * io.EOF
390 //   * io.ErrUnexpectedEOF
391 //   * of type transport.ConnectionError
392 //   * of type transport.StreamError
393 func mapRecvMsgError(err error) error {
394         if err == io.EOF || err == io.ErrUnexpectedEOF {
395                 return err
396         }
397         if se, ok := err.(http2.StreamError); ok {
398                 if code, ok := http2ErrConvTab[se.Code]; ok {
399                         return StreamError{
400                                 Code: code,
401                                 Desc: se.Error(),
402                         }
403                 }
404         }
405         return connectionErrorf(true, err, err.Error())
406 }