// RawChannel::ReadBuffer ------------------------------------------------------
-RawChannel::ReadBuffer::ReadBuffer()
- : buffer_(kReadSize),
- num_valid_bytes_(0) {
+RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
}
RawChannel::ReadBuffer::~ReadBuffer() {
if (!transport_data)
return false;
- const std::vector<embedder::PlatformHandle>* all_platform_handles =
+ const embedder::PlatformHandleVector* all_platform_handles =
transport_data->platform_handles();
if (!all_platform_handles) {
DCHECK_EQ(platform_handles_offset_, 0u);
DCHECK(HavePlatformHandlesToSend());
TransportData* transport_data = message_queue_.front()->transport_data();
- std::vector<embedder::PlatformHandle>* all_platform_handles =
+ embedder::PlatformHandleVector* all_platform_handles =
transport_data->platform_handles();
*num_platform_handles =
all_platform_handles->size() - platform_handles_offset_;
DCHECK_LT(data_offset_, message->total_size());
size_t bytes_to_write = message->total_size() - data_offset_;
- size_t transport_data_buffer_size = message->transport_data() ?
- message->transport_data()->buffer_size() : 0;
+ size_t transport_data_buffer_size =
+ message->transport_data() ? message->transport_data()->buffer_size() : 0;
if (!transport_data_buffer_size) {
// Only write from the main buffer.
return;
}
+ // TODO(vtl): We could actually send out buffers from multiple messages, with
+ // the "stopping" condition being reaching a message with platform handles
+ // attached.
+
// Write from both buffers.
- DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ +
- transport_data_buffer_size);
+ DCHECK_EQ(
+ bytes_to_write,
+ message->main_buffer_size() - data_offset_ + transport_data_buffer_size);
Buffer buffer1 = {
- static_cast<const char*>(message->main_buffer()) + data_offset_,
- message->main_buffer_size() - data_offset_
- };
+ static_cast<const char*>(message->main_buffer()) + data_offset_,
+ message->main_buffer_size() - data_offset_};
buffers->push_back(buffer1);
Buffer buffer2 = {
- static_cast<const char*>(message->transport_data()->buffer()),
- transport_data_buffer_size
- };
+ static_cast<const char*>(message->transport_data()->buffer()),
+ transport_data_buffer_size};
buffers->push_back(buffer2);
}
// RawChannel ------------------------------------------------------------------
RawChannel::RawChannel()
- : message_loop_for_io_(NULL),
- delegate_(NULL),
+ : message_loop_for_io_(nullptr),
+ delegate_(nullptr),
read_stopped_(false),
write_stopped_(false),
weak_ptr_factory_(this) {
write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize()));
if (!OnInit()) {
- delegate_ = NULL;
- message_loop_for_io_ = NULL;
+ delegate_ = nullptr;
+ message_loop_for_io_ = nullptr;
read_buffer_.reset();
write_buffer_.reset();
return false;
}
- return ScheduleRead() == IO_PENDING;
+ IOResult io_result = ScheduleRead();
+ if (io_result != IO_PENDING) {
+ // This will notify the delegate about the read failure. Although we're on
+ // the I/O thread, don't call it in the nested context.
+ message_loop_for_io_->PostTask(FROM_HERE,
+ base::Bind(&RawChannel::OnReadCompleted,
+ weak_ptr_factory_.GetWeakPtr(),
+ io_result,
+ 0));
+ }
+
+ // ScheduleRead() failure is treated as a read failure (by notifying the
+ // delegate), not as an init failure.
+ return true;
}
void RawChannel::Shutdown() {
<< "Shutting down RawChannel with write buffer nonempty";
// Reset the delegate so that it won't receive further calls.
- delegate_ = NULL;
+ delegate_ = nullptr;
read_stopped_ = true;
write_stopped_ = true;
weak_ptr_factory_.InvalidateWeakPtrs();
return false;
if (!write_buffer_->message_queue_.empty()) {
- write_buffer_->message_queue_.push_back(message.release());
+ EnqueueMessageNoLock(message.Pass());
return true;
}
- write_buffer_->message_queue_.push_front(message.release());
+ EnqueueMessageNoLock(message.Pass());
DCHECK_EQ(write_buffer_->data_offset_, 0u);
size_t platform_handles_written = 0;
if (io_result == IO_PENDING)
return true;
- bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
- platform_handles_written,
- bytes_written);
+ bool result = OnWriteCompletedNoLock(
+ io_result, platform_handles_written, bytes_written);
if (!result) {
- // Even if we're on the I/O thread, don't call |OnFatalError()| in the
- // nested context.
- message_loop_for_io_->PostTask(
- FROM_HERE,
- base::Bind(&RawChannel::CallOnFatalError,
- weak_ptr_factory_.GetWeakPtr(),
- Delegate::FATAL_ERROR_FAILED_WRITE));
+ // Even if we're on the I/O thread, don't call |OnError()| in the nested
+ // context.
+ message_loop_for_io_->PostTask(FROM_HERE,
+ base::Bind(&RawChannel::CallOnError,
+ weak_ptr_factory_.GetWeakPtr(),
+ Delegate::ERROR_WRITE));
}
return result;
return write_buffer_->message_queue_.empty();
}
-RawChannel::ReadBuffer* RawChannel::read_buffer() {
- DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
- return read_buffer_.get();
-}
-
-RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
- write_lock_.AssertAcquired();
- return write_buffer_.get();
-}
-
-void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
+void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
if (read_stopped_) {
return;
}
- IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
-
// Keep reading data in a loop, and dispatch messages if enough data is
// received. Exit the loop if any of the following happens:
// - one or more messages were dispatched;
// - the last read failed, was a partial read or would block;
// - |Shutdown()| was called.
do {
- if (io_result != IO_SUCCEEDED) {
- read_stopped_ = true;
- CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
- return;
+ switch (io_result) {
+ case IO_SUCCEEDED:
+ break;
+ case IO_FAILED_SHUTDOWN:
+ case IO_FAILED_BROKEN:
+ case IO_FAILED_UNKNOWN:
+ read_stopped_ = true;
+ CallOnError(ReadIOResultToError(io_result));
+ return;
+ case IO_PENDING:
+ NOTREACHED();
+ return;
}
read_buffer_->num_valid_bytes_ += bytes_read;
// TODO(vtl): Use |message_size| more intelligently (e.g., to request the
// next read).
// TODO(vtl): Validate that |message_size| is sane.
- while (remaining_bytes > 0 &&
- MessageInTransit::GetNextMessageSize(
- &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
- &message_size) &&
+ while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
+ &read_buffer_->buffer_[read_buffer_start],
+ remaining_bytes,
+ &message_size) &&
remaining_bytes >= message_size) {
- MessageInTransit::View
- message_view(message_size, &read_buffer_->buffer_[read_buffer_start]);
+ MessageInTransit::View message_view(
+ message_size, &read_buffer_->buffer_[read_buffer_start]);
DCHECK_EQ(message_view.total_size(), message_size);
- // Dispatch the message.
- DCHECK(delegate_);
- delegate_->OnReadMessage(message_view);
- if (read_stopped_) {
- // |Shutdown()| was called in |OnReadMessage()|.
- // TODO(vtl): Add test for this case.
+ const char* error_message = nullptr;
+ if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
+ &error_message)) {
+ DCHECK(error_message);
+ LOG(ERROR) << "Received invalid message: " << error_message;
+ read_stopped_ = true;
+ CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
return;
}
+
+ if (message_view.type() == MessageInTransit::kTypeRawChannel) {
+ if (!OnReadMessageForRawChannel(message_view)) {
+ read_stopped_ = true;
+ CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
+ return;
+ }
+ } else {
+ embedder::ScopedPlatformHandleVectorPtr platform_handles;
+ if (message_view.transport_data_buffer()) {
+ size_t num_platform_handles;
+ const void* platform_handle_table;
+ TransportData::GetPlatformHandleTable(
+ message_view.transport_data_buffer(),
+ &num_platform_handles,
+ &platform_handle_table);
+
+ if (num_platform_handles > 0) {
+ platform_handles =
+ GetReadPlatformHandles(num_platform_handles,
+ platform_handle_table).Pass();
+ if (!platform_handles) {
+ LOG(ERROR) << "Invalid number of platform handles received";
+ read_stopped_ = true;
+ CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
+ return;
+ }
+ }
+ }
+
+ // TODO(vtl): In the case that we aren't expecting any platform handles,
+ // for the POSIX implementation, we should confirm that none are stored.
+
+ // Dispatch the message.
+ DCHECK(delegate_);
+ delegate_->OnReadMessage(message_view, platform_handles.Pass());
+ if (read_stopped_) {
+ // |Shutdown()| was called in |OnReadMessage()|.
+ // TODO(vtl): Add test for this case.
+ return;
+ }
+ }
+
did_dispatch_message = true;
// Update our state.
read_buffer_->num_valid_bytes_ = remaining_bytes;
if (read_buffer_->num_valid_bytes_ > 0) {
memmove(&read_buffer_->buffer_[0],
- &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
+ &read_buffer_->buffer_[read_buffer_start],
+ remaining_bytes);
}
read_buffer_start = 0;
}
if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
- kReadSize) {
+ kReadSize) {
// Use power-of-2 buffer sizes.
// TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
// maximum message size to whatever extent necessary).
} while (io_result != IO_PENDING);
}
-void RawChannel::OnWriteCompleted(bool result,
+void RawChannel::OnWriteCompleted(IOResult io_result,
size_t platform_handles_written,
size_t bytes_written) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+ DCHECK_NE(io_result, IO_PENDING);
bool did_fail = false;
{
return;
}
- did_fail = !OnWriteCompletedNoLock(result,
- platform_handles_written,
- bytes_written);
+ did_fail = !OnWriteCompletedNoLock(
+ io_result, platform_handles_written, bytes_written);
}
if (did_fail)
- CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
+ CallOnError(Delegate::ERROR_WRITE);
+}
+
+void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
+ write_lock_.AssertAcquired();
+ write_buffer_->message_queue_.push_back(message.release());
+}
+
+bool RawChannel::OnReadMessageForRawChannel(
+ const MessageInTransit::View& message_view) {
+ // No non-implementation specific |RawChannel| control messages.
+ LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype()
+ << ")";
+ return false;
+}
+
+// static
+RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
+ IOResult io_result) {
+ switch (io_result) {
+ case IO_FAILED_SHUTDOWN:
+ return Delegate::ERROR_READ_SHUTDOWN;
+ case IO_FAILED_BROKEN:
+ return Delegate::ERROR_READ_BROKEN;
+ case IO_FAILED_UNKNOWN:
+ return Delegate::ERROR_READ_UNKNOWN;
+ case IO_SUCCEEDED:
+ case IO_PENDING:
+ NOTREACHED();
+ break;
+ }
+ return Delegate::ERROR_READ_UNKNOWN;
}
-void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
+void RawChannel::CallOnError(Delegate::Error error) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
// TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
if (delegate_)
- delegate_->OnFatalError(fatal_error);
+ delegate_->OnError(error);
}
-bool RawChannel::OnWriteCompletedNoLock(bool result,
+bool RawChannel::OnWriteCompletedNoLock(IOResult io_result,
size_t platform_handles_written,
size_t bytes_written) {
write_lock_.AssertAcquired();
DCHECK(!write_stopped_);
DCHECK(!write_buffer_->message_queue_.empty());
- if (result) {
+ if (io_result == IO_SUCCEEDED) {
write_buffer_->platform_handles_offset_ += platform_handles_written;
write_buffer_->data_offset_ += bytes_written;
MessageInTransit* message = write_buffer_->message_queue_.front();
if (write_buffer_->data_offset_ >= message->total_size()) {
// Complete write.
- DCHECK_EQ(write_buffer_->data_offset_, message->total_size());
+ CHECK_EQ(write_buffer_->data_offset_, message->total_size());
write_buffer_->message_queue_.pop_front();
delete message;
write_buffer_->platform_handles_offset_ = 0;
}
// Schedule the next write.
- IOResult io_result = ScheduleWriteNoLock();
+ io_result = ScheduleWriteNoLock();
if (io_result == IO_PENDING)
return true;
- DCHECK_EQ(io_result, IO_FAILED);
+ DCHECK_NE(io_result, IO_SUCCEEDED);
}
write_stopped_ = true;