Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / mojo / system / raw_channel.cc
index 548f8af..6533af9 100644 (file)
@@ -23,9 +23,7 @@ const size_t kReadSize = 4096;
 
 // RawChannel::ReadBuffer ------------------------------------------------------
 
-RawChannel::ReadBuffer::ReadBuffer()
-    : buffer_(kReadSize),
-      num_valid_bytes_(0) {
+RawChannel::ReadBuffer::ReadBuffer() : buffer_(kReadSize), num_valid_bytes_(0) {
 }
 
 RawChannel::ReadBuffer::~ReadBuffer() {
@@ -58,7 +56,7 @@ bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
   if (!transport_data)
     return false;
 
-  const std::vector<embedder::PlatformHandle>* all_platform_handles =
+  const embedder::PlatformHandleVector* all_platform_handles =
       transport_data->platform_handles();
   if (!all_platform_handles) {
     DCHECK_EQ(platform_handles_offset_, 0u);
@@ -79,7 +77,7 @@ void RawChannel::WriteBuffer::GetPlatformHandlesToSend(
   DCHECK(HavePlatformHandlesToSend());
 
   TransportData* transport_data = message_queue_.front()->transport_data();
-  std::vector<embedder::PlatformHandle>* all_platform_handles =
+  embedder::PlatformHandleVector* all_platform_handles =
       transport_data->platform_handles();
   *num_platform_handles =
       all_platform_handles->size() - platform_handles_offset_;
@@ -103,8 +101,8 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
   DCHECK_LT(data_offset_, message->total_size());
   size_t bytes_to_write = message->total_size() - data_offset_;
 
-  size_t transport_data_buffer_size = message->transport_data() ?
-      message->transport_data()->buffer_size() : 0;
+  size_t transport_data_buffer_size =
+      message->transport_data() ? message->transport_data()->buffer_size() : 0;
 
   if (!transport_data_buffer_size) {
     // Only write from the main buffer.
@@ -130,26 +128,29 @@ void RawChannel::WriteBuffer::GetBuffers(std::vector<Buffer>* buffers) const {
     return;
   }
 
+  // TODO(vtl): We could actually send out buffers from multiple messages, with
+  // the "stopping" condition being reaching a message with platform handles
+  // attached.
+
   // Write from both buffers.
-  DCHECK_EQ(bytes_to_write, message->main_buffer_size() - data_offset_ +
-                                transport_data_buffer_size);
+  DCHECK_EQ(
+      bytes_to_write,
+      message->main_buffer_size() - data_offset_ + transport_data_buffer_size);
   Buffer buffer1 = {
-    static_cast<const char*>(message->main_buffer()) + data_offset_,
-    message->main_buffer_size() - data_offset_
-  };
+      static_cast<const char*>(message->main_buffer()) + data_offset_,
+      message->main_buffer_size() - data_offset_};
   buffers->push_back(buffer1);
   Buffer buffer2 = {
-    static_cast<const char*>(message->transport_data()->buffer()),
-    transport_data_buffer_size
-  };
+      static_cast<const char*>(message->transport_data()->buffer()),
+      transport_data_buffer_size};
   buffers->push_back(buffer2);
 }
 
 // RawChannel ------------------------------------------------------------------
 
 RawChannel::RawChannel()
-    : message_loop_for_io_(NULL),
-      delegate_(NULL),
+    : message_loop_for_io_(nullptr),
+      delegate_(nullptr),
       read_stopped_(false),
       write_stopped_(false),
       weak_ptr_factory_(this) {
@@ -183,14 +184,27 @@ bool RawChannel::Init(Delegate* delegate) {
   write_buffer_.reset(new WriteBuffer(GetSerializedPlatformHandleSize()));
 
   if (!OnInit()) {
-    delegate_ = NULL;
-    message_loop_for_io_ = NULL;
+    delegate_ = nullptr;
+    message_loop_for_io_ = nullptr;
     read_buffer_.reset();
     write_buffer_.reset();
     return false;
   }
 
-  return ScheduleRead() == IO_PENDING;
+  IOResult io_result = ScheduleRead();
+  if (io_result != IO_PENDING) {
+    // This will notify the delegate about the read failure. Although we're on
+    // the I/O thread, don't call it in the nested context.
+    message_loop_for_io_->PostTask(FROM_HERE,
+                                   base::Bind(&RawChannel::OnReadCompleted,
+                                              weak_ptr_factory_.GetWeakPtr(),
+                                              io_result,
+                                              0));
+  }
+
+  // ScheduleRead() failure is treated as a read failure (by notifying the
+  // delegate), not as an init failure.
+  return true;
 }
 
 void RawChannel::Shutdown() {
@@ -202,7 +216,7 @@ void RawChannel::Shutdown() {
       << "Shutting down RawChannel with write buffer nonempty";
 
   // Reset the delegate so that it won't receive further calls.
-  delegate_ = NULL;
+  delegate_ = nullptr;
   read_stopped_ = true;
   write_stopped_ = true;
   weak_ptr_factory_.InvalidateWeakPtrs();
@@ -219,11 +233,11 @@ bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
     return false;
 
   if (!write_buffer_->message_queue_.empty()) {
-    write_buffer_->message_queue_.push_back(message.release());
+    EnqueueMessageNoLock(message.Pass());
     return true;
   }
 
-  write_buffer_->message_queue_.push_front(message.release());
+  EnqueueMessageNoLock(message.Pass());
   DCHECK_EQ(write_buffer_->data_offset_, 0u);
 
   size_t platform_handles_written = 0;
@@ -232,17 +246,15 @@ bool RawChannel::WriteMessage(scoped_ptr<MessageInTransit> message) {
   if (io_result == IO_PENDING)
     return true;
 
-  bool result = OnWriteCompletedNoLock(io_result == IO_SUCCEEDED,
-                                       platform_handles_written,
-                                       bytes_written);
+  bool result = OnWriteCompletedNoLock(
+      io_result, platform_handles_written, bytes_written);
   if (!result) {
-    // Even if we're on the I/O thread, don't call |OnFatalError()| in the
-    // nested context.
-    message_loop_for_io_->PostTask(
-        FROM_HERE,
-        base::Bind(&RawChannel::CallOnFatalError,
-                   weak_ptr_factory_.GetWeakPtr(),
-                   Delegate::FATAL_ERROR_FAILED_WRITE));
+    // Even if we're on the I/O thread, don't call |OnError()| in the nested
+    // context.
+    message_loop_for_io_->PostTask(FROM_HERE,
+                                   base::Bind(&RawChannel::CallOnError,
+                                              weak_ptr_factory_.GetWeakPtr(),
+                                              Delegate::ERROR_WRITE));
   }
 
   return result;
@@ -254,17 +266,7 @@ bool RawChannel::IsWriteBufferEmpty() {
   return write_buffer_->message_queue_.empty();
 }
 
-RawChannel::ReadBuffer* RawChannel::read_buffer() {
-  DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
-  return read_buffer_.get();
-}
-
-RawChannel::WriteBuffer* RawChannel::write_buffer_no_lock() {
-  write_lock_.AssertAcquired();
-  return write_buffer_.get();
-}
-
-void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
+void RawChannel::OnReadCompleted(IOResult io_result, size_t bytes_read) {
   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
 
   if (read_stopped_) {
@@ -272,18 +274,24 @@ void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
     return;
   }
 
-  IOResult io_result = result ? IO_SUCCEEDED : IO_FAILED;
-
   // Keep reading data in a loop, and dispatch messages if enough data is
   // received. Exit the loop if any of the following happens:
   //   - one or more messages were dispatched;
   //   - the last read failed, was a partial read or would block;
   //   - |Shutdown()| was called.
   do {
-    if (io_result != IO_SUCCEEDED) {
-      read_stopped_ = true;
-      CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
-      return;
+    switch (io_result) {
+      case IO_SUCCEEDED:
+        break;
+      case IO_FAILED_SHUTDOWN:
+      case IO_FAILED_BROKEN:
+      case IO_FAILED_UNKNOWN:
+        read_stopped_ = true;
+        CallOnError(ReadIOResultToError(io_result));
+        return;
+      case IO_PENDING:
+        NOTREACHED();
+        return;
     }
 
     read_buffer_->num_valid_bytes_ += bytes_read;
@@ -302,23 +310,67 @@ void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
     // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
     // next read).
     // TODO(vtl): Validate that |message_size| is sane.
-    while (remaining_bytes > 0 &&
-           MessageInTransit::GetNextMessageSize(
-               &read_buffer_->buffer_[read_buffer_start], remaining_bytes,
-               &message_size) &&
+    while (remaining_bytes > 0 && MessageInTransit::GetNextMessageSize(
+                                      &read_buffer_->buffer_[read_buffer_start],
+                                      remaining_bytes,
+                                      &message_size) &&
            remaining_bytes >= message_size) {
-      MessageInTransit::View
-          message_view(message_size, &read_buffer_->buffer_[read_buffer_start]);
+      MessageInTransit::View message_view(
+          message_size, &read_buffer_->buffer_[read_buffer_start]);
       DCHECK_EQ(message_view.total_size(), message_size);
 
-      // Dispatch the message.
-      DCHECK(delegate_);
-      delegate_->OnReadMessage(message_view);
-      if (read_stopped_) {
-        // |Shutdown()| was called in |OnReadMessage()|.
-        // TODO(vtl): Add test for this case.
+      const char* error_message = nullptr;
+      if (!message_view.IsValid(GetSerializedPlatformHandleSize(),
+                                &error_message)) {
+        DCHECK(error_message);
+        LOG(ERROR) << "Received invalid message: " << error_message;
+        read_stopped_ = true;
+        CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
         return;
       }
+
+      if (message_view.type() == MessageInTransit::kTypeRawChannel) {
+        if (!OnReadMessageForRawChannel(message_view)) {
+          read_stopped_ = true;
+          CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
+          return;
+        }
+      } else {
+        embedder::ScopedPlatformHandleVectorPtr platform_handles;
+        if (message_view.transport_data_buffer()) {
+          size_t num_platform_handles;
+          const void* platform_handle_table;
+          TransportData::GetPlatformHandleTable(
+              message_view.transport_data_buffer(),
+              &num_platform_handles,
+              &platform_handle_table);
+
+          if (num_platform_handles > 0) {
+            platform_handles =
+                GetReadPlatformHandles(num_platform_handles,
+                                       platform_handle_table).Pass();
+            if (!platform_handles) {
+              LOG(ERROR) << "Invalid number of platform handles received";
+              read_stopped_ = true;
+              CallOnError(Delegate::ERROR_READ_BAD_MESSAGE);
+              return;
+            }
+          }
+        }
+
+        // TODO(vtl): In the case that we aren't expecting any platform handles,
+        // for the POSIX implementation, we should confirm that none are stored.
+
+        // Dispatch the message.
+        DCHECK(delegate_);
+        delegate_->OnReadMessage(message_view, platform_handles.Pass());
+        if (read_stopped_) {
+          // |Shutdown()| was called in |OnReadMessage()|.
+          // TODO(vtl): Add test for this case.
+          return;
+        }
+      }
+
       did_dispatch_message = true;
 
       // Update our state.
@@ -331,13 +383,14 @@ void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
       read_buffer_->num_valid_bytes_ = remaining_bytes;
       if (read_buffer_->num_valid_bytes_ > 0) {
         memmove(&read_buffer_->buffer_[0],
-                &read_buffer_->buffer_[read_buffer_start], remaining_bytes);
+                &read_buffer_->buffer_[read_buffer_start],
+                remaining_bytes);
       }
       read_buffer_start = 0;
     }
 
     if (read_buffer_->buffer_.size() - read_buffer_->num_valid_bytes_ <
-            kReadSize) {
+        kReadSize) {
       // Use power-of-2 buffer sizes.
       // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
       // maximum message size to whatever extent necessary).
@@ -364,10 +417,11 @@ void RawChannel::OnReadCompleted(bool result, size_t bytes_read) {
   } while (io_result != IO_PENDING);
 }
 
-void RawChannel::OnWriteCompleted(bool result,
+void RawChannel::OnWriteCompleted(IOResult io_result,
                                   size_t platform_handles_written,
                                   size_t bytes_written) {
   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
+  DCHECK_NE(io_result, IO_PENDING);
 
   bool did_fail = false;
   {
@@ -379,23 +433,53 @@ void RawChannel::OnWriteCompleted(bool result,
       return;
     }
 
-    did_fail = !OnWriteCompletedNoLock(result,
-                                       platform_handles_written,
-                                       bytes_written);
+    did_fail = !OnWriteCompletedNoLock(
+                   io_result, platform_handles_written, bytes_written);
   }
 
   if (did_fail)
-    CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
+    CallOnError(Delegate::ERROR_WRITE);
+}
+
+void RawChannel::EnqueueMessageNoLock(scoped_ptr<MessageInTransit> message) {
+  write_lock_.AssertAcquired();
+  write_buffer_->message_queue_.push_back(message.release());
+}
+
+bool RawChannel::OnReadMessageForRawChannel(
+    const MessageInTransit::View& message_view) {
+  // No non-implementation specific |RawChannel| control messages.
+  LOG(ERROR) << "Invalid control message (subtype " << message_view.subtype()
+             << ")";
+  return false;
+}
+
+// static
+RawChannel::Delegate::Error RawChannel::ReadIOResultToError(
+    IOResult io_result) {
+  switch (io_result) {
+    case IO_FAILED_SHUTDOWN:
+      return Delegate::ERROR_READ_SHUTDOWN;
+    case IO_FAILED_BROKEN:
+      return Delegate::ERROR_READ_BROKEN;
+    case IO_FAILED_UNKNOWN:
+      return Delegate::ERROR_READ_UNKNOWN;
+    case IO_SUCCEEDED:
+    case IO_PENDING:
+      NOTREACHED();
+      break;
+  }
+  return Delegate::ERROR_READ_UNKNOWN;
 }
 
-void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error) {
+void RawChannel::CallOnError(Delegate::Error error) {
   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_);
   // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
   if (delegate_)
-    delegate_->OnFatalError(fatal_error);
+    delegate_->OnError(error);
 }
 
-bool RawChannel::OnWriteCompletedNoLock(bool result,
+bool RawChannel::OnWriteCompletedNoLock(IOResult io_result,
                                         size_t platform_handles_written,
                                         size_t bytes_written) {
   write_lock_.AssertAcquired();
@@ -403,14 +487,14 @@ bool RawChannel::OnWriteCompletedNoLock(bool result,
   DCHECK(!write_stopped_);
   DCHECK(!write_buffer_->message_queue_.empty());
 
-  if (result) {
+  if (io_result == IO_SUCCEEDED) {
     write_buffer_->platform_handles_offset_ += platform_handles_written;
     write_buffer_->data_offset_ += bytes_written;
 
     MessageInTransit* message = write_buffer_->message_queue_.front();
     if (write_buffer_->data_offset_ >= message->total_size()) {
       // Complete write.
-      DCHECK_EQ(write_buffer_->data_offset_, message->total_size());
+      CHECK_EQ(write_buffer_->data_offset_, message->total_size());
       write_buffer_->message_queue_.pop_front();
       delete message;
       write_buffer_->platform_handles_offset_ = 0;
@@ -421,10 +505,10 @@ bool RawChannel::OnWriteCompletedNoLock(bool result,
     }
 
     // Schedule the next write.
-    IOResult io_result = ScheduleWriteNoLock();
+    io_result = ScheduleWriteNoLock();
     if (io_result == IO_PENDING)
       return true;
-    DCHECK_EQ(io_result, IO_FAILED);
+    DCHECK_NE(io_result, IO_SUCCEEDED);
   }
 
   write_stopped_ = true;