#include "mojo/system/raw_channel.h"
#include <errno.h>
-#include <sys/socket.h>
-#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>
#include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
-#include "base/posix/eintr_wrapper.h"
#include "base/synchronization/lock.h"
-#include "build/build_config.h"
+#include "mojo/embedder/platform_channel_utils_posix.h"
#include "mojo/embedder/platform_handle.h"
namespace mojo {
class RawChannelPosix : public RawChannel,
public base::MessageLoopForIO::Watcher {
public:
- RawChannelPosix(embedder::ScopedPlatformHandle handle,
- Delegate* delegate,
- base::MessageLoopForIO* message_loop_for_io);
+ RawChannelPosix(embedder::ScopedPlatformHandle handle);
virtual ~RawChannelPosix();
+ // |RawChannel| public methods:
+ virtual size_t GetSerializedPlatformHandleSize() const OVERRIDE;
+
private:
- // |RawChannel| implementation:
+ // |RawChannel| protected methods:
virtual IOResult Read(size_t* bytes_read) OVERRIDE;
virtual IOResult ScheduleRead() OVERRIDE;
- virtual IOResult WriteNoLock(size_t* bytes_written) OVERRIDE;
+ virtual IOResult WriteNoLock(size_t* platform_handles_written,
+ size_t* bytes_written) OVERRIDE;
virtual IOResult ScheduleWriteNoLock() OVERRIDE;
virtual bool OnInit() OVERRIDE;
virtual void OnShutdownNoLock(
DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
};
-RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle,
- Delegate* delegate,
- base::MessageLoopForIO* message_loop_for_io)
- : RawChannel(delegate, message_loop_for_io),
- fd_(handle.Pass()),
+RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle)
+ : fd_(handle.Pass()),
pending_read_(false),
pending_write_(false),
weak_ptr_factory_(this) {
DCHECK(!write_watcher_.get());
}
+size_t RawChannelPosix::GetSerializedPlatformHandleSize() const {
+ // We don't actually need any space on POSIX (since we just send FDs).
+ return 0;
+}
+
RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) {
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
DCHECK(!pending_read_);
size_t bytes_to_read = 0;
read_buffer()->GetBuffer(&buffer, &bytes_to_read);
- ssize_t read_result = HANDLE_EINTR(read(fd_.get().fd, buffer, bytes_to_read));
-
+ scoped_ptr<embedder::PlatformHandleVector> handles;
+ ssize_t read_result = embedder::PlatformChannelRecvmsg(fd_.get(),
+ buffer,
+ bytes_to_read,
+ &handles);
if (read_result > 0) {
*bytes_read = static_cast<size_t>(read_result);
return IO_SUCCEEDED;
}
+ if (handles) {
+ if (read_result != 1) {
+ LOG(WARNING) << "Invalid control message with handles";
+ return IO_FAILED;
+ }
+
+ // TODO(vtl): Implement this ("buffer" received handles). For now, just drop
+ // them on the floor. (Discard this message entirely.)
+ NOTIMPLEMENTED();
+ for (size_t i = 0; i < handles->size(); i++)
+ (*handles)[i].CloseIfNecessary();
+ return ScheduleRead();
+ }
+
// |read_result == 0| means "end of file".
if (read_result == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) {
- if (read_result != 0)
- PLOG(ERROR) << "read";
+ PLOG_IF(ERROR, read_result != 0) << "recvmsg";
// Make sure that |OnFileCanReadWithoutBlocking()| won't be called again.
read_watcher_.reset();
return IO_PENDING;
}
-RawChannel::IOResult RawChannelPosix::WriteNoLock(size_t* bytes_written) {
+RawChannel::IOResult RawChannelPosix::WriteNoLock(
+ size_t* platform_handles_written,
+ size_t* bytes_written) {
write_lock().AssertAcquired();
DCHECK(!pending_write_);
- std::vector<WriteBuffer::Buffer> buffers;
- write_buffer_no_lock()->GetBuffers(&buffers);
- DCHECK(!buffers.empty());
-
- ssize_t write_result = -1;
- if (buffers.size() == 1) {
- write_result = HANDLE_EINTR(
- write(fd_.get().fd, buffers[0].addr, buffers[0].size));
+ if (write_buffer_no_lock()->HavePlatformHandlesToSend()) {
+ size_t num_platform_handles;
+ embedder::PlatformHandle* platform_handles;
+ void* serialization_data; // Actually unused.
+ write_buffer_no_lock()->GetPlatformHandlesToSend(&num_platform_handles,
+ &platform_handles,
+ &serialization_data);
+ DCHECK_GT(num_platform_handles, 0u);
+ DCHECK(platform_handles);
+ DCHECK(serialization_data);
+
+ size_t num_to_send = std::min(num_platform_handles,
+ embedder::kPlatformChannelMaxNumHandles);
+ bool succeeded = embedder::PlatformChannelSendHandles(fd_.get(),
+ platform_handles,
+ num_to_send);
+ if (succeeded) {
+ *platform_handles_written = num_to_send;
+ *bytes_written = 0;
+ return IO_SUCCEEDED;
+ }
} else {
- // Note that using |writev()|/|sendmsg()| is measurably slower than using
- // |write()| -- at least in a microbenchmark -- but much faster than using
- // multiple |write()|s. (|sendmsg()| is also measurably slightly slower than
- // |writev()|.)
- //
- // On Linux, we need to use |sendmsg()| since it's the only way to suppress
- // |SIGPIPE| (on Mac, this is suppressed on the socket itself using
- // |setsockopt()|, since |MSG_NOSIGNAL| is not supported -- see
- // platform_channel_pair_posix.cc).
- const size_t kMaxBufferCount = 10;
- iovec iov[kMaxBufferCount];
- size_t buffer_count = std::min(buffers.size(), kMaxBufferCount);
-
- for (size_t i = 0; i < buffer_count; ++i) {
- iov[i].iov_base = const_cast<char*>(buffers[i].addr);
- iov[i].iov_len = buffers[i].size;
+ std::vector<WriteBuffer::Buffer> buffers;
+ write_buffer_no_lock()->GetBuffers(&buffers);
+ DCHECK(!buffers.empty());
+
+ ssize_t write_result;
+ if (buffers.size() == 1) {
+ write_result = embedder::PlatformChannelWrite(fd_.get(), buffers[0].addr,
+ buffers[0].size);
+ } else {
+ const size_t kMaxBufferCount = 10;
+ iovec iov[kMaxBufferCount];
+ size_t buffer_count = std::min(buffers.size(), kMaxBufferCount);
+
+ for (size_t i = 0; i < buffer_count; ++i) {
+ iov[i].iov_base = const_cast<char*>(buffers[i].addr);
+ iov[i].iov_len = buffers[i].size;
+ }
+
+ write_result = embedder::PlatformChannelWritev(fd_.get(), iov,
+ buffer_count);
}
- // On Mac, we can use |writev()|, which is slightly faster, but on Linux we
- // need to use |sendmsg()|. See comment above.
- // TODO(vtl): We should have an actual test that |SIGPIPE| is suppressed for
- // |RawChannelPosix|, since it has to be suppressed at "use" time on Linux.
- // Or maybe I should abstract out |write()|/|send()| and
- // |writev()|/|sendmsg()|. crbug.com/356195
-#if defined(OS_MACOSX)
- write_result = HANDLE_EINTR(writev(fd_.get().fd, iov, buffer_count));
-#else
- struct msghdr msg = {};
- msg.msg_iov = iov;
- msg.msg_iovlen = buffer_count;
- write_result = HANDLE_EINTR(sendmsg(fd_.get().fd, &msg, MSG_NOSIGNAL));
-#endif
- }
-
- if (write_result >= 0) {
- *bytes_written = static_cast<size_t>(write_result);
- return IO_SUCCEEDED;
+ if (write_result >= 0) {
+ *platform_handles_written = 0;
+ *bytes_written = static_cast<size_t>(write_result);
+ return IO_SUCCEEDED;
+ }
}
if (errno != EAGAIN && errno != EWOULDBLOCK) {
- PLOG(ERROR) << "write of size "
- << write_buffer_no_lock()->GetTotalBytesToWrite();
+ PLOG(ERROR) << "sendmsg/write/writev";
return IO_FAILED;
}
return IO_PENDING;
}
- if (message_loop_for_io()->WatchFileDescriptor(
- fd_.get().fd, false, base::MessageLoopForIO::WATCH_WRITE,
- write_watcher_.get(), this)) {
+ if (message_loop_for_io()->WatchFileDescriptor(fd_.get().fd, false,
+ base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this)) {
pending_write_ = true;
return IO_PENDING;
}
DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
IOResult result = IO_FAILED;
+ size_t platform_handles_written = 0;
size_t bytes_written = 0;
{
base::AutoLock locker(write_lock());
DCHECK(pending_write_);
pending_write_ = false;
- result = WriteNoLock(&bytes_written);
+ result = WriteNoLock(&platform_handles_written, &bytes_written);
}
- if (result != IO_PENDING)
- OnWriteCompleted(result == IO_SUCCEEDED, bytes_written);
+ if (result != IO_PENDING) {
+ OnWriteCompleted(result == IO_SUCCEEDED,
+ platform_handles_written,
+ bytes_written);
+ }
}
void RawChannelPosix::WaitToWrite() {
DCHECK(pending_write_);
pending_write_ = false;
}
- OnWriteCompleted(false, 0);
+ OnWriteCompleted(false, 0, 0);
}
}
// Static factory method declared in raw_channel.h.
// static
-RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle,
- Delegate* delegate,
- base::MessageLoopForIO* message_loop_for_io) {
- return new RawChannelPosix(handle.Pass(), delegate, message_loop_for_io);
+scoped_ptr<RawChannel> RawChannel::Create(
+ embedder::ScopedPlatformHandle handle) {
+ return scoped_ptr<RawChannel>(new RawChannelPosix(handle.Pass()));
}
} // namespace system