Upstream version 11.39.266.0
[platform/framework/web/crosswalk.git] / src / mojo / system / raw_channel.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/raw_channel.h"
6
7 #include <string.h>
8
9 #include <algorithm>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/stl_util.h"
16 #include "mojo/system/message_in_transit.h"
17 #include "mojo/system/transport_data.h"
18
19 namespace mojo {
20 namespace system {
21
22 const size_t kReadSize = 4096;
23
24 // RawChannel::ReadBuffer ------------------------------------------------------
25
26 RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
27 }
28
29 RawChannel::ReadBuffer::~ReadBuffer() {
30 }
31
32 void RawChannel::ReadBuffer::GetBuffer(char** addr, size_t* size) {
33   DCHECK_GE(buffer_.size(), num_valid_bytes_ + kReadSize);
34   *addr = &buffer_[0] + num_valid_bytes_;
35   *size = kReadSize;
36 }
37
38 // RawChannel::WriteBuffer -----------------------------------------------------
39
40 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size)
41     : serialized_platform_handle_size_(serialized_platform_handle_size),
42       platform_handles_offset_(0),
43       data_offset_(0) {
44 }
45
46 RawChannel::WriteBuffer::~WriteBuffer() {
47   STLDeleteElements(&message_queue_);
48 }
49
50 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
51   if (message_queue_.empty())
52     return false;
53
54   const TransportData* transport_data =
55       message_queue_.front()->transport_data();
56   if (!transport_data)
57     return false;
58
59   const embedder::PlatformHandleVector* all_platform_handles =
60       transport_data->platform_handles();
61   if (!all_platform_handles) {
62     DCHECK_EQ(platform_handles_offset_, 0u);
63     return false;
64   }
65   if (platform_handles_offset_ >= all_platform_handles->size()) {
66     DCHECK_EQ(platform_handles_offset_, all_platform_handles->size());
67     return false;
68   }
69
70   return true;
71 }
72
73 void RawChannel::WriteBuffer::GetPlatformHandlesToSend(
74     size_t* num_platform_handles,
75     embedder::PlatformHandle** platform_handles,
76     void** serialization_data) {
77   DCHECK(HavePlatformHandlesToSend());
78
79   TransportData* transport_data = message_queue_.front()->transport_data();
80   embedder::PlatformHandleVector* all_platform_handles =
81       transport_data->platform_handles();
82   *num_platform_handles =
83       all_platform_handles->size() - platform_handles_offset_;
84   *platform_handles = &(*all_platform_handles)[platform_handles_offset_];
85   size_t serialization_data_offset =
86       transport_data->platform_handle_table_offset();
87   DCHECK_GT(serialization_data_offset, 0u);
88   serialization_data_offset +=
89       platform_handles_offset_ * serialized_platform_handle_size_;
90   *serialization_data =
91       static_cast<char*>(transport_data->buffer()) + serialization_data_offset;
92 }
93
94 void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
95   buffers->clear();
96
97   if (message_queue_.empty())
98     return;
99
100   MessageInTransit* message = message_queue_.front();
101   DCHECK_LT(data_offset_, message->total_size());
102   size_t bytes_to_write = message->total_size() - data_offset_;
103
104   size_t transport_data_buffer_size =
105       message->transport_data() ? message->transport_data()->buffer_size() : 0;
106
107   if (!transport_data_buffer_size) {
108     // Only write from the main buffer.
109     DCHECK_LT(data_offset_, message->main_buffer_size());
110     DCHECK_LE(bytes_to_write, message->main_buffer_size());
111     Buffer buffer = {
112         static_cast<const char*>(message->main_buffer()) + data_offset_,
113         bytes_to_write};
114     buffers->push_back(buffer);
115     return;
116   }
117
118   if (data_offset_ >= message->main_buffer_size()) {
119     // Only write from the transport data buffer.
120     DCHECK_LT(data_offset_ - message->main_buffer_size(),
121               transport_data_buffer_size);
122     DCHECK_LE(bytes_to_write, transport_data_buffer_size);
123     Buffer buffer = {
124         static_cast<const char*>(message->transport_data()->buffer()) +
125             (data_offset_ - message->main_buffer_size()),
126         bytes_to_write};
127     buffers->push_back(buffer);
128     return;
129   }
130
131   // TODO(vtl): We could actually send out buffers from multiple messages, with
132   // the "stopping" condition being reaching a message with platform handles
133   // attached.
134
135   // Write from both buffers.
136   DCHECK_EQ(
137       bytes_to_write,
138       message->main_buffer_size() - data_offset_ + transport_data_buffer_size);
139   Buffer buffer1 = {
140       static_cast<const char*>(message->main_buffer()) + data_offset_,
141       message->main_buffer_size() - data_offset_};
142   buffers->push_back(buffer1);
143   Buffer buffer2 = {
144       static_cast<const char*>(message->transport_data()->buffer()),
145       transport_data_buffer_size};
146   buffers->push_back(buffer2);
147 }
148
149 // RawChannel ------------------------------------------------------------------
150
151 RawChannel::RawChannel()
152     : message_loop_for_io_(nullptr),
153       delegate_(nullptr),
154       read_stopped_(false),
155       write_stopped_(false),
156       weak_ptr_factory_(this) {
157 }
158
159 RawChannel::~RawChannel() {
160   DCHECK(!read_buffer_);
161   DCHECK(!write_buffer_);
162
163   // No need to take the |write_lock_| here -- if there are still weak pointers
164   // outstanding, then we're hosed anyway (since we wouldn't be able to
165   // invalidate them cleanly, since we might not be on the I/O thread).
166   DCHECK(!weak_ptr_factory_.HasWeakPtrs());
167 }
168
169 bool RawChannel::Init(Delegate* delegate) {
170   DCHECK(delegate);
171
172   DCHECK(!delegate_);
173   delegate_ = delegate;
174
175   CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO);
176   DCHECK(!message_loop_for_io_);
177   message_loop_for_io_ =
178       static_cast<base::MessageLoopForIO*>(base::MessageLoop::current());
179
180   // No need to take the lock. No one should be using us yet.
181   DCHECK(!read_buffer_);
182   read_buffer_.reset(new ReadBuffer);
183   DCHECK(!write_buffer_);
184   write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize()));
185
186   if (!OnInit()) {
187     delegate_ = nullptr;
188     message_loop_for_io_ = nullptr;
189     read_buffer_.reset();
190     write_buffer_.reset();
191     return false;
192   }
193
194   IOResult io_result = ScheduleRead();
195   if (io_result != IO_PENDING) {
196     // This will notify the delegate about the read failure. Although we're on
197     // the I/O thread, don't call it in the nested context.
198     message_loop_for_io_->PostTask(FROM_HERE,
199                                    base::Bind(&RawChannel::OnReadCompleted,
200                                               weak_ptr_factory_.GetWeakPtr(),
201                                               io_result,
202                                               0));
203   }
204
205   // ScheduleRead() failure is treated as a read failure (by notifying the
206   // delegate), not as an init failure.
207   return true;
208 }
209
210 void RawChannel::Shutdown() {
211   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
212
213   base::AutoLock locker(write_lock_);
214
215   LOG_IF(WARNING, !write_buffer_->message_queue_.empty())
216       << "Shutting down RawChannel with write buffer nonempty";
217
218   // Reset the delegate so that it won't receive further calls.
219   delegate_ = nullptr;
220   read_stopped_ = true;
221   write_stopped_ = true;
222   weak_ptr_factory_.InvalidateWeakPtrs();
223
224   OnShutdownNoLock(read_buffer_.Pass(), write_buffer_.Pass());
225 }
226
227 // Reminder: This must be thread-safe.
228 bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
229   DCHECK(message);
230
231   base::AutoLock locker(write_lock_);
232   if (write_stopped_)
233     return false;
234
235   if (!write_buffer_->message_queue_.empty()) {
236     EnqueueMessageNoLock(message.Pass());
237     return true;
238   }
239
240   EnqueueMessageNoLock(message.Pass());
241   DCHECK_EQ(write_buffer_->data_offset_, 0u);
242
243   size_t platform_handles_written = 0;
244   size_t bytes_written = 0;
245   IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
246   if (io_result == IO_PENDING)
247     return true;
248
249   bool result = OnWriteCompletedNoLock(
250       io_result, platform_handles_written, bytes_written);
251   if (!result) {
252     // Even if we're on the I/O thread, don't call |OnError()| in the nested
253     // context.
254     message_loop_for_io_->PostTask(FROM_HERE,
255                                    base::Bind(&RawChannel::CallOnError,
256                                               weak_ptr_factory_.GetWeakPtr(),
257                                               Delegate::ERROR_WRITE));
258   }
259
260   return result;
261 }
262
263 // Reminder: This must be thread-safe.
264 bool RawChannel::IsWriteBufferEmpty() {
265   base::AutoLock locker(write_lock_);
266   return write_buffer_->message_queue_.empty();
267 }
268
269 void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) {
270   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
271
272   if (read_stopped_) {
273     NOTREACHED();
274     return;
275   }
276
277   // Keep reading data in a loop, and dispatch messages if enough data is
278   // received. Exit the loop if any of the following happens:
279   //   - one or more messages were dispatched;
280   //   - the last read failed, was a partial read or would block;
281   //   - |Shutdown()| was called.
282   do {
283     switch (io_result) {
284       case IO_SUCCEEDED:
285         break;
286       case IO_FAILED_SHUTDOWN:
287       case IO_FAILED_BROKEN:
288       case IO_FAILED_UNKNOWN:
289         read_stopped_ = true;
290         CallOnError(ReadIOResultToError(io_result));
291         return;
292       case IO_PENDING:
293         NOTREACHED();
294         return;
295     }
296
297     read_buffer_->num_valid_bytes_ += bytes_read;
298
299     // Dispatch all the messages that we can.
300     bool did_dispatch_message = false;
301     // Tracks the offset of the first undispatched message in |read_buffer_|.
302     // Currently, we copy data to ensure that this is zero at the beginning.
303     size_t read_buffer_start = 0;
304     size_t remaining_bytes = read_buffer_->num_valid_bytes_;
305     size_t message_size;
306     // Note that we rely on short-circuit evaluation here:
307     //   - |read_buffer_start| may be an invalid index into
308     //     |read_buffer_->buffer_| if |remaining_bytes| is zero.
309     //   - |message_size| is only valid if |GetNextMessageSize()| returns true.
310     // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
311     // next read).
312     // TODO(vtl): Validate that |message_size| is sane.
313     while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
314                                       &read_buffer_->buffer_[read_buffer_start],
315                                       remaining_bytes,
316                                       &message_size) &&
317            remaining_bytes >= message_size) {
318       MessageInTransit::View message_view(
319           message_size, &read_buffer_->buffer_[read_buffer_start]);
320       DCHECK_EQ(message_view.total_size(), message_size);
321
322       const char* error_message = nullptr;
323       if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
324                                 &error_message)) {
325         DCHECK(error_message);
326         LOG(ERROR) << "Received invalid message: " << error_message;
327         read_stopped_ = true;
328         CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
329         return;
330       }
331
332       if (message_view.type() == MessageInTransit::kTypeRawChannel) {
333         if (!OnReadMessageForRawChannel(message_view)) {
334           read_stopped_ = true;
335           CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
336           return;
337         }
338       } else {
339         embedder::ScopedPlatformHandleVectorPtr platform_handles;
340         if (message_view.transport_data_buffer()) {
341           size_t num_platform_handles;
342           const void* platform_handle_table;
343           TransportData::GetPlatformHandleTable(
344               message_view.transport_data_buffer(),
345               &num_platform_handles,
346               &platform_handle_table);
347
348           if (num_platform_handles > 0) {
349             platform_handles =
350                 GetReadPlatformHandles(num_platform_handles,
351                                        platform_handle_table).Pass();
352             if (!platform_handles) {
353               LOG(ERROR) << "Invalid number of platform handles received";
354               read_stopped_ = true;
355               CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
356               return;
357             }
358           }
359         }
360
361         // TODO(vtl): In the case that we aren't expecting any platform handles,
362         // for the POSIX implementation, we should confirm that none are stored.
363
364         // Dispatch the message.
365         DCHECK(delegate_);
366         delegate_->OnReadMessage(message_view, platform_handles.Pass());
367         if (read_stopped_) {
368           // |Shutdown()| was called in |OnReadMessage()|.
369           // TODO(vtl): Add test for this case.
370           return;
371         }
372       }
373
374       did_dispatch_message = true;
375
376       // Update our state.
377       read_buffer_start += message_size;
378       remaining_bytes -= message_size;
379     }
380
381     if (read_buffer_start > 0) {
382       // Move data back to start.
383       read_buffer_->num_valid_bytes_ = remaining_bytes;
384       if (read_buffer_->num_valid_bytes_ > 0) {
385         memmove(&read_buffer_->buffer_[0],
386                 &read_buffer_->buffer_[read_buffer_start],
387                 remaining_bytes);
388       }
389       read_buffer_start = 0;
390     }
391
392     if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
393         kReadSize) {
394       // Use power-of-2 buffer sizes.
395       // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
396       // maximum message size to whatever extent necessary).
397       // TODO(vtl): We may often be able to peek at the header and get the real
398       // required extra space (which may be much bigger than |kReadSize|).
399       size_t new_size = std::max(read_buffer_->buffer_.size(), kReadSize);
400       while (new_size < read_buffer_->num_valid_bytes_ + kReadSize)
401         new_size *= 2;
402
403       // TODO(vtl): It's suboptimal to zero out the fresh memory.
404       read_buffer_->buffer_.resize(new_size, 0);
405     }
406
407     // (1) If we dispatched any messages, stop reading for now (and let the
408     // message loop do its thing for another round).
409     // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
410     // a single message. Risks: slower, more complex if we want to avoid lots of
411     // copying. ii. Keep reading until there's no more data and dispatch all the
412     // messages we can. Risks: starvation of other users of the message loop.)
413     // (2) If we didn't max out |kReadSize|, stop reading for now.
414     bool schedule_for_later = did_dispatch_message || bytes_read < kReadSize;
415     bytes_read = 0;
416     io_result = schedule_for_later ? ScheduleRead() : Read(&bytes_read);
417   } while (io_result != IO_PENDING);
418 }
419
420 void RawChannel::OnWriteCompleted(IOResult io_result,
421                                   size_t platform_handles_written,
422                                   size_t bytes_written) {
423   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
424   DCHECK_NE(io_result, IO_PENDING);
425
426   bool did_fail = false;
427   {
428     base::AutoLock locker(write_lock_);
429     DCHECK_EQ(write_stopped_, write_buffer_->message_queue_.empty());
430
431     if (write_stopped_) {
432       NOTREACHED();
433       return;
434     }
435
436     did_fail = !OnWriteCompletedNoLock(
437                    io_result, platform_handles_written, bytes_written);
438   }
439
440   if (did_fail)
441     CallOnError(Delegate::ERROR_WRITE);
442 }
443
444 void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
445   write_lock_.AssertAcquired();
446   write_buffer_->message_queue_.push_back(message.release());
447 }
448
449 bool RawChannel::OnReadMessageForRawChannel(
450     const MessageInTransit::View& message_view) {
451   // No non-implementation specific |RawChannel| control messages.
452   LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype()
453              << ")";
454   return false;
455 }
456
457 // static
458 RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
459     IOResult io_result) {
460   switch (io_result) {
461     case IO_FAILED_SHUTDOWN:
462       return Delegate::ERROR_READ_SHUTDOWN;
463     case IO_FAILED_BROKEN:
464       return Delegate::ERROR_READ_BROKEN;
465     case IO_FAILED_UNKNOWN:
466       return Delegate::ERROR_READ_UNKNOWN;
467     case IO_SUCCEEDED:
468     case IO_PENDING:
469       NOTREACHED();
470       break;
471   }
472   return Delegate::ERROR_READ_UNKNOWN;
473 }
474
475 void RawChannel::CallOnError(Delegate::Error error) {
476   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
477   // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
478   if (delegate_)
479     delegate_->OnError(error);
480 }
481
482 bool RawChannel::OnWriteCompletedNoLock(IOResult io_result,
483                                         size_t platform_handles_written,
484                                         size_t bytes_written) {
485   write_lock_.AssertAcquired();
486
487   DCHECK(!write_stopped_);
488   DCHECK(!write_buffer_->message_queue_.empty());
489
490   if (io_result == IO_SUCCEEDED) {
491     write_buffer_->platform_handles_offset_ += platform_handles_written;
492     write_buffer_->data_offset_ += bytes_written;
493
494     MessageInTransit* message = write_buffer_->message_queue_.front();
495     if (write_buffer_->data_offset_ >= message->total_size()) {
496       // Complete write.
497       CHECK_EQ(write_buffer_->data_offset_, message->total_size());
498       write_buffer_->message_queue_.pop_front();
499       delete message;
500       write_buffer_->platform_handles_offset_ = 0;
501       write_buffer_->data_offset_ = 0;
502
503       if (write_buffer_->message_queue_.empty())
504         return true;
505     }
506
507     // Schedule the next write.
508     io_result = ScheduleWriteNoLock();
509     if (io_result == IO_PENDING)
510       return true;
511     DCHECK_NE(io_result, IO_SUCCEEDED);
512   }
513
514   write_stopped_ = true;
515   STLDeleteElements(&write_buffer_->message_queue_);
516   write_buffer_->platform_handles_offset_ = 0;
517   write_buffer_->data_offset_ = 0;
518   return false;
519 }
520
521 }  // namespace system
522 }  // namespace mojo