1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 // NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a
6 // heavily-loaded system). Sorry. |kEpsilonMicros| may be increased to increase
7 // tolerance and reduce observed flakiness.
9 #include "mojo/system/message_pipe_dispatcher.h"
15 #include "base/memory/ref_counted.h"
16 #include "base/memory/scoped_vector.h"
17 #include "base/rand_util.h"
18 #include "base/threading/platform_thread.h" // For |Sleep()|.
19 #include "base/threading/simple_thread.h"
20 #include "base/time/time.h"
21 #include "mojo/system/message_pipe.h"
22 #include "mojo/system/test_utils.h"
23 #include "mojo/system/waiter.h"
24 #include "mojo/system/waiter_test_utils.h"
25 #include "testing/gtest/include/gtest/gtest.h"
// Number of microseconds in one millisecond.
31 const int64_t kMicrosPerMs = 1000;
// Timing tolerance, in microseconds, used by the elapsed-time assertions in the
// tests below. Increasing it makes those assertions more tolerant of slow or
// heavily-loaded machines.
32 const int64_t kEpsilonMicros = 15 * kMicrosPerMs; // 15 ms.
// Single-threaded test of two MessagePipeDispatchers attached to the same
// MessagePipe: adding waiters, writing on one end, reading on the other, and
// waits that time out. Elapsed times are checked against |kEpsilonMicros|.
34 TEST(MessagePipeDispatcherTest, Basic) {
35 test::Stopwatch stopwatch;
37 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
39 int64_t elapsed_micros;
41 // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa.
42 for (unsigned i = 0; i < 2; i++) {
43 scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
44 scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
// Both dispatchers share one pipe and are initialized with opposite ports.
46 scoped_refptr<MessagePipe> mp(new MessagePipe());
47 d_0->Init(mp, i); // 0, 1.
48 d_1->Init(mp, i ^ 1); // 1, 0.
52 // Try adding a writable waiter when already writable.
54 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
55 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 0));
56 // Shouldn't need to remove the waiter (it was not added).
58 // Add a readable waiter to |d_0|, then make it readable (by writing to
// |d_1|), then wait on it.
61 EXPECT_EQ(MOJO_RESULT_OK,
62 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 1));
63 buffer[0] = 123456789;
64 EXPECT_EQ(MOJO_RESULT_OK,
65 d_1->WriteMessage(buffer, kBufferSize,
67 MOJO_WRITE_MESSAGE_FLAG_NONE));
// The wait is expected to return 1, matching the last argument passed to
// AddWaiter() above, and to complete within |kEpsilonMicros|.
69 EXPECT_EQ(1, w.Wait(MOJO_DEADLINE_INDEFINITE));
70 elapsed_micros = stopwatch.Elapsed();
71 EXPECT_LT(elapsed_micros, kEpsilonMicros);
72 d_0->RemoveWaiter(&w);
74 // Try adding a readable waiter when already readable (from above).
76 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
77 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 2));
78 // Shouldn't need to remove the waiter (it was not added).
80 // Make |d_0| no longer readable (by reading from it).
82 buffer_size = kBufferSize;
83 EXPECT_EQ(MOJO_RESULT_OK,
84 d_0->ReadMessage(buffer, &buffer_size,
86 MOJO_READ_MESSAGE_FLAG_NONE));
// The message read back must be the one written to |d_1| above.
87 EXPECT_EQ(kBufferSize, buffer_size);
88 EXPECT_EQ(123456789, buffer[0]);
90 // Wait for zero time for readability on |d_0| (will time out).
92 EXPECT_EQ(MOJO_RESULT_OK,
93 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3));
95 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0));
96 elapsed_micros = stopwatch.Elapsed();
97 EXPECT_LT(elapsed_micros, kEpsilonMicros);
98 d_0->RemoveWaiter(&w);
100 // Wait for non-zero, finite time for readability on |d_0| (will time out).
102 EXPECT_EQ(MOJO_RESULT_OK,
103 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3));
// The deadline is 2 * |kEpsilonMicros|; the elapsed time is accepted anywhere
// strictly between 1x and 3x |kEpsilonMicros|.
105 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(2 * kEpsilonMicros));
106 elapsed_micros = stopwatch.Elapsed();
107 EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
108 EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
109 d_0->RemoveWaiter(&w);
// Closing both dispatchers is expected to succeed.
111 EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
112 EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
// Checks that WriteMessage() and ReadMessage() reject bad arguments (null
// buffers or handle arrays with nonzero counts, and oversized counts) with the
// expected error codes.
116 TEST(MessagePipeDispatcherTest, InvalidParams) {
118 MojoHandle handles[1];
120 scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
121 scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
123 scoped_refptr<MessagePipe> mp(new MessagePipe());
// |WriteMessage()| argument checks follow.
129 // Null buffer with nonzero buffer size.
130 EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
131 d_0->WriteMessage(NULL, 1,
133 MOJO_WRITE_MESSAGE_FLAG_NONE));
// Non-null buffer with the largest possible uint32_t buffer size; expected to
// be reported as resource exhaustion rather than an invalid argument.
135 EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
136 d_0->WriteMessage(buffer, std::numeric_limits<uint32_t>::max(),
138 MOJO_WRITE_MESSAGE_FLAG_NONE));
140 // Null handles with nonzero handle count.
141 EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
142 d_0->WriteMessage(buffer, sizeof(buffer),
144 MOJO_WRITE_MESSAGE_FLAG_NONE));
145 // Huge handle count (implausibly big on some systems -- more than can be
146 // stored in a 32-bit address space).
147 // Note: This may return either |MOJO_RESULT_INVALID_ARGUMENT| or
148 // |MOJO_RESULT_RESOURCE_EXHAUSTED|, depending on whether it's plausible or
// not, so only "not OK" is asserted here.
150 EXPECT_NE(MOJO_RESULT_OK,
151 d_0->WriteMessage(buffer, sizeof(buffer),
152 handles, std::numeric_limits<uint32_t>::max(),
153 MOJO_WRITE_MESSAGE_FLAG_NONE));
154 // Huge handle count (plausibly big).
155 EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
156 d_0->WriteMessage(buffer, sizeof(buffer),
157 handles, std::numeric_limits<uint32_t>::max() /
159 MOJO_WRITE_MESSAGE_FLAG_NONE));
// |ReadMessage()| argument checks follow.
162 // Null buffer with nonzero buffer size.
163 uint32_t buffer_size = 1;
164 EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
165 d_0->ReadMessage(NULL, &buffer_size,
167 MOJO_READ_MESSAGE_FLAG_NONE));
168 // Null handles with nonzero handle count.
169 buffer_size = static_cast<uint32_t>(sizeof(buffer));
170 uint32_t handle_count = 1;
171 EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
172 d_0->ReadMessage(buffer, &buffer_size,
174 MOJO_READ_MESSAGE_FLAG_NONE));
// Closing both dispatchers is expected to succeed.
176 EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
177 EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
180 // Test what happens when one end is closed (single-threaded test).
// Two messages are written to |d_1|, |d_1| is closed, and |d_0| is then
// expected to drain both messages before reads, writes and waits start failing.
181 TEST(MessagePipeDispatcherTest, BasicClosed) {
183 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
184 uint32_t buffer_size;
186 // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa.
187 for (unsigned i = 0; i < 2; i++) {
188 scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
189 scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
// Both dispatchers share one pipe and are initialized with opposite ports.
191 scoped_refptr<MessagePipe> mp(new MessagePipe());
192 d_0->Init(mp, i); // 0, 1.
193 d_1->Init(mp, i ^ 1); // 1, 0.
197 // Write (twice) to |d_1|.
198 buffer[0] = 123456789;
199 EXPECT_EQ(MOJO_RESULT_OK,
200 d_1->WriteMessage(buffer, kBufferSize,
202 MOJO_WRITE_MESSAGE_FLAG_NONE));
203 buffer[0] = 234567890;
204 EXPECT_EQ(MOJO_RESULT_OK,
205 d_1->WriteMessage(buffer, kBufferSize,
207 MOJO_WRITE_MESSAGE_FLAG_NONE));
209 // Try waiting for readable on |d_0|; should fail (already satisfied).
211 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
212 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 0));
// Close |d_1|; the two messages written above have not been read yet.
215 EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
217 // Try waiting for readable on |d_0|; should fail (already satisfied).
219 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
220 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 1));
// Read from |d_0|; the first message written is expected.
224 buffer_size = kBufferSize;
225 EXPECT_EQ(MOJO_RESULT_OK,
226 d_0->ReadMessage(buffer, &buffer_size,
228 MOJO_READ_MESSAGE_FLAG_NONE));
229 EXPECT_EQ(kBufferSize, buffer_size);
230 EXPECT_EQ(123456789, buffer[0]);
232 // Try waiting for readable on |d_0|; should fail (already satisfied).
234 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
235 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 2));
237 // Read again from |d_0|.
239 buffer_size = kBufferSize;
240 EXPECT_EQ(MOJO_RESULT_OK,
241 d_0->ReadMessage(buffer, &buffer_size,
243 MOJO_READ_MESSAGE_FLAG_NONE));
// The second message written is expected this time.
244 EXPECT_EQ(kBufferSize, buffer_size);
245 EXPECT_EQ(234567890, buffer[0]);
247 // Try waiting for readable on |d_0|; should fail (unsatisfiable).
249 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
250 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3));
252 // Try waiting for writable on |d_0|; should fail (unsatisfiable).
254 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
255 d_0->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 4));
257 // Try reading from |d_0|; should fail (nothing to read).
259 buffer_size = kBufferSize;
260 EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
261 d_0->ReadMessage(buffer, &buffer_size,
263 MOJO_READ_MESSAGE_FLAG_NONE));
265 // Try writing to |d_0|; should fail (other end closed).
266 buffer[0] = 345678901;
267 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
268 d_0->WriteMessage(buffer, kBufferSize,
270 MOJO_WRITE_MESSAGE_FLAG_NONE));
// Only |d_0| remains to be closed; |d_1| was closed above.
272 EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
// Multi-threaded test: a test::WaiterThread waits for readability on |d_1|
// while this thread writes to or closes a dispatcher after a delay. After each
// thread is joined, |did_wait|, |result| and the elapsed time are checked.
276 TEST(MessagePipeDispatcherTest, BasicThreaded) {
277 test::Stopwatch stopwatch;
279 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
280 uint32_t buffer_size;
283 int64_t elapsed_micros;
285 // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa.
286 for (unsigned i = 0; i < 2; i++) {
287 scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
288 scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
// Both dispatchers share one pipe and are initialized with opposite ports.
290 scoped_refptr<MessagePipe> mp(new MessagePipe());
291 d_0->Init(mp, i); // 0, 1.
292 d_1->Init(mp, i ^ 1); // 1, 0.
295 // Wait for readable on |d_1|, which will become readable after some time.
297 test::WaiterThread thread(d_1,
298 MOJO_WAIT_FLAG_READABLE,
299 MOJO_DEADLINE_INDEFINITE,
// Delay for 2 * |kEpsilonMicros| before writing.
304 base::PlatformThread::Sleep(
305 base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
306 // Wake it up by writing to |d_0|.
307 buffer[0] = 123456789;
308 EXPECT_EQ(MOJO_RESULT_OK,
309 d_0->WriteMessage(buffer, kBufferSize,
311 MOJO_WRITE_MESSAGE_FLAG_NONE));
312 } // Joins the thread.
// The thread is expected to have actually waited, with result 0, for between
// 1x and 3x |kEpsilonMicros|.
313 elapsed_micros = stopwatch.Elapsed();
314 EXPECT_TRUE(did_wait);
315 EXPECT_EQ(0, result);
316 EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
317 EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
319 // Now |d_1| is already readable. Try waiting for it again.
321 test::WaiterThread thread(d_1,
322 MOJO_WAIT_FLAG_READABLE,
323 MOJO_DEADLINE_INDEFINITE,
328 } // Joins the thread.
// No wait is expected this time: the result is ALREADY_EXISTS and the thread
// finishes within |kEpsilonMicros|.
329 elapsed_micros = stopwatch.Elapsed();
330 EXPECT_FALSE(did_wait);
331 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
332 EXPECT_LT(elapsed_micros, kEpsilonMicros);
334 // Consume what we wrote to |d_0|.
336 buffer_size = kBufferSize;
337 EXPECT_EQ(MOJO_RESULT_OK,
338 d_1->ReadMessage(buffer, &buffer_size,
340 MOJO_READ_MESSAGE_FLAG_NONE));
341 EXPECT_EQ(kBufferSize, buffer_size);
342 EXPECT_EQ(123456789, buffer[0]);
344 // Wait for readable on |d_1| and close |d_0| after some time, which should
// make the wait fail with |MOJO_RESULT_FAILED_PRECONDITION| (checked below).
347 test::WaiterThread thread(d_1,
348 MOJO_WAIT_FLAG_READABLE,
349 MOJO_DEADLINE_INDEFINITE,
354 base::PlatformThread::Sleep(
355 base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
356 EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
357 } // Joins the thread.
358 elapsed_micros = stopwatch.Elapsed();
359 EXPECT_TRUE(did_wait);
360 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
361 EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
362 EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
// |d_0| was closed above; close the remaining end.
364 EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
// Second pass over both port assignments, using fresh dispatchers: this time
// the dispatcher being waited on (|d_1|) is itself closed during the wait.
367 for (unsigned i = 0; i < 2; i++) {
368 scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
369 scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
371 scoped_refptr<MessagePipe> mp(new MessagePipe());
372 d_0->Init(mp, i); // 0, 1.
373 d_1->Init(mp, i ^ 1); // 1, 0.
376 // Wait for readable on |d_1| and close |d_1| after some time, which should
// end the wait with |MOJO_RESULT_CANCELLED| (checked below).
379 test::WaiterThread thread(d_1,
380 MOJO_WAIT_FLAG_READABLE,
381 MOJO_DEADLINE_INDEFINITE,
386 base::PlatformThread::Sleep(
387 base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
388 EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
389 } // Joins the thread.
390 elapsed_micros = stopwatch.Elapsed();
391 EXPECT_TRUE(did_wait);
392 EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
393 EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
394 EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
// |d_1| was closed above; close the remaining end.
396 EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
400 // Stress test -----------------------------------------------------------------
// Upper bound, in bytes, on the size of a message written by the stress test.
// It is also the size of the scratch buffers used by the writer and reader
// threads.
402 const size_t kMaxMessageSize = 2000;
// Stress-test helper thread. Run() writes a randomly chosen number of messages
// (between 1000 and 6000) of random size (1 to |kMaxMessageSize| bytes) to
// |write_dispatcher|, followed by one 4-byte "quit" message. The message count
// is stored in |*messages_written| and the byte total is accumulated in
// |*bytes_written|.
404 class WriterThread : public base::SimpleThread {
406 // |*messages_written| and |*bytes_written| belong to the thread while it's
// alive.
408 WriterThread(scoped_refptr<Dispatcher> write_dispatcher,
409 size_t* messages_written, size_t* bytes_written)
410 : base::SimpleThread("writer_thread"),
411 write_dispatcher_(write_dispatcher),
412 messages_written_(messages_written),
413 bytes_written_(bytes_written) {
414 *messages_written_ = 0;
418 virtual ~WriterThread() {
423 virtual void Run() OVERRIDE {
424 // Make some data to write.
// Byte i holds i truncated to unsigned char, so every message is a prefix of
// the same repeating pattern.
425 unsigned char buffer[kMaxMessageSize];
426 for (size_t i = 0; i < kMaxMessageSize; i++)
427 buffer[i] = static_cast<unsigned char>(i);
429 // Number of messages to write.
430 *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000));
// Write each message with a random length and add that length to the running
// byte total.
433 for (size_t i = 0; i < *messages_written_; i++) {
434 uint32_t bytes_to_write = static_cast<uint32_t>(
435 base::RandInt(1, static_cast<int>(kMaxMessageSize)));
436 EXPECT_EQ(MOJO_RESULT_OK,
437 write_dispatcher_->WriteMessage(buffer, bytes_to_write,
439 MOJO_WRITE_MESSAGE_FLAG_NONE));
440 *bytes_written_ += bytes_to_write;
443 // Write one last "quit" message.
444 EXPECT_EQ(MOJO_RESULT_OK,
445 write_dispatcher_->WriteMessage("quit", 4, NULL, 0,
446 MOJO_WRITE_MESSAGE_FLAG_NONE));
// Dispatcher written to, and the caller-provided counters updated by Run().
449 const scoped_refptr<Dispatcher> write_dispatcher_;
450 size_t* const messages_written_;
451 size_t* const bytes_written_;
453 DISALLOW_COPY_AND_ASSIGN(WriterThread);
// Stress-test helper thread. Run() waits for |read_dispatcher| to become
// readable, reads a message, validates its contents with IsValidMessage(), and
// adds its size to |*bytes_read|. Several readers may share one dispatcher, so
// a read that finds nothing is retried.
456 class ReaderThread : public base::SimpleThread {
458 // |*messages_read| and |*bytes_read| belong to the thread while it's alive.
459 ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,
460 size_t* messages_read, size_t* bytes_read)
461 : base::SimpleThread("reader_thread"),
462 read_dispatcher_(read_dispatcher),
463 messages_read_(messages_read),
464 bytes_read_(bytes_read) {
469 virtual ~ReaderThread() {
474 virtual void Run() OVERRIDE {
475 unsigned char buffer[kMaxMessageSize];
481 // Wait for it to be readable.
// ALREADY_EXISTS is also accepted from AddWaiter(); in that case no wait is
// performed and the waiter is not removed.
483 result = read_dispatcher_->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 0);
484 EXPECT_TRUE(result == MOJO_RESULT_OK ||
485 result == MOJO_RESULT_ALREADY_EXISTS) << "result: " << result;
486 if (result == MOJO_RESULT_OK) {
487 // Actually need to wait.
488 EXPECT_EQ(0, w.Wait(MOJO_DEADLINE_INDEFINITE));
489 read_dispatcher_->RemoveWaiter(&w);
492 // Now, try to do the read.
493 // Clear the buffer so that we can check the result.
494 memset(buffer, 0, sizeof(buffer));
495 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
496 result = read_dispatcher_->ReadMessage(buffer, &buffer_size, NULL, NULL,
497 MOJO_READ_MESSAGE_FLAG_NONE);
498 EXPECT_TRUE(result == MOJO_RESULT_OK ||
499 result == MOJO_RESULT_NOT_FOUND) << "result: " << result;
500 // We're racing with others to read, so maybe we failed.
501 if (result == MOJO_RESULT_NOT_FOUND)
502 continue; // In which case, try again.
// A 4-byte "quit" message is recognized separately from data messages; it is
// the last message each WriterThread sends.
504 if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0)
506 EXPECT_GE(buffer_size, 1u);
507 EXPECT_LE(buffer_size, kMaxMessageSize);
508 EXPECT_TRUE(IsValidMessage(buffer, buffer_size));
511 *bytes_read_ += buffer_size;
// Checks that each of the first |message_size| bytes of |buffer| equals its
// own index truncated to unsigned char, then examines the rest of the buffer
// up to |kMaxMessageSize|.
515 static bool IsValidMessage(const unsigned char* buffer,
516 uint32_t message_size) {
518 for (i = 0; i < message_size; i++) {
519 if (buffer[i] != static_cast<unsigned char>(i))
522 // Check that the remaining bytes weren't stomped on.
523 for (; i < kMaxMessageSize; i++) {
// Dispatcher read from, and the caller-provided counters updated by Run().
530 const scoped_refptr<Dispatcher> read_dispatcher_;
531 size_t* const messages_read_;
532 size_t* const bytes_read_;
534 DISALLOW_COPY_AND_ASSIGN(ReaderThread);
// Stress test: 30 WriterThreads and 30 ReaderThreads operate concurrently on
// the two ends of one message pipe. Once all threads have been joined, the
// total messages and bytes read must equal the totals written.
537 TEST(MessagePipeDispatcherTest, Stress) {
538 static const size_t kNumWriters = 30;
539 static const size_t kNumReaders = kNumWriters;
541 scoped_refptr<MessagePipeDispatcher> d_write(new MessagePipeDispatcher());
542 scoped_refptr<MessagePipeDispatcher> d_read(new MessagePipeDispatcher());
544 scoped_refptr<MessagePipe> mp(new MessagePipe());
545 d_write->Init(mp, 0);
// Per-thread counters; each thread is handed pointers to its own slots.
549 size_t messages_written[kNumWriters];
550 size_t bytes_written[kNumWriters];
551 size_t messages_read[kNumReaders];
552 size_t bytes_read[kNumReaders];
// Create the writer threads, all sharing |d_write|.
555 ScopedVector<WriterThread> writers;
556 for (size_t i = 0; i < kNumWriters; i++) {
558 new WriterThread(d_write, &messages_written[i], &bytes_written[i]));
// Create the reader threads, all sharing |d_read|.
562 ScopedVector<ReaderThread> readers;
563 for (size_t i = 0; i < kNumReaders; i++) {
565 new ReaderThread(d_read, &messages_read[i], &bytes_read[i]));
569 for (size_t i = 0; i < kNumWriters; i++)
573 for (size_t i = 0; i < kNumReaders; i++)
576 // TODO(vtl): Maybe I should have an event that triggers all the threads to
577 // start doing stuff for real (so that the first ones created/started aren't
// advantaged).
579 } // Joins all the threads.
// Sum the per-thread counters on each side.
581 size_t total_messages_written = 0;
582 size_t total_bytes_written = 0;
583 for (size_t i = 0; i < kNumWriters; i++) {
584 total_messages_written += messages_written[i];
585 total_bytes_written += bytes_written[i];
587 size_t total_messages_read = 0;
588 size_t total_bytes_read = 0;
589 for (size_t i = 0; i < kNumReaders; i++) {
590 total_messages_read += messages_read[i];
591 total_bytes_read += bytes_read[i];
592 // We'd have to be really unlucky to have read no messages on a thread.
593 EXPECT_GT(messages_read[i], 0u) << "reader: " << i;
// Every data message is at least one byte long, so bytes >= messages.
594 EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i;
// Everything written must have been read, both by count and by size.
596 EXPECT_EQ(total_messages_written, total_messages_read);
597 EXPECT_EQ(total_bytes_written, total_bytes_read);
599 EXPECT_EQ(MOJO_RESULT_OK, d_write->Close());
600 EXPECT_EQ(MOJO_RESULT_OK, d_read->Close());
604 } // namespace system