Imported Upstream version 4.8.1
[platform/upstream/gcc48.git] / libgo / go / net / rpc / client.go
1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package rpc
6
7 import (
8         "bufio"
9         "encoding/gob"
10         "errors"
11         "io"
12         "log"
13         "net"
14         "net/http"
15         "sync"
16 )
17
18 // ServerError represents an error that has been returned from
19 // the remote side of the RPC connection.
20 type ServerError string
21
22 func (e ServerError) Error() string {
23         return string(e)
24 }
25
26 var ErrShutdown = errors.New("connection is shut down")
27
28 // Call represents an active RPC.
29 type Call struct {
30         ServiceMethod string      // The name of the service and method to call.
31         Args          interface{} // The argument to the function (*struct).
32         Reply         interface{} // The reply from the function (*struct).
33         Error         error       // After completion, the error status.
34         Done          chan *Call  // Strobes when call is complete.
35 }
36
37 // Client represents an RPC Client.
38 // There may be multiple outstanding Calls associated
39 // with a single Client, and a Client may be used by
40 // multiple goroutines simultaneously.
41 type Client struct {
42         mutex    sync.Mutex // protects pending, seq, request
43         sending  sync.Mutex
44         request  Request
45         seq      uint64
46         codec    ClientCodec
47         pending  map[uint64]*Call
48         closing  bool
49         shutdown bool
50 }
51
52 // A ClientCodec implements writing of RPC requests and
53 // reading of RPC responses for the client side of an RPC session.
54 // The client calls WriteRequest to write a request to the connection
55 // and calls ReadResponseHeader and ReadResponseBody in pairs
56 // to read responses.  The client calls Close when finished with the
57 // connection. ReadResponseBody may be called with a nil
58 // argument to force the body of the response to be read and then
59 // discarded.
60 type ClientCodec interface {
61         WriteRequest(*Request, interface{}) error
62         ReadResponseHeader(*Response) error
63         ReadResponseBody(interface{}) error
64
65         Close() error
66 }
67
68 func (client *Client) send(call *Call) {
69         client.sending.Lock()
70         defer client.sending.Unlock()
71
72         // Register this call.
73         client.mutex.Lock()
74         if client.shutdown {
75                 call.Error = ErrShutdown
76                 client.mutex.Unlock()
77                 call.done()
78                 return
79         }
80         seq := client.seq
81         client.seq++
82         client.pending[seq] = call
83         client.mutex.Unlock()
84
85         // Encode and send the request.
86         client.request.Seq = seq
87         client.request.ServiceMethod = call.ServiceMethod
88         err := client.codec.WriteRequest(&client.request, call.Args)
89         if err != nil {
90                 client.mutex.Lock()
91                 call = client.pending[seq]
92                 delete(client.pending, seq)
93                 client.mutex.Unlock()
94                 if call != nil {
95                         call.Error = err
96                         call.done()
97                 }
98         }
99 }
100
101 func (client *Client) input() {
102         var err error
103         var response Response
104         for err == nil {
105                 response = Response{}
106                 err = client.codec.ReadResponseHeader(&response)
107                 if err != nil {
108                         if err == io.EOF && !client.closing {
109                                 err = io.ErrUnexpectedEOF
110                         }
111                         break
112                 }
113                 seq := response.Seq
114                 client.mutex.Lock()
115                 call := client.pending[seq]
116                 delete(client.pending, seq)
117                 client.mutex.Unlock()
118
119                 switch {
120                 case call == nil:
121                         // We've got no pending call. That usually means that
122                         // WriteRequest partially failed, and call was already
123                         // removed; response is a server telling us about an
124                         // error reading request body. We should still attempt
125                         // to read error body, but there's no one to give it to.
126                         err = client.codec.ReadResponseBody(nil)
127                         if err != nil {
128                                 err = errors.New("reading error body: " + err.Error())
129                         }
130                 case response.Error != "":
131                         // We've got an error response. Give this to the request;
132                         // any subsequent requests will get the ReadResponseBody
133                         // error if there is one.
134                         call.Error = ServerError(response.Error)
135                         err = client.codec.ReadResponseBody(nil)
136                         if err != nil {
137                                 err = errors.New("reading error body: " + err.Error())
138                         }
139                         call.done()
140                 default:
141                         err = client.codec.ReadResponseBody(call.Reply)
142                         if err != nil {
143                                 call.Error = errors.New("reading body " + err.Error())
144                         }
145                         call.done()
146                 }
147         }
148         // Terminate pending calls.
149         client.sending.Lock()
150         client.mutex.Lock()
151         client.shutdown = true
152         closing := client.closing
153         for _, call := range client.pending {
154                 call.Error = err
155                 call.done()
156         }
157         client.mutex.Unlock()
158         client.sending.Unlock()
159         if err != io.EOF && !closing {
160                 log.Println("rpc: client protocol error:", err)
161         }
162 }
163
164 func (call *Call) done() {
165         select {
166         case call.Done <- call:
167                 // ok
168         default:
169                 // We don't want to block here.  It is the caller's responsibility to make
170                 // sure the channel has enough buffer space. See comment in Go().
171                 log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
172         }
173 }
174
175 // NewClient returns a new Client to handle requests to the
176 // set of services at the other end of the connection.
177 // It adds a buffer to the write side of the connection so
178 // the header and payload are sent as a unit.
179 func NewClient(conn io.ReadWriteCloser) *Client {
180         encBuf := bufio.NewWriter(conn)
181         client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
182         return NewClientWithCodec(client)
183 }
184
185 // NewClientWithCodec is like NewClient but uses the specified
186 // codec to encode requests and decode responses.
187 func NewClientWithCodec(codec ClientCodec) *Client {
188         client := &Client{
189                 codec:   codec,
190                 pending: make(map[uint64]*Call),
191         }
192         go client.input()
193         return client
194 }
195
196 type gobClientCodec struct {
197         rwc    io.ReadWriteCloser
198         dec    *gob.Decoder
199         enc    *gob.Encoder
200         encBuf *bufio.Writer
201 }
202
203 func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
204         if err = c.enc.Encode(r); err != nil {
205                 return
206         }
207         if err = c.enc.Encode(body); err != nil {
208                 return
209         }
210         return c.encBuf.Flush()
211 }
212
213 func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
214         return c.dec.Decode(r)
215 }
216
217 func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
218         return c.dec.Decode(body)
219 }
220
221 func (c *gobClientCodec) Close() error {
222         return c.rwc.Close()
223 }
224
225 // DialHTTP connects to an HTTP RPC server at the specified network address
226 // listening on the default HTTP RPC path.
227 func DialHTTP(network, address string) (*Client, error) {
228         return DialHTTPPath(network, address, DefaultRPCPath)
229 }
230
231 // DialHTTPPath connects to an HTTP RPC server
232 // at the specified network address and path.
233 func DialHTTPPath(network, address, path string) (*Client, error) {
234         var err error
235         conn, err := net.Dial(network, address)
236         if err != nil {
237                 return nil, err
238         }
239         io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
240
241         // Require successful HTTP response
242         // before switching to RPC protocol.
243         resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
244         if err == nil && resp.Status == connected {
245                 return NewClient(conn), nil
246         }
247         if err == nil {
248                 err = errors.New("unexpected HTTP response: " + resp.Status)
249         }
250         conn.Close()
251         return nil, &net.OpError{
252                 Op:   "dial-http",
253                 Net:  network + " " + address,
254                 Addr: nil,
255                 Err:  err,
256         }
257 }
258
259 // Dial connects to an RPC server at the specified network address.
260 func Dial(network, address string) (*Client, error) {
261         conn, err := net.Dial(network, address)
262         if err != nil {
263                 return nil, err
264         }
265         return NewClient(conn), nil
266 }
267
268 func (client *Client) Close() error {
269         client.mutex.Lock()
270         if client.shutdown || client.closing {
271                 client.mutex.Unlock()
272                 return ErrShutdown
273         }
274         client.closing = true
275         client.mutex.Unlock()
276         return client.codec.Close()
277 }
278
279 // Go invokes the function asynchronously.  It returns the Call structure representing
280 // the invocation.  The done channel will signal when the call is complete by returning
281 // the same Call object.  If done is nil, Go will allocate a new channel.
282 // If non-nil, done must be buffered or Go will deliberately crash.
283 func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
284         call := new(Call)
285         call.ServiceMethod = serviceMethod
286         call.Args = args
287         call.Reply = reply
288         if done == nil {
289                 done = make(chan *Call, 10) // buffered.
290         } else {
291                 // If caller passes done != nil, it must arrange that
292                 // done has enough buffer for the number of simultaneous
293                 // RPCs that will be using that channel.  If the channel
294                 // is totally unbuffered, it's best not to run at all.
295                 if cap(done) == 0 {
296                         log.Panic("rpc: done channel is unbuffered")
297                 }
298         }
299         call.Done = done
300         client.send(call)
301         return call
302 }
303
304 // Call invokes the named function, waits for it to complete, and returns its error status.
305 func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
306         call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
307         return call.Error
308 }