- add sources.
[platform/framework/web/crosswalk.git] / src / mojo / system / raw_channel_posix_unittest.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // TODO(vtl): Factor out the POSIX-specific bits of this test (once we have a
6 // non-POSIX implementation).
7
8 #include "mojo/system/raw_channel.h"
9
10 #include <fcntl.h>
11 #include <stdint.h>
12 #include <sys/socket.h>
13 #include <sys/types.h>
14 #include <unistd.h>
15
16 #include <vector>
17
18 #include "base/basictypes.h"
19 #include "base/bind.h"
20 #include "base/callback.h"
21 #include "base/compiler_specific.h"
22 #include "base/location.h"
23 #include "base/logging.h"
24 #include "base/memory/scoped_ptr.h"
25 #include "base/memory/scoped_vector.h"
26 #include "base/message_loop/message_loop.h"
27 #include "base/posix/eintr_wrapper.h"
28 #include "base/rand_util.h"
29 #include "base/synchronization/lock.h"
30 #include "base/synchronization/waitable_event.h"
31 #include "base/threading/platform_thread.h"  // For |Sleep()|.
32 #include "base/threading/simple_thread.h"
33 #include "base/threading/thread.h"
34 #include "base/time/time.h"
35 #include "mojo/system/message_in_transit.h"
36 #include "mojo/system/platform_channel_handle.h"
37 #include "mojo/system/test_utils.h"
38 #include "testing/gtest/include/gtest/gtest.h"
39
40 namespace mojo {
41 namespace system {
42 namespace {
43
44 MessageInTransit* MakeTestMessage(uint32_t num_bytes) {
45   std::vector<unsigned char> bytes(num_bytes, 0);
46   for (size_t i = 0; i < num_bytes; i++)
47     bytes[i] = static_cast<unsigned char>(i + num_bytes);
48   return MessageInTransit::Create(bytes.data(), num_bytes);
49 }
50
51 bool CheckMessageData(const void* bytes, uint32_t num_bytes) {
52   const unsigned char* b = static_cast<const unsigned char*>(bytes);
53   for (uint32_t i = 0; i < num_bytes; i++) {
54     if (b[i] != static_cast<unsigned char>(i + num_bytes))
55       return false;
56   }
57   return true;
58 }
59
60 // -----------------------------------------------------------------------------
61
62 class RawChannelPosixTest : public testing::Test {
63  public:
64   RawChannelPosixTest() : io_thread_("io_thread") {
65     fds_[0] = -1;
66     fds_[1] = -1;
67   }
68
69   virtual ~RawChannelPosixTest() {
70   }
71
72   virtual void SetUp() OVERRIDE {
73     io_thread_.StartWithOptions(
74         base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
75
76     // Create the socket.
77     PCHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, fds_) == 0);
78
79     // Set the ends to non-blocking.
80     PCHECK(fcntl(fds_[0], F_SETFL, O_NONBLOCK) == 0);
81     PCHECK(fcntl(fds_[1], F_SETFL, O_NONBLOCK) == 0);
82   }
83
84   virtual void TearDown() OVERRIDE {
85     if (fds_[0] != -1)
86       CHECK_EQ(close(fds_[0]), 0);
87     if (fds_[1] != -1)
88       CHECK_EQ(close(fds_[1]), 0);
89
90     io_thread_.Stop();
91   }
92
93  protected:
94   int fd(size_t i) { return fds_[i]; }
95   void clear_fd(size_t i) { fds_[i] = -1; }
96
97   base::MessageLoop* io_thread_message_loop() {
98     return io_thread_.message_loop();
99   }
100
101   scoped_refptr<base::TaskRunner> io_thread_task_runner() {
102     return io_thread_message_loop()->message_loop_proxy();
103   }
104
105  private:
106   base::Thread io_thread_;
107   int fds_[2];
108
109   DISALLOW_COPY_AND_ASSIGN(RawChannelPosixTest);
110 };
111
112 // RawChannelPosixTest.WriteMessage --------------------------------------------
113
114 class WriteOnlyRawChannelDelegate : public RawChannel::Delegate {
115  public:
116   WriteOnlyRawChannelDelegate() {}
117   virtual ~WriteOnlyRawChannelDelegate() {}
118
119   // |RawChannel::Delegate| implementation:
120   virtual void OnReadMessage(const MessageInTransit& /*message*/) OVERRIDE {
121     NOTREACHED();
122   }
123   virtual void OnFatalError(FatalError /*fatal_error*/) OVERRIDE {
124     NOTREACHED();
125   }
126
127  private:
128   DISALLOW_COPY_AND_ASSIGN(WriteOnlyRawChannelDelegate);
129 };
130
131 static const int64_t kMessageReaderSleepMs = 1;
132 static const size_t kMessageReaderMaxPollIterations = 3000;
133
134 class TestMessageReaderAndChecker {
135  public:
136   explicit TestMessageReaderAndChecker(int fd) : fd_(fd) {}
137   ~TestMessageReaderAndChecker() { CHECK(bytes_.empty()); }
138
139   bool ReadAndCheckNextMessage(uint32_t expected_size) {
140     unsigned char buffer[4096];
141
142     for (size_t i = 0; i < kMessageReaderMaxPollIterations;) {
143       ssize_t read_size = HANDLE_EINTR(read(fd_, buffer, sizeof(buffer)));
144       if (read_size < 0) {
145         PCHECK(errno == EAGAIN || errno == EWOULDBLOCK);
146         read_size = 0;
147       }
148
149       // Append newly-read data to |bytes_|.
150       bytes_.insert(bytes_.end(), buffer, buffer + read_size);
151
152       // If we have the header....
153       if (bytes_.size() >= sizeof(MessageInTransit)) {
154         const MessageInTransit* message =
155             reinterpret_cast<const MessageInTransit*>(bytes_.data());
156         CHECK_EQ(reinterpret_cast<size_t>(message) %
157                      MessageInTransit::kMessageAlignment, 0u);
158
159         if (message->data_size() != expected_size) {
160           LOG(ERROR) << "Wrong size: " << message->data_size() << " instead of "
161                      << expected_size << " bytes.";
162           return false;
163         }
164
165         // If we've read the whole message....
166         if (bytes_.size() >= message->size_with_header_and_padding()) {
167           if (!CheckMessageData(message->data(), message->data_size())) {
168             LOG(ERROR) << "Incorrect message data.";
169             return false;
170           }
171
172           // Erase message data.
173           bytes_.erase(bytes_.begin(),
174                        bytes_.begin() +
175                            message->size_with_header_and_padding());
176           return true;
177         }
178       }
179
180       if (static_cast<size_t>(read_size) < sizeof(buffer)) {
181         i++;
182         base::PlatformThread::Sleep(
183             base::TimeDelta::FromMilliseconds(kMessageReaderSleepMs));
184       }
185     }
186
187     LOG(ERROR) << "Too many iterations.";
188     return false;
189   }
190
191  private:
192   const int fd_;
193
194   // The start of the received data should always be on a message boundary.
195   std::vector<unsigned char> bytes_;
196
197   DISALLOW_COPY_AND_ASSIGN(TestMessageReaderAndChecker);
198 };
199
200 // Tests writing (and verifies reading using our own custom reader).
201 TEST_F(RawChannelPosixTest, WriteMessage) {
202   WriteOnlyRawChannelDelegate delegate;
203   scoped_ptr<RawChannel> rc(RawChannel::Create(PlatformChannelHandle(fd(0)),
204                                                &delegate,
205                                                io_thread_message_loop()));
206   // |RawChannel::Create()| takes ownership of the FD.
207   clear_fd(0);
208
209   TestMessageReaderAndChecker checker(fd(1));
210
211   test::PostTaskAndWait(io_thread_task_runner(),
212                         FROM_HERE,
213                         base::Bind(&RawChannel::Init,
214                                    base::Unretained(rc.get())));
215
216   // Write and read, for a variety of sizes.
217   for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) {
218     EXPECT_TRUE(rc->WriteMessage(MakeTestMessage(size)));
219     EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size;
220   }
221
222   // Write/queue and read afterwards, for a variety of sizes.
223   for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1)
224     EXPECT_TRUE(rc->WriteMessage(MakeTestMessage(size)));
225   for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1)
226     EXPECT_TRUE(checker.ReadAndCheckNextMessage(size)) << size;
227
228   test::PostTaskAndWait(io_thread_task_runner(),
229                         FROM_HERE,
230                         base::Bind(&RawChannel::Shutdown,
231                                    base::Unretained(rc.get())));
232 }
233
234 // RawChannelPosixTest.OnReadMessage -------------------------------------------
235
236 class ReadCheckerRawChannelDelegate : public RawChannel::Delegate {
237  public:
238   ReadCheckerRawChannelDelegate()
239       : done_event_(false, false),
240         position_(0) {}
241   virtual ~ReadCheckerRawChannelDelegate() {}
242
243   // |RawChannel::Delegate| implementation (called on the I/O thread):
244   virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE {
245     size_t position;
246     size_t expected_size;
247     bool should_signal = false;
248     {
249       base::AutoLock locker(lock_);
250       CHECK_LT(position_, expected_sizes_.size());
251       position = position_;
252       expected_size = expected_sizes_[position];
253       position_++;
254       if (position_ >= expected_sizes_.size())
255         should_signal = true;
256     }
257
258     EXPECT_EQ(expected_size, message.data_size()) << position;
259     if (message.data_size() == expected_size) {
260       EXPECT_TRUE(CheckMessageData(message.data(), message.data_size()))
261           << position;
262     }
263
264     if (should_signal)
265       done_event_.Signal();
266   }
267   virtual void OnFatalError(FatalError /*fatal_error*/) OVERRIDE {
268     NOTREACHED();
269   }
270
271   // Wait for all the messages (of sizes |expected_sizes_|) to be seen.
272   void Wait() {
273     done_event_.Wait();
274   }
275
276   void SetExpectedSizes(const std::vector<uint32_t>& expected_sizes) {
277     base::AutoLock locker(lock_);
278     CHECK_EQ(position_, expected_sizes_.size());
279     expected_sizes_ = expected_sizes;
280     position_ = 0;
281   }
282
283  private:
284   base::WaitableEvent done_event_;
285
286   base::Lock lock_;  // Protects the following members.
287   std::vector<uint32_t> expected_sizes_;
288   size_t position_;
289
290   DISALLOW_COPY_AND_ASSIGN(ReadCheckerRawChannelDelegate);
291 };
292
293 // Tests reading (writing using our own custom writer).
294 TEST_F(RawChannelPosixTest, OnReadMessage) {
295   // We're going to write to |fd(1)|. We'll do so in a blocking manner.
296   PCHECK(fcntl(fd(1), F_SETFL, 0) == 0);
297
298   ReadCheckerRawChannelDelegate delegate;
299   scoped_ptr<RawChannel> rc(RawChannel::Create(PlatformChannelHandle(fd(0)),
300                                                &delegate,
301                                                io_thread_message_loop()));
302   // |RawChannel::Create()| takes ownership of the FD.
303   clear_fd(0);
304
305   test::PostTaskAndWait(io_thread_task_runner(),
306                         FROM_HERE,
307                         base::Bind(&RawChannel::Init,
308                                    base::Unretained(rc.get())));
309
310   // Write and read, for a variety of sizes.
311   for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) {
312     delegate.SetExpectedSizes(std::vector<uint32_t>(1, size));
313     MessageInTransit* message = MakeTestMessage(size);
314     EXPECT_EQ(static_cast<ssize_t>(message->size_with_header_and_padding()),
315               write(fd(1), message, message->size_with_header_and_padding()));
316     message->Destroy();
317     delegate.Wait();
318   }
319
320   // Set up reader and write as fast as we can.
321   // Write/queue and read afterwards, for a variety of sizes.
322   std::vector<uint32_t> expected_sizes;
323   for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1)
324     expected_sizes.push_back(size);
325   delegate.SetExpectedSizes(expected_sizes);
326   for (uint32_t size = 1; size < 5 * 1000 * 1000; size += size / 2 + 1) {
327     MessageInTransit* message = MakeTestMessage(size);
328     EXPECT_EQ(static_cast<ssize_t>(message->size_with_header_and_padding()),
329               write(fd(1), message, message->size_with_header_and_padding()));
330     message->Destroy();
331   }
332   delegate.Wait();
333
334   test::PostTaskAndWait(io_thread_task_runner(),
335                         FROM_HERE,
336                         base::Bind(&RawChannel::Shutdown,
337                                    base::Unretained(rc.get())));
338 }
339
340 // RawChannelPosixTest.WriteMessageAndOnReadMessage ----------------------------
341
342 class RawChannelWriterThread : public base::SimpleThread {
343  public:
344   RawChannelWriterThread(RawChannel* raw_channel, size_t write_count)
345       : base::SimpleThread("raw_channel_writer_thread"),
346         raw_channel_(raw_channel),
347         left_to_write_(write_count) {
348   }
349
350   virtual ~RawChannelWriterThread() {
351     Join();
352   }
353
354  private:
355   virtual void Run() OVERRIDE {
356     static const int kMaxRandomMessageSize = 25000;
357
358     while (left_to_write_-- > 0) {
359       EXPECT_TRUE(raw_channel_->WriteMessage(MakeTestMessage(
360           static_cast<uint32_t>(base::RandInt(1, kMaxRandomMessageSize)))));
361     }
362   }
363
364   RawChannel* const raw_channel_;
365   size_t left_to_write_;
366
367   DISALLOW_COPY_AND_ASSIGN(RawChannelWriterThread);
368 };
369
370 class ReadCountdownRawChannelDelegate : public RawChannel::Delegate {
371  public:
372   explicit ReadCountdownRawChannelDelegate(size_t expected_count)
373       : done_event_(false, false),
374         expected_count_(expected_count),
375         count_(0) {}
376   virtual ~ReadCountdownRawChannelDelegate() {}
377
378   // |RawChannel::Delegate| implementation (called on the I/O thread):
379   virtual void OnReadMessage(const MessageInTransit& message) OVERRIDE {
380     EXPECT_LT(count_, expected_count_);
381     count_++;
382
383     EXPECT_TRUE(CheckMessageData(message.data(), message.data_size()));
384
385     if (count_ >= expected_count_)
386       done_event_.Signal();
387   }
388   virtual void OnFatalError(FatalError /*fatal_error*/) OVERRIDE {
389     NOTREACHED();
390   }
391
392   // Wait for all the messages to have been seen.
393   void Wait() {
394     done_event_.Wait();
395   }
396
397  private:
398   base::WaitableEvent done_event_;
399   size_t expected_count_;
400   size_t count_;
401
402   DISALLOW_COPY_AND_ASSIGN(ReadCountdownRawChannelDelegate);
403 };
404
405 TEST_F(RawChannelPosixTest, WriteMessageAndOnReadMessage) {
406   static const size_t kNumWriterThreads = 10;
407   static const size_t kNumWriteMessagesPerThread = 4000;
408
409   WriteOnlyRawChannelDelegate writer_delegate;
410   scoped_ptr<RawChannel> writer_rc(
411       RawChannel::Create(PlatformChannelHandle(fd(0)),
412                                                &writer_delegate,
413                                                io_thread_message_loop()));
414   // |RawChannel::Create()| takes ownership of the FD.
415   clear_fd(0);
416
417   test::PostTaskAndWait(io_thread_task_runner(),
418                         FROM_HERE,
419                         base::Bind(&RawChannel::Init,
420                                    base::Unretained(writer_rc.get())));
421
422   ReadCountdownRawChannelDelegate reader_delegate(
423       kNumWriterThreads * kNumWriteMessagesPerThread);
424   scoped_ptr<RawChannel> reader_rc(
425       RawChannel::Create(PlatformChannelHandle(fd(1)),
426                                                &reader_delegate,
427                                                io_thread_message_loop()));
428   // |RawChannel::Create()| takes ownership of the FD.
429   clear_fd(1);
430
431   test::PostTaskAndWait(io_thread_task_runner(),
432                         FROM_HERE,
433                         base::Bind(&RawChannel::Init,
434                                    base::Unretained(reader_rc.get())));
435
436   {
437     ScopedVector<RawChannelWriterThread> writer_threads;
438     for (size_t i = 0; i < kNumWriterThreads; i++) {
439       writer_threads.push_back(new RawChannelWriterThread(
440           writer_rc.get(), kNumWriteMessagesPerThread));
441     }
442     for (size_t i = 0; i < writer_threads.size(); i++)
443       writer_threads[i]->Start();
444   }  // Joins all the writer threads.
445
446   // Sleep a bit, to let any extraneous reads be processed. (There shouldn't be
447   // any, but we want to know about them.)
448   base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
449
450   // Wait for reading to finish.
451   reader_delegate.Wait();
452
453   test::PostTaskAndWait(io_thread_task_runner(),
454                         FROM_HERE,
455                         base::Bind(&RawChannel::Shutdown,
456                                    base::Unretained(reader_rc.get())));
457
458   test::PostTaskAndWait(io_thread_task_runner(),
459                         FROM_HERE,
460                         base::Bind(&RawChannel::Shutdown,
461                                    base::Unretained(writer_rc.get())));
462 }
463
464 // RawChannelPosixTest.OnFatalError --------------------------------------------
465
466 class FatalErrorRecordingRawChannelDelegate : public RawChannel::Delegate {
467  public:
468   FatalErrorRecordingRawChannelDelegate()
469       : got_fatal_error_event_(false, false),
470         on_fatal_error_call_count_(0),
471         last_fatal_error_(FATAL_ERROR_UNKNOWN) {}
472   virtual ~FatalErrorRecordingRawChannelDelegate() {}
473
474   // |RawChannel::Delegate| implementation:
475   virtual void OnReadMessage(const MessageInTransit& /*message*/) OVERRIDE {
476     NOTREACHED();
477   }
478   virtual void OnFatalError(FatalError fatal_error) OVERRIDE {
479     CHECK_EQ(on_fatal_error_call_count_, 0);
480     on_fatal_error_call_count_++;
481     last_fatal_error_ = fatal_error;
482     got_fatal_error_event_.Signal();
483   }
484
485   FatalError WaitForFatalError() {
486     got_fatal_error_event_.Wait();
487     CHECK_EQ(on_fatal_error_call_count_, 1);
488     return last_fatal_error_;
489   }
490
491  private:
492   base::WaitableEvent got_fatal_error_event_;
493
494   int on_fatal_error_call_count_;
495   FatalError last_fatal_error_;
496
497   DISALLOW_COPY_AND_ASSIGN(FatalErrorRecordingRawChannelDelegate);
498 };
499
500 // Tests fatal errors.
501 // TODO(vtl): Figure out how to make reading fail reliably. (I'm not convinced
502 // that it does.)
503 TEST_F(RawChannelPosixTest, OnFatalError) {
504   FatalErrorRecordingRawChannelDelegate delegate;
505   scoped_ptr<RawChannel> rc(RawChannel::Create(PlatformChannelHandle(fd(0)),
506                                                &delegate,
507                                                io_thread_message_loop()));
508   // |RawChannel::Create()| takes ownership of the FD.
509   clear_fd(0);
510
511   test::PostTaskAndWait(io_thread_task_runner(),
512                         FROM_HERE,
513                         base::Bind(&RawChannel::Init,
514                                    base::Unretained(rc.get())));
515
516   // Close the other end, which should make writing fail.
517   CHECK_EQ(close(fd(1)), 0);
518   clear_fd(1);
519
520   EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1)));
521
522   // TODO(vtl): In theory, it's conceivable that closing the other end might
523   // lead to read failing. In practice, it doesn't seem to.
524   EXPECT_EQ(RawChannel::Delegate::FATAL_ERROR_FAILED_WRITE,
525             delegate.WaitForFatalError());
526
527   test::PostTaskAndWait(io_thread_task_runner(),
528                         FROM_HERE,
529                         base::Bind(&RawChannel::Shutdown,
530                                    base::Unretained(rc.get())));
531
532 }
533
534 // RawChannelPosixTest.WriteMessageAfterShutdown -------------------------------
535
536 // Makes sure that calling |WriteMessage()| after |Shutdown()| behaves
537 // correctly.
538 TEST_F(RawChannelPosixTest, WriteMessageAfterShutdown) {
539   WriteOnlyRawChannelDelegate delegate;
540   scoped_ptr<RawChannel> rc(RawChannel::Create(PlatformChannelHandle(fd(0)),
541                                                &delegate,
542                                                io_thread_message_loop()));
543   // |RawChannel::Create()| takes ownership of the FD.
544   clear_fd(0);
545
546   test::PostTaskAndWait(io_thread_task_runner(),
547                         FROM_HERE,
548                         base::Bind(&RawChannel::Init,
549                                    base::Unretained(rc.get())));
550   test::PostTaskAndWait(io_thread_task_runner(),
551                         FROM_HERE,
552                         base::Bind(&RawChannel::Shutdown,
553                                    base::Unretained(rc.get())));
554
555   EXPECT_FALSE(rc->WriteMessage(MakeTestMessage(1)));
556 }
557
558 }  // namespace
559 }  // namespace system
560 }  // namespace mojo