1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/raw_channel.h"
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/stl_util.h"
16 #include "mojo/system/message_in_transit.h"
17 #include "mojo/system/transport_data.h"
// Size, in bytes, used throughout this file for read bookkeeping:
// |ReadBuffer::GetBuffer()| DCHECKs that at least this much storage exists
// beyond the valid data, |OnReadCompleted()| grows the buffer until that holds,
// and a read that returned fewer bytes than this is treated as a reason to
// stop reading for now.
22 const size_t kReadSize = 4096;
24 // RawChannel::ReadBuffer ------------------------------------------------------
// Constructor and destructor of |RawChannel::ReadBuffer|.
// NOTE(review): the constructor's initializer list and both bodies are not
// visible in this excerpt, so nothing further is stated about them here.
26 RawChannel::ReadBuffer::ReadBuffer()
31 RawChannel::ReadBuffer::~ReadBuffer() {
// Reports where the next read should deposit its data: |*addr| is set to the
// position in |buffer_| immediately after the |num_valid_bytes_| bytes that
// are already held. The DCHECK requires that at least |kReadSize| bytes of
// storage exist beyond that position.
// NOTE(review): the assignment to |*size| is not visible in this excerpt.
34 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
35 DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
36 *addr = &buffer_[0] + num_valid_bytes_;
40 // RawChannel::WriteBuffer -----------------------------------------------------
// Stores the caller-supplied size of one serialized platform handle and starts
// |platform_handles_offset_| at zero.
// NOTE(review): any remaining initializers and the body are not visible in
// this excerpt.
42 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size)
43 : serialized_platform_handle_size_(serialized_platform_handle_size),
44 platform_handles_offset_(0),
// |message_queue_| holds raw pointers (they are |release()|d into it in
// |RawChannel::WriteMessage()|), so whatever is still queued is handed to
// |STLDeleteElements()| here.
48 RawChannel::WriteBuffer::~WriteBuffer() {
49 STLDeleteElements(&message_queue_);
// Examines the message at the front of |message_queue_| and compares
// |platform_handles_offset_| with the number of platform handles carried by
// that message's transport data.
// NOTE(review): none of the return statements are visible in this excerpt;
// presumably each guard below yields false and the fall-through yields true --
// confirm against the full file.
52 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
// Nothing is queued at all.
53 if (message_queue_.empty())
56 const TransportData* transport_data =
57 message_queue_.front()->transport_data();
61 const std::vector<embedder::PlatformHandle>* all_platform_handles =
62 transport_data->platform_handles();
// No handle vector: the offset is then expected to still be zero.
63 if (!all_platform_handles) {
64 DCHECK_EQ(platform_handles_offset_, 0u);
// The offset has reached the end of the vector; it is never expected to go
// past it.
67 if (platform_handles_offset_ >= all_platform_handles->size()) {
68 DCHECK_EQ(platform_handles_offset_, all_platform_handles->size());
// Describes the platform handles of the front message, starting at
// |platform_handles_offset_|. Must only be called when
// |HavePlatformHandlesToSend()| holds (DCHECKed).
//   |*num_platform_handles| - count of handles from the offset to the end.
//   |*platform_handles|     - address of the handle at the offset.
//   |serialization_data|    - NOTE(review): the left-hand side of the final
//                             assignment is not visible in this excerpt;
//                             presumably it is |*serialization_data|.
75 void RawChannel::WriteBuffer::GetPlatformHandlesToSend(
76 size_t* num_platform_handles,
77 embedder::PlatformHandle** platform_handles,
78 void** serialization_data) {
79 DCHECK(HavePlatformHandlesToSend());
81 TransportData* transport_data = message_queue_.front()->transport_data();
82 std::vector<embedder::PlatformHandle>* all_platform_handles =
83 transport_data->platform_handles();
84 *num_platform_handles =
85 all_platform_handles->size() - platform_handles_offset_;
86 *platform_handles = &(*all_platform_handles)[platform_handles_offset_];
// Start from the offset of the platform handle table within the transport
// data buffer (expected to be nonzero), then skip one serialized entry for
// each handle before |platform_handles_offset_|.
87 size_t serialization_data_offset =
88 transport_data->platform_handle_table_offset();
89 DCHECK_GT(serialization_data_offset, 0u);
90 serialization_data_offset +=
91 platform_handles_offset_ * serialized_platform_handle_size_;
93 static_cast<char*>(transport_data->buffer()) + serialization_data_offset;
// Appends to |*buffers| the region(s) of the front message that remain to be
// written, i.e. everything from |data_offset_| up to the message's total size.
// A message consists of a main buffer optionally followed by a transport data
// buffer, which gives three cases:
//   - no transport data buffer: one region, taken from the main buffer;
//   - |data_offset_| is at or past the end of the main buffer: one region,
//     taken from the transport data buffer;
//   - otherwise: two regions, the rest of the main buffer followed by the
//     whole transport data buffer.
// NOTE(review): the lines that open each |Buffer| initializer and the early
// returns are not visible in this excerpt.
96 void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
99 if (message_queue_.empty())
102 MessageInTransit* message = message_queue_.front();
103 DCHECK_LT(data_offset_, message->total_size());
104 size_t bytes_to_write = message->total_size() - data_offset_;
// A message without transport data is treated as having a zero-sized
// transport data buffer.
106 size_t transport_data_buffer_size = message->transport_data() ?
107 message->transport_data()->buffer_size() : 0;
109 if (!transport_data_buffer_size) {
110 // Only write from the main buffer.
111 DCHECK_LT(data_offset_, message->main_buffer_size());
112 DCHECK_LE(bytes_to_write, message->main_buffer_size());
114 static_cast<const char*>(message->main_buffer()) + data_offset_,
116 buffers->push_back(buffer);
120 if (data_offset_ >= message->main_buffer_size()) {
121 // Only write from the transport data buffer.
122 DCHECK_LT(data_offset_ - message->main_buffer_size(),
123 transport_data_buffer_size);
124 DCHECK_LE(bytes_to_write, transport_data_buffer_size);
126 static_cast<const char*>(message->transport_data()->buffer()) +
127 (data_offset_ - message->main_buffer_size()),
129 buffers->push_back(buffer);
133 // Write from both buffers.
134 DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ +
135 transport_data_buffer_size);
137 static_cast<const char*>(message->main_buffer()) + data_offset_,
138 message->main_buffer_size() - data_offset_
140 buffers->push_back(buffer1);
142 static_cast<const char*>(message->transport_data()->buffer()),
143 transport_data_buffer_size
145 buffers->push_back(buffer2);
148 // RawChannel ------------------------------------------------------------------
// Constructs a channel that is not yet attached to an I/O message loop
// (|message_loop_for_io_| is NULL until |Init()| sets it), with reading and
// writing both marked as not stopped, and with a weak pointer factory bound to
// this object.
// NOTE(review): one initializer between the first and second visible ones is
// not shown in this excerpt.
150 RawChannel::RawChannel()
151 : message_loop_for_io_(NULL),
153 read_stopped_(false),
154 write_stopped_(false),
155 weak_ptr_factory_(this) {
// Expects the channel to have been torn down already: both buffers must be
// gone (|Shutdown()| passes them on to |OnShutdownNoLock()|) and no weak
// pointers to this object may remain (|Shutdown()| invalidates them).
158 RawChannel::~RawChannel() {
159 DCHECK(!read_buffer_);
160 DCHECK(!write_buffer_);
162 // No need to take the |write_lock_| here -- if there are still weak pointers
163 // outstanding, then we're hosed anyway (since we wouldn't be able to
164 // invalidate them cleanly, since we might not be on the I/O thread).
165 DCHECK(!weak_ptr_factory_.HasWeakPtrs());
// Attaches the channel to |delegate| and to the current thread's message loop,
// creates the read and write buffers, and schedules the first read.
// Must be called on a thread whose message loop is of type |TYPE_IO| (CHECKed)
// and at most once (|message_loop_for_io_| and both buffers must still be
// unset, DCHECKed).
// Returns true iff |ScheduleRead()| reports |IO_PENDING|.
168 bool RawChannel::Init(Delegate* delegate) {
172 delegate_ = delegate;
174 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
175 DCHECK(!message_loop_for_io_);
176 message_loop_for_io_ =
177 static_cast<base::MessageLoopForIO*>(base::MessageLoop::current());
179 // No need to take the lock. No one should be using us yet.
180 DCHECK(!read_buffer_);
181 read_buffer_.reset(new ReadBuffer);
182 DCHECK(!write_buffer_);
// The write buffer needs to know how large one serialized platform handle is.
183 write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize()));
// NOTE(review): the condition guarding the three resets below is not visible
// in this excerpt; presumably this is an initialization-failure path that
// undoes the setup above -- confirm against the full file.
187 message_loop_for_io_ = NULL;
188 read_buffer_.reset();
189 write_buffer_.reset();
193 return ScheduleRead() == IO_PENDING;
// Stops the channel. Must be called on the I/O thread (DCHECKed). With
// |write_lock_| held, it warns if messages are still queued for writing, marks
// both reading and writing as stopped, invalidates outstanding weak pointers
// (which are what posted |CallOnFatalError()| tasks are bound to), and hands
// ownership of both buffers to |OnShutdownNoLock()|.
196 void RawChannel::Shutdown() {
197 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
199 base::AutoLock locker(write_lock_);
201 LOG_IF(WARNING, !write_buffer_->message_queue_.empty())
202 << "Shutting down RawChannel with write buffer nonempty";
// NOTE(review): the statement this comment refers to is not visible in this
// excerpt.
204 // Reset the delegate so that it won't receive further calls.
206 read_stopped_ = true;
207 write_stopped_ = true;
208 weak_ptr_factory_.InvalidateWeakPtrs();
// After this call |read_buffer_| and |write_buffer_| no longer own anything,
// which is what the destructor's DCHECKs expect.
210 OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
213 // Reminder: This must be thread-safe.
// Queues |message| for writing; ownership moves into
// |write_buffer_->message_queue_| as a raw pointer. All work happens with
// |write_lock_| held. If the queue was empty, a write is attempted immediately
// and, unless it is still pending, its completion is processed inline.
// NOTE(review): the return statements (and any check performed right after
// taking the lock) are not visible in this excerpt.
214 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
217 base::AutoLock locker(write_lock_);
// Something is already queued, so this message simply goes to the back.
221 if (!write_buffer_->message_queue_.empty()) {
222 write_buffer_->message_queue_.push_back(message.release());
// The queue was empty: this message becomes the front, and no partial write
// can be in progress (hence the zero-offset DCHECK).
226 write_buffer_->message_queue_.push_front(message.release());
227 DCHECK_EQ(write_buffer_->data_offset_, 0u);
229 size_t platform_handles_written = 0;
230 size_t bytes_written = 0;
231 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
232 if (io_result == IO_PENDING)
// The write was not left pending, so do the completion bookkeeping right here.
235 bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
236 platform_handles_written,
// The error is reported through a task posted to the I/O message loop and
// bound to a weak pointer, so it is not delivered from within this call.
239 // Even if we're on the I/O thread, don't call |OnFatalError()| in the
241 message_loop_for_io_->PostTask(
243 base::Bind(&RawChannel::CallOnFatalError,
244 weak_ptr_factory_.GetWeakPtr(),
245 Delegate::FATAL_ERROR_FAILED_WRITE));
251 // Reminder: This must be thread-safe.
// Returns true iff no messages are queued for writing. The queue is inspected
// with |write_lock_| held.
252 bool RawChannel::IsWriteBufferEmpty() {
253 base::AutoLock locker(write_lock_);
254 return write_buffer_->message_queue_.empty();
// Accessor for the read buffer; ownership stays with |read_buffer_|. Must be
// called on the I/O thread (DCHECKed).
257 RawChannel::ReadBuffer* RawChannel::read_buffer() {
258 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
259 return read_buffer_.get();
// Accessor for the write buffer; ownership stays with |write_buffer_|. The
// caller must already hold |write_lock_| (asserted).
262 RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
263 write_lock_.AssertAcquired();
264 return write_buffer_.get();
// Processes the outcome of a read. Must be called on the I/O thread
// (DCHECKed).
//   |result|     - whether the read succeeded.
//   |bytes_read| - number of bytes the read added to the read buffer.
// On failure, reading is marked as stopped and
// |Delegate::FATAL_ERROR_FAILED_READ| is reported. On success the new bytes
// are counted as valid, every complete message in the buffer is handed to
// |delegate_->OnReadMessage()|, leftover bytes are moved to the front of the
// buffer, the buffer is grown if needed, and then another read is either
// scheduled or performed directly. This repeats until a read is left pending.
// NOTE(review): the opening of the do-loop, the declaration of |message_size|,
// and several early exits are not visible in this excerpt.
267 void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
268 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
275 IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
277 // Keep reading data in a loop, and dispatch messages if enough data is
278 // received. Exit the loop if any of the following happens:
279 // - one or more messages were dispatched;
280 // - the last read failed, was a partial read or would block;
281 // - |Shutdown()| was called.
283 if (io_result != IO_SUCCEEDED) {
284 read_stopped_ = true;
285 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
289 read_buffer_->num_valid_bytes_ += bytes_read;
291 // Dispatch all the messages that we can.
292 bool did_dispatch_message = false;
293 // Tracks the offset of the first undispatched message in |read_buffer_|.
294 // Currently, we copy data to ensure that this is zero at the beginning.
295 size_t read_buffer_start = 0;
296 size_t remaining_bytes = read_buffer_->num_valid_bytes_;
298 // Note that we rely on short-circuit evaluation here:
299 // - |read_buffer_start| may be an invalid index into
300 // |read_buffer_->buffer_| if |remaining_bytes| is zero.
301 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
302 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
304 // TODO(vtl): Validate that |message_size| is sane.
305 while (remaining_bytes > 0 &&
306 MessageInTransit::GetNextMessageSize(
307 &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
309 remaining_bytes >= message_size) {
// The view is built directly over the bytes in the read buffer at
// |read_buffer_start|; it is not given a buffer of its own here.
310 MessageInTransit::View
311 message_view(message_size, &read_buffer_->buffer_[read_buffer_start]);
312 DCHECK_EQ(message_view.total_size(), message_size);
314 // Dispatch the message.
316 delegate_->OnReadMessage(message_view);
318 // |Shutdown()| was called in |OnReadMessage()|.
319 // TODO(vtl): Add test for this case.
322 did_dispatch_message = true;
// Advance past the message that was just dispatched.
325 read_buffer_start += message_size;
326 remaining_bytes -= message_size;
// At least one message was consumed: only the undispatched tail stays valid.
329 if (read_buffer_start > 0) {
330 // Move data back to start.
331 read_buffer_->num_valid_bytes_ = remaining_bytes;
332 if (read_buffer_->num_valid_bytes_ > 0) {
// memmove is used because source and destination lie in the same buffer.
333 memmove(&read_buffer_->buffer_[0],
334 &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
336 read_buffer_start = 0;
// Grow the buffer when the free space after the valid bytes is too small.
// NOTE(review): the right-hand side of this comparison and the body of the
// sizing loop below are not visible in this excerpt.
339 if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
341 // Use power-of-2 buffer sizes.
342 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
343 // maximum message size to whatever extent necessary).
344 // TODO(vtl): We may often be able to peek at the header and get the real
345 // required extra space (which may be much bigger than |kReadSize|).
346 size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
347 while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
350 // TODO(vtl): It's suboptimal to zero out the fresh memory.
351 read_buffer_->buffer_.resize(new_size, 0);
354 // (1) If we dispatched any messages, stop reading for now (and let the
355 // message loop do its thing for another round).
356 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
357 // a single message. Risks: slower, more complex if we want to avoid lots of
358 // copying. ii. Keep reading until there's no more data and dispatch all the
359 // messages we can. Risks: starvation of other users of the message loop.)
360 // (2) If we didn't max out |kReadSize|, stop reading for now.
361 bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
// |Read()| overwrites |bytes_read| with the count from the new read, which the
// next loop iteration then accounts for.
363 io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
364 } while (io_result != IO_PENDING);
// Processes the outcome of a write. Must be called on the I/O thread
// (DCHECKed).
//   |result|                   - whether the write succeeded.
//   |platform_handles_written| - number of platform handles that were sent.
//   |bytes_written|            - number of bytes that were sent.
// With |write_lock_| held, the bookkeeping is delegated to
// |OnWriteCompletedNoLock()|; a failure there is reported as
// |Delegate::FATAL_ERROR_FAILED_WRITE|.
// NOTE(review): the scope of the lock, the statement guarded by
// |write_stopped_|, and the condition guarding the |CallOnFatalError()| call
// are not visible in this excerpt; presumably the error is reported only when
// |did_fail| is set and after the lock has been released -- confirm.
367 void RawChannel::OnWriteCompleted(bool result,
368 size_t platform_handles_written,
369 size_t bytes_written) {
370 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
372 bool did_fail = false;
374 base::AutoLock locker(write_lock_);
// Writing is stopped exactly when nothing is queued: the failure path in
// |OnWriteCompletedNoLock()| empties the queue when it sets |write_stopped_|.
375 DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());
377 if (write_stopped_) {
382 did_fail = !OnWriteCompletedNoLock(result,
383 platform_handles_written,
388 CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
// Forwards |fatal_error| to |delegate_->OnFatalError()|. Must be called on the
// I/O thread (DCHECKed).
// NOTE(review): any guard on |delegate_| before the call is not visible in
// this excerpt.
391 void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
392 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
393 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
395 delegate_->OnFatalError(fatal_error);
// Write-completion bookkeeping; the caller must hold |write_lock_| (asserted).
// Writing must not already be stopped and the queue must be non-empty
// (DCHECKed).
//   |result|                   - whether the write succeeded.
//   |platform_handles_written| - platform handles sent by this write.
//   |bytes_written|            - bytes sent by this write.
// Advances the offsets into the front message; once the whole message has been
// written it is removed from the queue and the offsets are reset. If messages
// remain, the next write is scheduled. On failure, writing is marked as
// stopped and the queue is emptied.
// Callers treat a true return as success and a false return as failure (see
// |OnWriteCompleted()|).
// NOTE(review): the branch on |result|, the return statements, and the
// statement following |pop_front()| are not visible in this excerpt.
398 bool RawChannel::OnWriteCompletedNoLock(bool result,
399 size_t platform_handles_written,
400 size_t bytes_written) {
401 write_lock_.AssertAcquired();
403 DCHECK(!write_stopped_);
404 DCHECK(!write_buffer_->message_queue_.empty());
// Record how far into the front message the write got.
407 write_buffer_->platform_handles_offset_ += platform_handles_written;
408 write_buffer_->data_offset_ += bytes_written;
410 MessageInTransit* message = write_buffer_->message_queue_.front();
// The front message has been written in full (the offset is never expected to
// overshoot its size).
411 if (write_buffer_->data_offset_ >= message->total_size()) {
413 DCHECK_EQ(write_buffer_->data_offset_, message->total_size());
414 write_buffer_->message_queue_.pop_front();
416 write_buffer_->platform_handles_offset_ = 0;
417 write_buffer_->data_offset_ = 0;
419 if (write_buffer_->message_queue_.empty())
423 // Schedule the next write.
424 IOResult io_result = ScheduleWriteNoLock();
425 if (io_result == IO_PENDING)
// Any result other than |IO_PENDING| from scheduling is expected to be a
// failure.
427 DCHECK_EQ(io_result, IO_FAILED);
// Failure path: stop writing and discard everything still queued.
430 write_stopped_ = true;
431 STLDeleteElements(&write_buffer_->message_queue_);
432 write_buffer_->platform_handles_offset_ = 0;
433 write_buffer_->data_offset_ = 0;
437 } // namespace system