1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
18 // ServerError represents an error that has been returned from
19 // the remote side of the RPC connection.
// Its value is the error text: input builds one from a response
// header's non-empty Error field and stores it in Call.Error.
20 type ServerError string
// Error makes ServerError satisfy the error interface.
22 func (e ServerError) Error() string {
// ErrShutdown is the error that send stores in Call.Error while
// registering a call.
// NOTE(review): the condition guarding that assignment is not visible
// in this excerpt; presumably the client is shut down or closing — confirm.
26 var ErrShutdown = errors.New("connection is shut down")
28 // Call represents an active RPC.
30 ServiceMethod string // The name of the service and method to call.
31 Args interface{} // The argument to the function (*struct).
32 Reply interface{} // The reply from the function (*struct).
33 Error error // After completion, the error status.
34 Done chan *Call // Receives this *Call when the call is complete (see done).
37 // Client represents an RPC Client.
38 // There may be multiple outstanding Calls associated
39 // with a single Client, and a Client may be used by
40 // multiple goroutines simultaneously.
42 mutex sync.Mutex // protects pending, seq, request
47 pending map[uint64]*Call // registered calls keyed by sequence number; added in send, removed in send and input
52 // A ClientCodec implements writing of RPC requests and
53 // reading of RPC responses for the client side of an RPC session.
54 // The client calls WriteRequest to write a request to the connection
55 // and calls ReadResponseHeader and ReadResponseBody in pairs
56 // to read responses. The client calls Close when finished with the
57 // connection. ReadResponseBody may be called with a nil
58 // argument to force the body of the response to be read and then
// discarded.
60 type ClientCodec interface {
// WriteRequest writes one request: the header and the body value
// (send passes the call's Args as the body).
61 WriteRequest(*Request, interface{}) error
// ReadResponseHeader reads the next response header into its argument.
62 ReadResponseHeader(*Response) error
// ReadResponseBody reads the body that follows the header into its
// argument (input passes the call's Reply, or nil to drop the body).
63 ReadResponseBody(interface{}) error
// send registers call in client.pending under a sequence number and
// writes the request through the codec. client.sending is held until
// send returns (deferred Unlock), and the shared client.request header
// is overwritten for each call.
// NOTE(review): several lines of this function are missing from this
// excerpt (locking, the shutdown check, error handling); confirm the
// details against the full file.
68 func (client *Client) send(call *Call) {
70 defer client.sending.Unlock()
72 // Register this call.
// NOTE(review): the condition guarding this assignment is not visible here.
75 call.Error = ErrShutdown
82 client.pending[seq] = call
85 // Encode and send the request.
86 client.request.Seq = seq
87 client.request.ServiceMethod = call.ServiceMethod
88 err := client.codec.WriteRequest(&client.request, call.Args)
// Re-read the call from pending and remove it. The lookup may yield nil
// if the entry is already gone.
// NOTE(review): presumably this runs only when WriteRequest failed
// (input's comments mention that case) — the enclosing condition is
// not visible here.
91 call = client.pending[seq]
92 delete(client.pending, seq)
// input reads response headers from the codec, takes the call
// registered under seq out of client.pending, and fills in that call's
// Reply or Error from the response. Once reading stops it marks the
// client as shut down and walks the calls still pending.
// NOTE(review): this excerpt omits lines (loop header, switch header,
// how seq is obtained, the loop body at the end); confirm the details
// against the full file.
101 func (client *Client) input() {
103 var response Response
// Start every read from a zero-valued header.
105 response = Response{}
106 err = client.codec.ReadResponseHeader(&response)
// An EOF that arrives while the client is not closing is reported as
// io.ErrUnexpectedEOF.
108 if err == io.EOF && !client.closing {
109 err = io.ErrUnexpectedEOF
// Claim the call for this response by removing it from pending
// (the matching Lock is not visible in this excerpt).
115 call := client.pending[seq]
116 delete(client.pending, seq)
117 client.mutex.Unlock()
121 // We've got no pending call. That usually means that
122 // WriteRequest partially failed, and call was already
123 // removed; response is a server telling us about an
124 // error reading request body. We should still attempt
125 // to read error body, but there's no one to give it to.
126 err = client.codec.ReadResponseBody(nil)
128 err = errors.New("reading error body: " + err.Error())
130 case response.Error != "":
131 // We've got an error response. Give this to the request;
132 // any subsequent requests will get the ReadResponseBody
133 // error if there is one.
134 call.Error = ServerError(response.Error)
135 err = client.codec.ReadResponseBody(nil)
137 err = errors.New("reading error body: " + err.Error())
// Normal response: decode the body straight into the caller's Reply.
141 err = client.codec.ReadResponseBody(call.Reply)
// A body decoding failure is recorded on the call itself.
143 call.Error = errors.New("reading body " + err.Error())
148 // Terminate pending calls.
// send holds client.sending for its whole duration, so taking it here
// keeps sends out while the pending calls are handled.
149 client.sending.Lock()
151 client.shutdown = true
// Snapshot closing while the locks are held; it is consulted after
// they are released.
152 closing := client.closing
// NOTE(review): loop body not visible; presumably each remaining call
// is given err and signalled — confirm.
153 for _, call := range client.pending {
157 client.mutex.Unlock()
158 client.sending.Unlock()
// Log the error unless it is a plain EOF or the client was closing.
159 if err != io.EOF && !closing {
160 log.Println("rpc: client protocol error:", err)
// done reports completion by sending call on call.Done. It is written
// not to block: when the send cannot proceed, the call is not
// delivered and a message is logged instead.
// NOTE(review): the select/default lines are missing from this excerpt.
164 func (call *Call) done() {
166 case call.Done <- call:
169 // We don't want to block here. It is the caller's responsibility to make
170 // sure the channel has enough buffer space. See comment in Go().
171 log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
175 // NewClient returns a new Client to handle requests to the
176 // set of services at the other end of the connection.
177 // It adds a buffer to the write side of the connection so
178 // the header and payload are sent as a unit.
// The client uses the gob-based codec (gobClientCodec) over conn.
179 func NewClient(conn io.ReadWriteCloser) *Client {
// Writes go through encBuf; reads decode directly from conn.
180 encBuf := bufio.NewWriter(conn)
// Despite its name, this local is the codec, not the Client.
181 client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
182 return NewClientWithCodec(client)
185 // NewClientWithCodec is like NewClient but uses the specified
186 // codec to encode requests and decode responses.
// NOTE(review): only the pending-map initialisation is visible in this
// excerpt; the rest of the construction is not shown.
187 func NewClientWithCodec(codec ClientCodec) *Client {
// Start with an empty table of registered calls.
190 pending: make(map[uint64]*Call),
// gobClientCodec is the codec NewClient builds: a gob decoder reading
// from the connection and a gob encoder writing through a buffered
// writer. Only the first field is visible in this excerpt.
196 type gobClientCodec struct {
197 rwc io.ReadWriteCloser // the connection passed to NewClient
// WriteRequest gob-encodes the header r, then body, and finally flushes
// the buffered writer, returning the flush error.
// NOTE(review): the bodies of the two error branches are not visible
// in this excerpt.
203 func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
204 if err = c.enc.Encode(r); err != nil {
207 if err = c.enc.Encode(body); err != nil {
210 return c.encBuf.Flush()
// ReadResponseHeader decodes the next gob value from the decoder into r
// and returns the decoder's error unchanged.
213 func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
214 return c.dec.Decode(r)
// ReadResponseBody decodes the next gob value from the decoder into
// body and returns the decoder's error unchanged. input passes a nil
// body to skip a response body.
// NOTE(review): encoding/gob documents that Decode(nil) discards the
// value — confirm.
217 func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
218 return c.dec.Decode(body)
// Close implements the codec's Close method; Client.Close calls it.
// NOTE(review): the body is not visible in this excerpt; presumably it
// closes c.rwc — confirm.
221 func (c *gobClientCodec) Close() error {
225 // DialHTTP connects to an HTTP RPC server at the specified network address
226 // listening on the default HTTP RPC path.
// It is DialHTTPPath with path set to DefaultRPCPath.
227 func DialHTTP(network, address string) (*Client, error) {
228 return DialHTTPPath(network, address, DefaultRPCPath)
231 // DialHTTPPath connects to an HTTP RPC server
232 // at the specified network address and path.
// It sends an HTTP/1.0 CONNECT request for path and, when the response
// status equals the connected constant, returns a Client built by
// NewClient on the same connection. Otherwise it returns a
// *net.OpError.
// NOTE(review): some lines (dial error handling, the remaining
// OpError fields) are missing from this excerpt.
233 func DialHTTPPath(network, address, path string) (*Client, error) {
235 conn, err := net.Dial(network, address)
// NOTE(review): the result of this write is not checked here.
239 io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
241 // Require successful HTTP response
242 // before switching to RPC protocol.
243 resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
244 if err == nil && resp.Status == connected {
245 return NewClient(conn), nil
// NOTE(review): this dereferences resp, so it must only run when
// ReadResponse succeeded; the guard is not visible in this excerpt.
248 err = errors.New("unexpected HTTP response: " + resp.Status)
251 return nil, &net.OpError{
253 Net: network + " " + address,
259 // Dial connects to an RPC server at the specified network address.
// On success the connection from net.Dial is wrapped with NewClient.
// NOTE(review): the dial-error branch is not visible in this excerpt.
260 func Dial(network, address string) (*Client, error) {
261 conn, err := net.Dial(network, address)
265 return NewClient(conn), nil
// Close marks the client as closing and closes the codec, returning
// the codec's error. If the client is already shut down or closing,
// the mutex is released without setting closing again.
// NOTE(review): the Lock call and the value returned on that early
// path are not visible in this excerpt.
268 func (client *Client) Close() error {
270 if client.shutdown || client.closing {
271 client.mutex.Unlock()
// Set closing under the mutex, then close the codec after releasing it.
274 client.closing = true
275 client.mutex.Unlock()
276 return client.codec.Close()
279 // Go invokes the function asynchronously. It returns the Call structure representing
280 // the invocation. The done channel will signal when the call is complete by returning
281 // the same Call object. If done is nil, Go will allocate a new channel.
282 // If non-nil, done must be buffered or Go will deliberately crash.
// The allocated channel has capacity 10; the crash is a log.Panic.
283 func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
// NOTE(review): the construction of call and the remaining field
// assignments are not visible in this excerpt.
285 call.ServiceMethod = serviceMethod
289 done = make(chan *Call, 10) // buffered.
291 // If caller passes done != nil, it must arrange that
292 // done has enough buffer for the number of simultaneous
293 // RPCs that will be using that channel. If the channel
294 // is totally unbuffered, it's best not to run at all.
// NOTE(review): the guarding condition is not visible here; presumably
// it tests cap(done) == 0 — confirm.
296 log.Panic("rpc: done channel is unbuffered")
304 // Call invokes the named function, waits for it to complete, and returns its error status.
305 func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
306 call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done