#include "mojo/public/bindings/lib/connector.h"
-#include <assert.h>
#include <stdlib.h>
-#include <algorithm>
+#include "mojo/public/bindings/error_handler.h"
namespace mojo {
namespace internal {
Connector::Connector(ScopedMessagePipeHandle message_pipe,
MojoAsyncWaiter* waiter)
- : waiter_(waiter),
+ : error_handler_(NULL),
+ waiter_(waiter),
message_pipe_(message_pipe.Pass()),
incoming_receiver_(NULL),
async_wait_id_(0),
- error_(false) {
+ error_(false),
+ drop_writes_(false) {
+ // Even though we don't have an incoming receiver, we still want to monitor
+ // the message pipe to know if is closed or encounters an error.
+ WaitToReadMore();
}
Connector::~Connector() {
waiter_->CancelWait(waiter_, async_wait_id_);
}
-void Connector::SetIncomingReceiver(MessageReceiver* receiver) {
- assert(!incoming_receiver_);
- incoming_receiver_ = receiver;
- if (incoming_receiver_)
- WaitToReadMore();
-}
-
bool Connector::Accept(Message* message) {
if (error_)
return false;
- WriteOne(message);
- return !error_;
+ if (drop_writes_)
+ return true;
+
+ MojoResult rv = WriteMessageRaw(
+ message_pipe_.get(),
+ message->data,
+ message->data->header.num_bytes,
+ message->handles.empty() ? NULL :
+ reinterpret_cast<const MojoHandle*>(&message->handles[0]),
+ static_cast<uint32_t>(message->handles.size()),
+ MOJO_WRITE_MESSAGE_FLAG_NONE);
+
+ switch (rv) {
+ case MOJO_RESULT_OK:
+ // The handles were successfully transferred, so we don't need the message
+ // to track their lifetime any longer.
+ message->handles.clear();
+ break;
+ case MOJO_RESULT_FAILED_PRECONDITION:
+ // There's no point in continuing to write to this pipe since the other
+ // end is gone. Avoid writing any future messages. Hide write failures
+ // from the caller since we'd like them to continue consuming any backlog
+ // of incoming messages before regarding the message pipe as closed.
+ drop_writes_ = true;
+ break;
+ default:
+ // This particular write was rejected, presumably because of bad input.
+ // The pipe is not necessarily in a bad state.
+ return false;
+ }
+ return true;
}
// static
-void Connector::OnHandleReady(void* closure, MojoResult result) {
+void Connector::CallOnHandleReady(void* closure, MojoResult result) {
Connector* self = static_cast<Connector*>(closure);
- self->async_wait_id_ = 0;
- self->ReadMore();
+ self->OnHandleReady(result);
+}
+
+void Connector::OnHandleReady(MojoResult result) {
+ async_wait_id_ = 0;
+
+ if (result == MOJO_RESULT_OK) {
+ ReadMore();
+ } else {
+ error_ = true;
+ }
+
+ if (error_ && error_handler_)
+ error_handler_->OnError();
}
void Connector::WaitToReadMore() {
message_pipe_.get().value(),
MOJO_WAIT_FLAG_READABLE,
MOJO_DEADLINE_INDEFINITE,
- &Connector::OnHandleReady,
+ &Connector::CallOnHandleReady,
this);
}
break;
}
- incoming_receiver_->Accept(&message);
- }
-}
-
-void Connector::WriteOne(Message* message) {
- MojoResult rv = WriteMessageRaw(
- message_pipe_.get(),
- message->data,
- message->data->header.num_bytes,
- message->handles.empty() ? NULL :
- reinterpret_cast<const MojoHandle*>(&message->handles[0]),
- static_cast<uint32_t>(message->handles.size()),
- MOJO_WRITE_MESSAGE_FLAG_NONE);
- if (rv == MOJO_RESULT_OK) {
- // The handles were successfully transferred, so we don't need the message
- // to track their lifetime any longer.
- message->handles.clear();
- } else {
- error_ = true;
+ if (incoming_receiver_)
+ incoming_receiver_->Accept(&message);
}
}