Upstream version 7.36.149.0
[platform/framework/web/crosswalk.git] / src / mojo / system / raw_channel.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/raw_channel.h"
6
7 #include <string.h>
8
9 #include <algorithm>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/stl_util.h"
16 #include "mojo/system/message_in_transit.h"
17 #include "mojo/system/transport_data.h"
18
19 namespace mojo {
20 namespace system {
21
22 const size_t kReadSize = 4096;
23
24 // RawChannel::ReadBuffer ------------------------------------------------------
25
26 RawChannel::ReadBuffer::ReadBuffer()
27     : buffer_(kReadSize),
28       num_valid_bytes_(0) {
29 }
30
31 RawChannel::ReadBuffer::~ReadBuffer() {
32 }
33
34 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
35   DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
36   *addr = &buffer_[0] + num_valid_bytes_;
37   *size = kReadSize;
38 }
39
40 // RawChannel::WriteBuffer -----------------------------------------------------
41
42 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size)
43     : serialized_platform_handle_size_(serialized_platform_handle_size),
44       platform_handles_offset_(0),
45       data_offset_(0) {
46 }
47
48 RawChannel::WriteBuffer::~WriteBuffer() {
49   STLDeleteElements(&message_queue_);
50 }
51
52 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
53   if (message_queue_.empty())
54     return false;
55
56   const TransportData* transport_data =
57       message_queue_.front()->transport_data();
58   if (!transport_data)
59     return false;
60
61   const std::vector<embedder::PlatformHandle>* all_platform_handles =
62       transport_data->platform_handles();
63   if (!all_platform_handles) {
64     DCHECK_EQ(platform_handles_offset_, 0u);
65     return false;
66   }
67   if (platform_handles_offset_ >= all_platform_handles->size()) {
68     DCHECK_EQ(platform_handles_offset_, all_platform_handles->size());
69     return false;
70   }
71
72   return true;
73 }
74
75 void RawChannel::WriteBuffer::GetPlatformHandlesToSend(
76     size_t* num_platform_handles,
77     embedder::PlatformHandle** platform_handles,
78     void** serialization_data) {
79   DCHECK(HavePlatformHandlesToSend());
80
81   TransportData* transport_data = message_queue_.front()->transport_data();
82   std::vector<embedder::PlatformHandle>* all_platform_handles =
83       transport_data->platform_handles();
84   *num_platform_handles =
85       all_platform_handles->size() - platform_handles_offset_;
86   *platform_handles = &(*all_platform_handles)[platform_handles_offset_];
87   size_t serialization_data_offset =
88       transport_data->platform_handle_table_offset();
89   DCHECK_GT(serialization_data_offset, 0u);
90   serialization_data_offset +=
91       platform_handles_offset_ * serialized_platform_handle_size_;
92   *serialization_data =
93       static_cast<char*>(transport_data->buffer()) + serialization_data_offset;
94 }
95
96 void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
97   buffers->clear();
98
99   if (message_queue_.empty())
100     return;
101
102   MessageInTransit* message = message_queue_.front();
103   DCHECK_LT(data_offset_, message->total_size());
104   size_t bytes_to_write = message->total_size() - data_offset_;
105
106   size_t transport_data_buffer_size = message->transport_data() ?
107       message->transport_data()->buffer_size() : 0;
108
109   if (!transport_data_buffer_size) {
110     // Only write from the main buffer.
111     DCHECK_LT(data_offset_, message->main_buffer_size());
112     DCHECK_LE(bytes_to_write, message->main_buffer_size());
113     Buffer buffer = {
114         static_cast<const char*>(message->main_buffer()) + data_offset_,
115         bytes_to_write};
116     buffers->push_back(buffer);
117     return;
118   }
119
120   if (data_offset_ >= message->main_buffer_size()) {
121     // Only write from the transport data buffer.
122     DCHECK_LT(data_offset_ - message->main_buffer_size(),
123               transport_data_buffer_size);
124     DCHECK_LE(bytes_to_write, transport_data_buffer_size);
125     Buffer buffer = {
126         static_cast<const char*>(message->transport_data()->buffer()) +
127             (data_offset_ - message->main_buffer_size()),
128         bytes_to_write};
129     buffers->push_back(buffer);
130     return;
131   }
132
133   // Write from both buffers.
134   DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ +
135                                 transport_data_buffer_size);
136   Buffer buffer1 = {
137     static_cast<const char*>(message->main_buffer()) + data_offset_,
138     message->main_buffer_size() - data_offset_
139   };
140   buffers->push_back(buffer1);
141   Buffer buffer2 = {
142     static_cast<const char*>(message->transport_data()->buffer()),
143     transport_data_buffer_size
144   };
145   buffers->push_back(buffer2);
146 }
147
148 // RawChannel ------------------------------------------------------------------
149
150 RawChannel::RawChannel()
151     : message_loop_for_io_(NULL),
152       delegate_(NULL),
153       read_stopped_(false),
154       write_stopped_(false),
155       weak_ptr_factory_(this) {
156 }
157
158 RawChannel::~RawChannel() {
159   DCHECK(!read_buffer_);
160   DCHECK(!write_buffer_);
161
162   // No need to take the |write_lock_| here -- if there are still weak pointers
163   // outstanding, then we're hosed anyway (since we wouldn't be able to
164   // invalidate them cleanly, since we might not be on the I/O thread).
165   DCHECK(!weak_ptr_factory_.HasWeakPtrs());
166 }
167
168 bool RawChannel::Init(Delegate* delegate) {
169   DCHECK(delegate);
170
171   DCHECK(!delegate_);
172   delegate_ = delegate;
173
174   CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
175   DCHECK(!message_loop_for_io_);
176   message_loop_for_io_ =
177       static_cast<base::MessageLoopForIO*>(base::MessageLoop::current());
178
179   // No need to take the lock. No one should be using us yet.
180   DCHECK(!read_buffer_);
181   read_buffer_.reset(new ReadBuffer);
182   DCHECK(!write_buffer_);
183   write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize()));
184
185   if (!OnInit()) {
186     delegate_ = NULL;
187     message_loop_for_io_ = NULL;
188     read_buffer_.reset();
189     write_buffer_.reset();
190     return false;
191   }
192
193   return ScheduleRead() == IO_PENDING;
194 }
195
196 void RawChannel::Shutdown() {
197   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
198
199   base::AutoLock locker(write_lock_);
200
201   LOG_IF(WARNING, !write_buffer_->message_queue_.empty())
202       << "Shutting down RawChannel with write buffer nonempty";
203
204   // Reset the delegate so that it won't receive further calls.
205   delegate_ = NULL;
206   read_stopped_ = true;
207   write_stopped_ = true;
208   weak_ptr_factory_.InvalidateWeakPtrs();
209
210   OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
211 }
212
213 // Reminder: This must be thread-safe.
214 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
215   DCHECK(message);
216
217   base::AutoLock locker(write_lock_);
218   if (write_stopped_)
219     return false;
220
221   if (!write_buffer_->message_queue_.empty()) {
222     write_buffer_->message_queue_.push_back(message.release());
223     return true;
224   }
225
226   write_buffer_->message_queue_.push_front(message.release());
227   DCHECK_EQ(write_buffer_->data_offset_, 0u);
228
229   size_t platform_handles_written = 0;
230   size_t bytes_written = 0;
231   IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
232   if (io_result == IO_PENDING)
233     return true;
234
235   bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
236                                        platform_handles_written,
237                                        bytes_written);
238   if (!result) {
239     // Even if we're on the I/O thread, don't call |OnFatalError()| in the
240     // nested context.
241     message_loop_for_io_->PostTask(
242         FROM_HERE,
243         base::Bind(&RawChannel::CallOnFatalError,
244                    weak_ptr_factory_.GetWeakPtr(),
245                    Delegate::FATAL_ERROR_FAILED_WRITE));
246   }
247
248   return result;
249 }
250
251 // Reminder: This must be thread-safe.
252 bool RawChannel::IsWriteBufferEmpty() {
253   base::AutoLock locker(write_lock_);
254   return write_buffer_->message_queue_.empty();
255 }
256
257 RawChannel::ReadBuffer* RawChannel::read_buffer() {
258   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
259   return read_buffer_.get();
260 }
261
262 RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
263   write_lock_.AssertAcquired();
264   return write_buffer_.get();
265 }
266
267 void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
268   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
269
270   if (read_stopped_) {
271     NOTREACHED();
272     return;
273   }
274
275   IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
276
277   // Keep reading data in a loop, and dispatch messages if enough data is
278   // received. Exit the loop if any of the following happens:
279   //   - one or more messages were dispatched;
280   //   - the last read failed, was a partial read or would block;
281   //   - |Shutdown()| was called.
282   do {
283     if (io_result != IO_SUCCEEDED) {
284       read_stopped_ = true;
285       CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
286       return;
287     }
288
289     read_buffer_->num_valid_bytes_ += bytes_read;
290
291     // Dispatch all the messages that we can.
292     bool did_dispatch_message = false;
293     // Tracks the offset of the first undispatched message in |read_buffer_|.
294     // Currently, we copy data to ensure that this is zero at the beginning.
295     size_t read_buffer_start = 0;
296     size_t remaining_bytes = read_buffer_->num_valid_bytes_;
297     size_t message_size;
298     // Note that we rely on short-circuit evaluation here:
299     //   - |read_buffer_start| may be an invalid index into
300     //     |read_buffer_->buffer_| if |remaining_bytes| is zero.
301     //   - |message_size| is only valid if |GetNextMessageSize()| returns true.
302     // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
303     // next read).
304     // TODO(vtl): Validate that |message_size| is sane.
305     while (remaining_bytes > 0 &&
306            MessageInTransit::GetNextMessageSize(
307                &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
308                &message_size) &&
309            remaining_bytes >= message_size) {
310       MessageInTransit::View
311           message_view(message_size, &read_buffer_->buffer_[read_buffer_start]);
312       DCHECK_EQ(message_view.total_size(), message_size);
313
314       // Dispatch the message.
315       DCHECK(delegate_);
316       delegate_->OnReadMessage(message_view);
317       if (read_stopped_) {
318         // |Shutdown()| was called in |OnReadMessage()|.
319         // TODO(vtl): Add test for this case.
320         return;
321       }
322       did_dispatch_message = true;
323
324       // Update our state.
325       read_buffer_start += message_size;
326       remaining_bytes -= message_size;
327     }
328
329     if (read_buffer_start > 0) {
330       // Move data back to start.
331       read_buffer_->num_valid_bytes_ = remaining_bytes;
332       if (read_buffer_->num_valid_bytes_ > 0) {
333         memmove(&read_buffer_->buffer_[0],
334                 &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
335       }
336       read_buffer_start = 0;
337     }
338
339     if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
340             kReadSize) {
341       // Use power-of-2 buffer sizes.
342       // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
343       // maximum message size to whatever extent necessary).
344       // TODO(vtl): We may often be able to peek at the header and get the real
345       // required extra space (which may be much bigger than |kReadSize|).
346       size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
347       while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
348         new_size *= 2;
349
350       // TODO(vtl): It's suboptimal to zero out the fresh memory.
351       read_buffer_->buffer_.resize(new_size, 0);
352     }
353
354     // (1) If we dispatched any messages, stop reading for now (and let the
355     // message loop do its thing for another round).
356     // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
357     // a single message. Risks: slower, more complex if we want to avoid lots of
358     // copying. ii. Keep reading until there's no more data and dispatch all the
359     // messages we can. Risks: starvation of other users of the message loop.)
360     // (2) If we didn't max out |kReadSize|, stop reading for now.
361     bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
362     bytes_read = 0;
363     io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
364   } while (io_result != IO_PENDING);
365 }
366
367 void RawChannel::OnWriteCompleted(bool result,
368                                   size_t platform_handles_written,
369                                   size_t bytes_written) {
370   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
371
372   bool did_fail = false;
373   {
374     base::AutoLock locker(write_lock_);
375     DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());
376
377     if (write_stopped_) {
378       NOTREACHED();
379       return;
380     }
381
382     did_fail = !OnWriteCompletedNoLock(result,
383                                        platform_handles_written,
384                                        bytes_written);
385   }
386
387   if (did_fail)
388     CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
389 }
390
391 void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
392   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
393   // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
394   if (delegate_)
395     delegate_->OnFatalError(fatal_error);
396 }
397
398 bool RawChannel::OnWriteCompletedNoLock(bool result,
399                                         size_t platform_handles_written,
400                                         size_t bytes_written) {
401   write_lock_.AssertAcquired();
402
403   DCHECK(!write_stopped_);
404   DCHECK(!write_buffer_->message_queue_.empty());
405
406   if (result) {
407     write_buffer_->platform_handles_offset_ += platform_handles_written;
408     write_buffer_->data_offset_ += bytes_written;
409
410     MessageInTransit* message = write_buffer_->message_queue_.front();
411     if (write_buffer_->data_offset_ >= message->total_size()) {
412       // Complete write.
413       DCHECK_EQ(write_buffer_->data_offset_, message->total_size());
414       write_buffer_->message_queue_.pop_front();
415       delete message;
416       write_buffer_->platform_handles_offset_ = 0;
417       write_buffer_->data_offset_ = 0;
418
419       if (write_buffer_->message_queue_.empty())
420         return true;
421     }
422
423     // Schedule the next write.
424     IOResult io_result = ScheduleWriteNoLock();
425     if (io_result == IO_PENDING)
426       return true;
427     DCHECK_EQ(io_result, IO_FAILED);
428   }
429
430   write_stopped_ = true;
431   STLDeleteElements(&write_buffer_->message_queue_);
432   write_buffer_->platform_handles_offset_ = 0;
433   write_buffer_->data_offset_ = 0;
434   return false;
435 }
436
437 }  // namespace system
438 }  // namespace mojo