1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 package org.chromium.mojo.bindings;
7 import org.chromium.mojo.system.AsyncWaiter;
8 import org.chromium.mojo.system.Core;
9 import org.chromium.mojo.system.MessagePipeHandle;
10 import org.chromium.mojo.system.MessagePipeHandle.ReadMessageResult;
11 import org.chromium.mojo.system.MojoException;
12 import org.chromium.mojo.system.MojoResult;
14 import java.nio.ByteBuffer;
17 * A {@link Connector} owns a {@link MessagePipeHandle} and will send any received messages to the
18 * registered {@link MessageReceiver}. It also acts as a {@link MessageReceiver} and will send any
19 * message through the handle.
21 * The method |start| must be called before the {@link Connector} will start listening to incoming
24 public class Connector implements MessageReceiver, HandleOwner<MessagePipeHandle> {
27 * The callback that is notified when the state of the owned handle changes.
29 private final AsyncWaiterCallback mAsyncWaiterCallback = new AsyncWaiterCallback();
32 * The owned message pipe.
34 private final MessagePipeHandle mMessagePipeHandle;
37 * A waiter which is notified when a new message is available on the owned message pipe.
39 private final AsyncWaiter mAsyncWaiter;
42 * The {@link MessageReceiver} to which received messages are sent.
44 private MessageReceiver mIncomingMessageReceiver;
47 * The Cancellable for the current wait. Is |null| when not currently waiting for new messages.
49 private AsyncWaiter.Cancellable mCancellable;
52 * The error handler to notify of errors.
54 private ConnectionErrorHandler mErrorHandler;
57 * Create a new connector over a |messagePipeHandle|. The created connector will use the default
58 * {@link AsyncWaiter} from the {@link Core} implementation of |messagePipeHandle|.
60 public Connector(MessagePipeHandle messagePipeHandle) {
61 this(messagePipeHandle, BindingsHelper.getDefaultAsyncWaiterForHandle(messagePipeHandle));
65 * Create a new connector over a |messagePipeHandle| using the given {@link AsyncWaiter} to get
66 * notified of changes on the handle.
68 public Connector(MessagePipeHandle messagePipeHandle, AsyncWaiter asyncWaiter) {
70 mMessagePipeHandle = messagePipeHandle;
71 mAsyncWaiter = asyncWaiter;
75 * Set the {@link MessageReceiver} that will receive message from the owned message pipe.
77 public void setIncomingMessageReceiver(MessageReceiver incomingMessageReceiver) {
78 mIncomingMessageReceiver = incomingMessageReceiver;
82 * Set the {@link ConnectionErrorHandler} that will be notified of errors on the owned message
85 public void setErrorHandler(ConnectionErrorHandler errorHandler) {
86 mErrorHandler = errorHandler;
90 * Start listening for incoming messages.
93 assert mCancellable == null;
94 registerAsyncWaiterForRead();
98 * @see MessageReceiver#accept(Message)
101 public boolean accept(Message message) {
103 mMessagePipeHandle.writeMessage(message.getData(),
104 message.getHandles(), MessagePipeHandle.WriteFlags.NONE);
106 } catch (MojoException e) {
113 * Pass the owned handle of the connector. After this, the connector is disconnected. It cannot
114 * accept new message and it isn't listening to the handle anymore.
116 * @see org.chromium.mojo.bindings.HandleOwner#passHandle()
119 public MessagePipeHandle passHandle() {
121 MessagePipeHandle handle = mMessagePipeHandle.pass();
122 if (mIncomingMessageReceiver != null) {
123 mIncomingMessageReceiver.close();
129 * @see java.io.Closeable#close()
132 public void close() {
134 mMessagePipeHandle.close();
135 if (mIncomingMessageReceiver != null) {
136 mIncomingMessageReceiver.close();
140 private class AsyncWaiterCallback implements AsyncWaiter.Callback {
143 * @see org.chromium.mojo.system.AsyncWaiter.Callback#onResult(int)
146 public void onResult(int result) {
147 Connector.this.onAsyncWaiterResult(result);
151 * @see org.chromium.mojo.system.AsyncWaiter.Callback#onError(MojoException)
154 public void onError(MojoException exception) {
155 Connector.this.onError(exception);
161 * @see org.chromium.mojo.system.AsyncWaiter.Callback#onResult(int)
163 private void onAsyncWaiterResult(int result) {
165 if (result == MojoResult.OK) {
166 readOutstandingMessages();
168 onError(new MojoException(result));
172 private void onError(MojoException exception) {
175 if (mErrorHandler != null) {
176 mErrorHandler.onConnectionError(exception);
181 * Register to be called back when a new message is available on the owned message pipe.
183 private void registerAsyncWaiterForRead() {
184 assert mCancellable == null;
185 if (mAsyncWaiter != null) {
186 mCancellable = mAsyncWaiter.asyncWait(mMessagePipeHandle, Core.HandleSignals.READABLE,
187 Core.DEADLINE_INFINITE, mAsyncWaiterCallback);
189 onError(new MojoException(MojoResult.INVALID_ARGUMENT));
194 * Read all available messages on the owned message pipe.
196 private void readOutstandingMessages() {
200 result = readAndDispatchMessage(mMessagePipeHandle, mIncomingMessageReceiver);
201 } catch (MojoException e) {
205 } while (result == MojoResult.OK);
206 if (result == MojoResult.SHOULD_WAIT) {
207 registerAsyncWaiterForRead();
209 onError(new MojoException(result));
213 private void cancelIfActive() {
214 if (mCancellable != null) {
215 mCancellable.cancel();
221 * Read a message, and pass it to the given |MessageReceiver| if not null. If the
222 * |MessageReceiver| is null, the message is lost.
224 * @param receiver The {@link MessageReceiver} that will receive the read {@link Message}. Can
225 * be <code>null</code>, in which case the message is discarded.
227 static int readAndDispatchMessage(MessagePipeHandle handle, MessageReceiver receiver) {
228 // TODO(qsr) Allow usage of a pool of pre-allocated buffer for performance.
229 ReadMessageResult result = handle.readMessage(null, 0, MessagePipeHandle.ReadFlags.NONE);
230 if (result.getMojoResult() != MojoResult.RESOURCE_EXHAUSTED) {
231 return result.getMojoResult();
233 ByteBuffer buffer = ByteBuffer.allocateDirect(result.getMessageSize());
234 result = handle.readMessage(buffer, result.getHandlesCount(),
235 MessagePipeHandle.ReadFlags.NONE);
236 if (receiver != null && result.getMojoResult() == MojoResult.OK) {
237 receiver.accept(new Message(buffer, result.getHandles()));
239 return result.getMojoResult();