Upstream version 9.38.198.0
[platform/framework/web/crosswalk.git] / src / mojo / system / raw_channel_posix.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/raw_channel.h"
6
7 #include <errno.h>
8 #include <sys/uio.h>
9 #include <unistd.h>
10
11 #include <algorithm>
12 #include <deque>
13
14 #include "base/bind.h"
15 #include "base/location.h"
16 #include "base/logging.h"
17 #include "base/macros.h"
18 #include "base/memory/scoped_ptr.h"
19 #include "base/memory/weak_ptr.h"
20 #include "base/message_loop/message_loop.h"
21 #include "base/synchronization/lock.h"
22 #include "mojo/embedder/platform_channel_utils_posix.h"
23 #include "mojo/embedder/platform_handle.h"
24 #include "mojo/embedder/platform_handle_vector.h"
25 #include "mojo/system/transport_data.h"
26
27 namespace mojo {
28 namespace system {
29
30 namespace {
31
32 class RawChannelPosix : public RawChannel,
33                         public base::MessageLoopForIO::Watcher {
34  public:
35   explicit RawChannelPosix(embedder::ScopedPlatformHandle handle);
36   virtual ~RawChannelPosix();
37
38   // |RawChannel| public methods:
39   virtual size_t GetSerializedPlatformHandleSize() const OVERRIDE;
40
41  private:
42   // |RawChannel| protected methods:
43   // Actually override this so that we can send multiple messages with (only)
44   // FDs if necessary.
45   virtual void EnqueueMessageNoLock(
46       scoped_ptr<MessageInTransit> message) OVERRIDE;
47   // Override this to handle those extra FD-only messages.
48   virtual bool OnReadMessageForRawChannel(
49       const MessageInTransit::View& message_view) OVERRIDE;
50   virtual IOResult Read(size_t* bytes_read) OVERRIDE;
51   virtual IOResult ScheduleRead() OVERRIDE;
52   virtual embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
53       size_t num_platform_handles,
54       const void* platform_handle_table) OVERRIDE;
55   virtual IOResult WriteNoLock(size_t* platform_handles_written,
56                                size_t* bytes_written) OVERRIDE;
57   virtual IOResult ScheduleWriteNoLock() OVERRIDE;
58   virtual bool OnInit() OVERRIDE;
59   virtual void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer,
60                                 scoped_ptr<WriteBuffer> write_buffer) OVERRIDE;
61
62   // |base::MessageLoopForIO::Watcher| implementation:
63   virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
64   virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
65
66   // Implements most of |Read()| (except for a bit of clean-up):
67   IOResult ReadImpl(size_t* bytes_read);
68
69   // Watches for |fd_| to become writable. Must be called on the I/O thread.
70   void WaitToWrite();
71
72   embedder::ScopedPlatformHandle fd_;
73
74   // The following members are only used on the I/O thread:
75   scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
76   scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
77
78   bool pending_read_;
79
80   std::deque<embedder::PlatformHandle> read_platform_handles_;
81
82   // The following members are used on multiple threads and protected by
83   // |write_lock()|:
84   bool pending_write_;
85
86   // This is used for posting tasks from write threads to the I/O thread. It
87   // must only be accessed under |write_lock_|. The weak pointers it produces
88   // are only used/invalidated on the I/O thread.
89   base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_;
90
91   DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
92 };
93
94 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle)
95     : fd_(handle.Pass()),
96       pending_read_(false),
97       pending_write_(false),
98       weak_ptr_factory_(this) {
99   DCHECK(fd_.is_valid());
100 }
101
102 RawChannelPosix::~RawChannelPosix() {
103   DCHECK(!pending_read_);
104   DCHECK(!pending_write_);
105
106   // No need to take the |write_lock()| here -- if there are still weak pointers
107   // outstanding, then we're hosed anyway (since we wouldn't be able to
108   // invalidate them cleanly, since we might not be on the I/O thread).
109   DCHECK(!weak_ptr_factory_.HasWeakPtrs());
110
111   // These must have been shut down/destroyed on the I/O thread.
112   DCHECK(!read_watcher_);
113   DCHECK(!write_watcher_);
114
115   embedder::CloseAllPlatformHandles(&read_platform_handles_);
116 }
117
118 size_t RawChannelPosix::GetSerializedPlatformHandleSize() const {
119   // We don't actually need any space on POSIX (since we just send FDs).
120   return 0;
121 }
122
123 void RawChannelPosix::EnqueueMessageNoLock(
124     scoped_ptr<MessageInTransit> message) {
125   if (message->transport_data()) {
126     embedder::PlatformHandleVector* const platform_handles =
127         message->transport_data()->platform_handles();
128     if (platform_handles &&
129         platform_handles->size() > embedder::kPlatformChannelMaxNumHandles) {
130       // We can't attach all the FDs to a single message, so we have to "split"
131       // the message. Send as many control messages as needed first with FDs
132       // attached (and no data).
133       size_t i = 0;
134       for (; platform_handles->size() - i >
135                  embedder::kPlatformChannelMaxNumHandles;
136            i += embedder::kPlatformChannelMaxNumHandles) {
137         scoped_ptr<MessageInTransit> fd_message(new MessageInTransit(
138             MessageInTransit::kTypeRawChannel,
139             MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles,
140             0,
141             NULL));
142         embedder::ScopedPlatformHandleVectorPtr fds(
143             new embedder::PlatformHandleVector(
144                 platform_handles->begin() + i,
145                 platform_handles->begin() + i +
146                     embedder::kPlatformChannelMaxNumHandles));
147         fd_message->SetTransportData(
148             make_scoped_ptr(new TransportData(fds.Pass())));
149         RawChannel::EnqueueMessageNoLock(fd_message.Pass());
150       }
151
152       // Remove the handles that we "moved" into the other messages.
153       platform_handles->erase(platform_handles->begin(),
154                               platform_handles->begin() + i);
155     }
156   }
157
158   RawChannel::EnqueueMessageNoLock(message.Pass());
159 }
160
161 bool RawChannelPosix::OnReadMessageForRawChannel(
162     const MessageInTransit::View& message_view) {
163   DCHECK_EQ(message_view.type(), MessageInTransit::kTypeRawChannel);
164
165   if (message_view.subtype() ==
166       MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles) {
167     // We don't need to do anything. |RawChannel| won't extract the platform
168     // handles, and they'll be accumulated in |Read()|.
169     return true;
170   }
171
172   return RawChannel::OnReadMessageForRawChannel(message_view);
173 }
174
175 RawChannel::IOResult RawChannelPosix::Read(size_t* bytes_read) {
176   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
177   DCHECK(!pending_read_);
178
179   IOResult rv = ReadImpl(bytes_read);
180   if (rv != IO_SUCCEEDED && rv != IO_PENDING) {
181     // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again.
182     read_watcher_.reset();
183   }
184   return rv;
185 }
186
187 RawChannel::IOResult RawChannelPosix::ScheduleRead() {
188   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
189   DCHECK(!pending_read_);
190
191   pending_read_ = true;
192
193   return IO_PENDING;
194 }
195
196 embedder::ScopedPlatformHandleVectorPtr RawChannelPosix::GetReadPlatformHandles(
197     size_t num_platform_handles,
198     const void* /*platform_handle_table*/) {
199   DCHECK_GT(num_platform_handles, 0u);
200
201   if (read_platform_handles_.size() < num_platform_handles) {
202     embedder::CloseAllPlatformHandles(&read_platform_handles_);
203     read_platform_handles_.clear();
204     return embedder::ScopedPlatformHandleVectorPtr();
205   }
206
207   embedder::ScopedPlatformHandleVectorPtr rv(
208       new embedder::PlatformHandleVector(num_platform_handles));
209   rv->assign(read_platform_handles_.begin(),
210              read_platform_handles_.begin() + num_platform_handles);
211   read_platform_handles_.erase(
212       read_platform_handles_.begin(),
213       read_platform_handles_.begin() + num_platform_handles);
214   return rv.Pass();
215 }
216
217 RawChannel::IOResult RawChannelPosix::WriteNoLock(
218     size_t* platform_handles_written,
219     size_t* bytes_written) {
220   write_lock().AssertAcquired();
221
222   DCHECK(!pending_write_);
223
224   size_t num_platform_handles = 0;
225   ssize_t write_result;
226   if (write_buffer_no_lock()->HavePlatformHandlesToSend()) {
227     embedder::PlatformHandle* platform_handles;
228     void* serialization_data;  // Actually unused.
229     write_buffer_no_lock()->GetPlatformHandlesToSend(
230         &num_platform_handles, &platform_handles, &serialization_data);
231     DCHECK_GT(num_platform_handles, 0u);
232     DCHECK_LE(num_platform_handles, embedder::kPlatformChannelMaxNumHandles);
233     DCHECK(platform_handles);
234
235     // TODO(vtl): Reduce code duplication. (This is duplicated from below.)
236     std::vector<WriteBuffer::Buffer> buffers;
237     write_buffer_no_lock()->GetBuffers(&buffers);
238     DCHECK(!buffers.empty());
239     const size_t kMaxBufferCount = 10;
240     iovec iov[kMaxBufferCount];
241     size_t buffer_count = std::min(buffers.size(), kMaxBufferCount);
242     for (size_t i = 0; i < buffer_count; ++i) {
243       iov[i].iov_base = const_cast<char*>(buffers[i].addr);
244       iov[i].iov_len = buffers[i].size;
245     }
246
247     write_result = embedder::PlatformChannelSendmsgWithHandles(
248         fd_.get(), iov, buffer_count, platform_handles, num_platform_handles);
249     for (size_t i = 0; i < num_platform_handles; i++)
250       platform_handles[i].CloseIfNecessary();
251   } else {
252     std::vector<WriteBuffer::Buffer> buffers;
253     write_buffer_no_lock()->GetBuffers(&buffers);
254     DCHECK(!buffers.empty());
255
256     if (buffers.size() == 1) {
257       write_result = embedder::PlatformChannelWrite(
258           fd_.get(), buffers[0].addr, buffers[0].size);
259     } else {
260       const size_t kMaxBufferCount = 10;
261       iovec iov[kMaxBufferCount];
262       size_t buffer_count = std::min(buffers.size(), kMaxBufferCount);
263       for (size_t i = 0; i < buffer_count; ++i) {
264         iov[i].iov_base = const_cast<char*>(buffers[i].addr);
265         iov[i].iov_len = buffers[i].size;
266       }
267
268       write_result =
269           embedder::PlatformChannelWritev(fd_.get(), iov, buffer_count);
270     }
271   }
272
273   if (write_result >= 0) {
274     *platform_handles_written = num_platform_handles;
275     *bytes_written = static_cast<size_t>(write_result);
276     return IO_SUCCEEDED;
277   }
278
279   if (errno == EPIPE)
280     return IO_FAILED_SHUTDOWN;
281
282   if (errno != EAGAIN && errno != EWOULDBLOCK) {
283     PLOG(WARNING) << "sendmsg/write/writev";
284     return IO_FAILED_UNKNOWN;
285   }
286
287   return ScheduleWriteNoLock();
288 }
289
290 RawChannel::IOResult RawChannelPosix::ScheduleWriteNoLock() {
291   write_lock().AssertAcquired();
292
293   DCHECK(!pending_write_);
294
295   // Set up to wait for the FD to become writable.
296   // If we're not on the I/O thread, we have to post a task to do this.
297   if (base::MessageLoop::current() != message_loop_for_io()) {
298     message_loop_for_io()->PostTask(FROM_HERE,
299                                     base::Bind(&RawChannelPosix::WaitToWrite,
300                                                weak_ptr_factory_.GetWeakPtr()));
301     pending_write_ = true;
302     return IO_PENDING;
303   }
304
305   if (message_loop_for_io()->WatchFileDescriptor(
306           fd_.get().fd,
307           false,
308           base::MessageLoopForIO::WATCH_WRITE,
309           write_watcher_.get(),
310           this)) {
311     pending_write_ = true;
312     return IO_PENDING;
313   }
314
315   return IO_FAILED_UNKNOWN;
316 }
317
318 bool RawChannelPosix::OnInit() {
319   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
320
321   DCHECK(!read_watcher_);
322   read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
323   DCHECK(!write_watcher_);
324   write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
325
326   if (!message_loop_for_io()->WatchFileDescriptor(
327           fd_.get().fd,
328           true,
329           base::MessageLoopForIO::WATCH_READ,
330           read_watcher_.get(),
331           this)) {
332     // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
333     // (in the sense of returning the message loop's state to what it was before
334     // it was called).
335     read_watcher_.reset();
336     write_watcher_.reset();
337     return false;
338   }
339
340   return true;
341 }
342
343 void RawChannelPosix::OnShutdownNoLock(
344     scoped_ptr<ReadBuffer> /*read_buffer*/,
345     scoped_ptr<WriteBuffer> /*write_buffer*/) {
346   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
347   write_lock().AssertAcquired();
348
349   read_watcher_.reset();   // This will stop watching (if necessary).
350   write_watcher_.reset();  // This will stop watching (if necessary).
351
352   pending_read_ = false;
353   pending_write_ = false;
354
355   DCHECK(fd_.is_valid());
356   fd_.reset();
357
358   weak_ptr_factory_.InvalidateWeakPtrs();
359 }
360
361 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
362   DCHECK_EQ(fd, fd_.get().fd);
363   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
364
365   if (!pending_read_) {
366     NOTREACHED();
367     return;
368   }
369
370   pending_read_ = false;
371   size_t bytes_read = 0;
372   IOResult io_result = Read(&bytes_read);
373   if (io_result != IO_PENDING)
374     OnReadCompleted(io_result, bytes_read);
375
376   // On failure, |read_watcher_| must have been reset; on success,
377   // we assume that |OnReadCompleted()| always schedules another read.
378   // Otherwise, we could end up spinning -- getting
379   // |OnFileCanReadWithoutBlocking()| again and again but not doing any actual
380   // read.
381   // TODO(yzshen): An alternative is to stop watching if RawChannel doesn't
382   // schedule a new read. But that code won't be reached under the current
383   // RawChannel implementation.
384   DCHECK(!read_watcher_ || pending_read_);
385 }
386
387 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
388   DCHECK_EQ(fd, fd_.get().fd);
389   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
390
391   IOResult io_result;
392   size_t platform_handles_written = 0;
393   size_t bytes_written = 0;
394   {
395     base::AutoLock locker(write_lock());
396
397     DCHECK(pending_write_);
398
399     pending_write_ = false;
400     io_result = WriteNoLock(&platform_handles_written, &bytes_written);
401   }
402
403   if (io_result != IO_PENDING)
404     OnWriteCompleted(io_result, platform_handles_written, bytes_written);
405 }
406
407 RawChannel::IOResult RawChannelPosix::ReadImpl(size_t* bytes_read) {
408   char* buffer = NULL;
409   size_t bytes_to_read = 0;
410   read_buffer()->GetBuffer(&buffer, &bytes_to_read);
411
412   size_t old_num_platform_handles = read_platform_handles_.size();
413   ssize_t read_result = embedder::PlatformChannelRecvmsg(
414       fd_.get(), buffer, bytes_to_read, &read_platform_handles_);
415   if (read_platform_handles_.size() > old_num_platform_handles) {
416     DCHECK_LE(read_platform_handles_.size() - old_num_platform_handles,
417               embedder::kPlatformChannelMaxNumHandles);
418
419     // We should never accumulate more than |TransportData::kMaxPlatformHandles
420     // + embedder::kPlatformChannelMaxNumHandles| handles. (The latter part is
421     // possible because we could have accumulated all the handles for a message,
422     // then received the message data plus the first set of handles for the next
423     // message in the subsequent |recvmsg()|.)
424     if (read_platform_handles_.size() >
425         (TransportData::kMaxPlatformHandles +
426          embedder::kPlatformChannelMaxNumHandles)) {
427       LOG(ERROR) << "Received too many platform handles";
428       embedder::CloseAllPlatformHandles(&read_platform_handles_);
429       read_platform_handles_.clear();
430       return IO_FAILED_UNKNOWN;
431     }
432   }
433
434   if (read_result > 0) {
435     *bytes_read = static_cast<size_t>(read_result);
436     return IO_SUCCEEDED;
437   }
438
439   // |read_result == 0| means "end of file".
440   if (read_result == 0)
441     return IO_FAILED_SHUTDOWN;
442
443   if (errno == EAGAIN || errno == EWOULDBLOCK)
444     return ScheduleRead();
445
446   if (errno == ECONNRESET)
447     return IO_FAILED_BROKEN;
448
449   PLOG(WARNING) << "recvmsg";
450   return IO_FAILED_UNKNOWN;
451 }
452
453 void RawChannelPosix::WaitToWrite() {
454   DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
455
456   DCHECK(write_watcher_);
457
458   if (!message_loop_for_io()->WatchFileDescriptor(
459           fd_.get().fd,
460           false,
461           base::MessageLoopForIO::WATCH_WRITE,
462           write_watcher_.get(),
463           this)) {
464     {
465       base::AutoLock locker(write_lock());
466
467       DCHECK(pending_write_);
468       pending_write_ = false;
469     }
470     OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0);
471   }
472 }
473
474 }  // namespace
475
476 // -----------------------------------------------------------------------------
477
478 // Static factory method declared in raw_channel.h.
479 // static
480 scoped_ptr<RawChannel> RawChannel::Create(
481     embedder::ScopedPlatformHandle handle) {
482   return scoped_ptr<RawChannel>(new RawChannelPosix(handle.Pass()));
483 }
484
485 }  // namespace system
486 }  // namespace mojo