- add sources.
[platform/framework/web/crosswalk.git] / src / mojo / system / message_pipe_dispatcher_unittest.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a
6 // heavily-loaded system). Sorry. |kEpsilonMicros| may be increased to increase
7 // tolerance and reduce observed flakiness.
8
9 #include "mojo/system/message_pipe_dispatcher.h"
10
11 #include <string.h>
12
13 #include <limits>
14
15 #include "base/memory/ref_counted.h"
16 #include "base/memory/scoped_vector.h"
17 #include "base/rand_util.h"
18 #include "base/threading/platform_thread.h"  // For |Sleep()|.
19 #include "base/threading/simple_thread.h"
20 #include "base/time/time.h"
21 #include "mojo/system/message_pipe.h"
22 #include "mojo/system/test_utils.h"
23 #include "mojo/system/waiter.h"
24 #include "mojo/system/waiter_test_utils.h"
25 #include "testing/gtest/include/gtest/gtest.h"
26
27 namespace mojo {
28 namespace system {
29 namespace {
30
// Number of microseconds in one millisecond.
const int64_t kMicrosPerMs = 1000;
// Slack, in microseconds, allowed by the timing assertions in this file
// (15 ms). Raising it makes the timing-sensitive tests more tolerant.
const int64_t kEpsilonMicros = kMicrosPerMs * 15;
33
34 TEST(MessagePipeDispatcherTest, Basic) {
35   test::Stopwatch stopwatch;
36   int32_t buffer[1];
37   const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
38   uint32_t buffer_size;
39   int64_t elapsed_micros;
40
41   // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa.
42   for (unsigned i = 0; i < 2; i++) {
43     scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
44     scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
45     {
46       scoped_refptr<MessagePipe> mp(new MessagePipe());
47       d_0->Init(mp, i);  // 0, 1.
48       d_1->Init(mp, i ^ 1);  // 1, 0.
49     }
50     Waiter w;
51
52     // Try adding a writable waiter when already writable.
53     w.Init();
54     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
55               d_0->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 0));
56     // Shouldn't need to remove the waiter (it was not added).
57
58     // Add a readable waiter to |d_0|, then make it readable (by writing to
59     // |d_1|), then wait.
60     w.Init();
61     EXPECT_EQ(MOJO_RESULT_OK,
62               d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 1));
63     buffer[0] = 123456789;
64     EXPECT_EQ(MOJO_RESULT_OK,
65               d_1->WriteMessage(buffer, kBufferSize,
66                                 NULL, 0,
67                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
68     stopwatch.Start();
69     EXPECT_EQ(1, w.Wait(MOJO_DEADLINE_INDEFINITE));
70     elapsed_micros = stopwatch.Elapsed();
71     EXPECT_LT(elapsed_micros, kEpsilonMicros);
72     d_0->RemoveWaiter(&w);
73
74     // Try adding a readable waiter when already readable (from above).
75     w.Init();
76     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
77               d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 2));
78     // Shouldn't need to remove the waiter (it was not added).
79
80     // Make |d_0| no longer readable (by reading from it).
81     buffer[0] = 0;
82     buffer_size = kBufferSize;
83     EXPECT_EQ(MOJO_RESULT_OK,
84               d_0->ReadMessage(buffer, &buffer_size,
85                                NULL, NULL,
86                                MOJO_READ_MESSAGE_FLAG_NONE));
87     EXPECT_EQ(kBufferSize, buffer_size);
88     EXPECT_EQ(123456789, buffer[0]);
89
90     // Wait for zero time for readability on |d_0| (will time out).
91     w.Init();
92     EXPECT_EQ(MOJO_RESULT_OK,
93               d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3));
94     stopwatch.Start();
95     EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0));
96     elapsed_micros = stopwatch.Elapsed();
97     EXPECT_LT(elapsed_micros, kEpsilonMicros);
98     d_0->RemoveWaiter(&w);
99
100     // Wait for non-zero, finite time for readability on |d_0| (will time out).
101     w.Init();
102     EXPECT_EQ(MOJO_RESULT_OK,
103               d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3));
104     stopwatch.Start();
105     EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(2 * kEpsilonMicros));
106     elapsed_micros = stopwatch.Elapsed();
107     EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
108     EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
109     d_0->RemoveWaiter(&w);
110
111     EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
112     EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
113   }
114 }
115
116 TEST(MessagePipeDispatcherTest, InvalidParams) {
117   char buffer[1];
118   MojoHandle handles[1];
119
120   scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
121   scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
122   {
123     scoped_refptr<MessagePipe> mp(new MessagePipe());
124     d_0->Init(mp, 0);
125     d_1->Init(mp, 1);
126   }
127
128   // |WriteMessage|:
129   // Null buffer with nonzero buffer size.
130   EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
131             d_0->WriteMessage(NULL, 1,
132                               NULL, 0,
133                               MOJO_WRITE_MESSAGE_FLAG_NONE));
134   // Huge buffer size.
135   EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
136             d_0->WriteMessage(buffer, std::numeric_limits<uint32_t>::max(),
137                               NULL, 0,
138                               MOJO_WRITE_MESSAGE_FLAG_NONE));
139
140   // Null handles with nonzero handle count.
141   EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
142             d_0->WriteMessage(buffer, sizeof(buffer),
143                               NULL, 1,
144                               MOJO_WRITE_MESSAGE_FLAG_NONE));
145   // Huge handle count (implausibly big on some systems -- more than can be
146   // stored in a 32-bit address space).
147   // Note: This may return either |MOJO_RESULT_INVALID_ARGUMENT| or
148   // |MOJO_RESULT_RESOURCE_EXHAUSTED|, depending on whether it's plausible or
149   // not.
150   EXPECT_NE(MOJO_RESULT_OK,
151             d_0->WriteMessage(buffer, sizeof(buffer),
152                               handles, std::numeric_limits<uint32_t>::max(),
153                               MOJO_WRITE_MESSAGE_FLAG_NONE));
154   // Huge handle count (plausibly big).
155   EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
156             d_0->WriteMessage(buffer, sizeof(buffer),
157                               handles, std::numeric_limits<uint32_t>::max() /
158                                   sizeof(handles[0]),
159                               MOJO_WRITE_MESSAGE_FLAG_NONE));
160
161   // |ReadMessage|:
162   // Null buffer with nonzero buffer size.
163   uint32_t buffer_size = 1;
164   EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
165             d_0->ReadMessage(NULL, &buffer_size,
166                              NULL, NULL,
167                              MOJO_READ_MESSAGE_FLAG_NONE));
168   // Null handles with nonzero handle count.
169   buffer_size = static_cast<uint32_t>(sizeof(buffer));
170   uint32_t handle_count = 1;
171   EXPECT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
172             d_0->ReadMessage(buffer, &buffer_size,
173                              NULL, &handle_count,
174                              MOJO_READ_MESSAGE_FLAG_NONE));
175
176   EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
177   EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
178 }
179
180 // Test what happens when one end is closed (single-threaded test).
181 TEST(MessagePipeDispatcherTest, BasicClosed) {
182   int32_t buffer[1];
183   const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
184   uint32_t buffer_size;
185
186   // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa.
187   for (unsigned i = 0; i < 2; i++) {
188     scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
189     scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
190     {
191       scoped_refptr<MessagePipe> mp(new MessagePipe());
192       d_0->Init(mp, i);  // 0, 1.
193       d_1->Init(mp, i ^ 1);  // 1, 0.
194     }
195     Waiter w;
196
197     // Write (twice) to |d_1|.
198     buffer[0] = 123456789;
199     EXPECT_EQ(MOJO_RESULT_OK,
200               d_1->WriteMessage(buffer, kBufferSize,
201                                 NULL, 0,
202                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
203     buffer[0] = 234567890;
204     EXPECT_EQ(MOJO_RESULT_OK,
205               d_1->WriteMessage(buffer, kBufferSize,
206                                 NULL, 0,
207                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
208
209     // Try waiting for readable on |d_0|; should fail (already satisfied).
210     w.Init();
211     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
212               d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 0));
213
214     // Close |d_1|.
215     EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
216
217     // Try waiting for readable on |d_0|; should fail (already satisfied).
218     w.Init();
219     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
220               d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 1));
221
222     // Read from |d_0|.
223     buffer[0] = 0;
224     buffer_size = kBufferSize;
225     EXPECT_EQ(MOJO_RESULT_OK,
226               d_0->ReadMessage(buffer, &buffer_size,
227                                NULL, NULL,
228                                MOJO_READ_MESSAGE_FLAG_NONE));
229     EXPECT_EQ(kBufferSize, buffer_size);
230     EXPECT_EQ(123456789, buffer[0]);
231
232     // Try waiting for readable on |d_0|; should fail (already satisfied).
233     w.Init();
234     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
235               d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 2));
236
237     // Read again from |d_0|.
238     buffer[0] = 0;
239     buffer_size = kBufferSize;
240     EXPECT_EQ(MOJO_RESULT_OK,
241               d_0->ReadMessage(buffer, &buffer_size,
242                                NULL, NULL,
243                                MOJO_READ_MESSAGE_FLAG_NONE));
244     EXPECT_EQ(kBufferSize, buffer_size);
245     EXPECT_EQ(234567890, buffer[0]);
246
247     // Try waiting for readable on |d_0|; should fail (unsatisfiable).
248     w.Init();
249     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
250               d_0->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 3));
251
252     // Try waiting for writable on |d_0|; should fail (unsatisfiable).
253     w.Init();
254     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
255               d_0->AddWaiter(&w, MOJO_WAIT_FLAG_WRITABLE, 4));
256
257     // Try reading from |d_0|; should fail (nothing to read).
258     buffer[0] = 0;
259     buffer_size = kBufferSize;
260     EXPECT_EQ(MOJO_RESULT_NOT_FOUND,
261               d_0->ReadMessage(buffer, &buffer_size,
262                                NULL, NULL,
263                                MOJO_READ_MESSAGE_FLAG_NONE));
264
265     // Try writing to |d_0|; should fail (other end closed).
266     buffer[0] = 345678901;
267     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
268               d_0->WriteMessage(buffer, kBufferSize,
269                                 NULL, 0,
270                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
271
272     EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
273   }
274 }
275
276 TEST(MessagePipeDispatcherTest, BasicThreaded) {
277   test::Stopwatch stopwatch;
278   int32_t buffer[1];
279   const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
280   uint32_t buffer_size;
281   bool did_wait;
282   MojoResult result;
283   int64_t elapsed_micros;
284
285   // Run this test both with |d_0| as port 0, |d_1| as port 1 and vice versa.
286   for (unsigned i = 0; i < 2; i++) {
287     scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
288     scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
289     {
290       scoped_refptr<MessagePipe> mp(new MessagePipe());
291       d_0->Init(mp, i);  // 0, 1.
292       d_1->Init(mp, i ^ 1);  // 1, 0.
293     }
294
295     // Wait for readable on |d_1|, which will become readable after some time.
296     {
297       test::WaiterThread thread(d_1,
298                                 MOJO_WAIT_FLAG_READABLE,
299                                 MOJO_DEADLINE_INDEFINITE,
300                                 0,
301                                 &did_wait, &result);
302       stopwatch.Start();
303       thread.Start();
304       base::PlatformThread::Sleep(
305           base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
306       // Wake it up by writing to |d_0|.
307       buffer[0] = 123456789;
308       EXPECT_EQ(MOJO_RESULT_OK,
309                 d_0->WriteMessage(buffer, kBufferSize,
310                                   NULL, 0,
311                                   MOJO_WRITE_MESSAGE_FLAG_NONE));
312     }  // Joins the thread.
313     elapsed_micros = stopwatch.Elapsed();
314     EXPECT_TRUE(did_wait);
315     EXPECT_EQ(0, result);
316     EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
317     EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
318
319     // Now |d_1| is already readable. Try waiting for it again.
320     {
321       test::WaiterThread thread(d_1,
322                                 MOJO_WAIT_FLAG_READABLE,
323                                 MOJO_DEADLINE_INDEFINITE,
324                                 1,
325                                 &did_wait, &result);
326       stopwatch.Start();
327       thread.Start();
328     }  // Joins the thread.
329     elapsed_micros = stopwatch.Elapsed();
330     EXPECT_FALSE(did_wait);
331     EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
332     EXPECT_LT(elapsed_micros, kEpsilonMicros);
333
334     // Consume what we wrote to |d_0|.
335     buffer[0] = 0;
336     buffer_size = kBufferSize;
337     EXPECT_EQ(MOJO_RESULT_OK,
338               d_1->ReadMessage(buffer, &buffer_size,
339                                NULL, NULL,
340                                MOJO_READ_MESSAGE_FLAG_NONE));
341     EXPECT_EQ(kBufferSize, buffer_size);
342     EXPECT_EQ(123456789, buffer[0]);
343
344     // Wait for readable on |d_1| and close |d_0| after some time, which should
345     // cancel that wait.
346     {
347       test::WaiterThread thread(d_1,
348                                 MOJO_WAIT_FLAG_READABLE,
349                                 MOJO_DEADLINE_INDEFINITE,
350                                 0,
351                                 &did_wait, &result);
352       stopwatch.Start();
353       thread.Start();
354       base::PlatformThread::Sleep(
355           base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
356       EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
357     }  // Joins the thread.
358     elapsed_micros = stopwatch.Elapsed();
359     EXPECT_TRUE(did_wait);
360     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
361     EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
362     EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
363
364     EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
365   }
366
367   for (unsigned i = 0; i < 2; i++) {
368     scoped_refptr<MessagePipeDispatcher> d_0(new MessagePipeDispatcher());
369     scoped_refptr<MessagePipeDispatcher> d_1(new MessagePipeDispatcher());
370     {
371       scoped_refptr<MessagePipe> mp(new MessagePipe());
372       d_0->Init(mp, i);  // 0, 1.
373       d_1->Init(mp, i ^ 1);  // 1, 0.
374     }
375
376     // Wait for readable on |d_1| and close |d_1| after some time, which should
377     // cancel that wait.
378     {
379       test::WaiterThread thread(d_1,
380                                 MOJO_WAIT_FLAG_READABLE,
381                                 MOJO_DEADLINE_INDEFINITE,
382                                 0,
383                                 &did_wait, &result);
384       stopwatch.Start();
385       thread.Start();
386       base::PlatformThread::Sleep(
387           base::TimeDelta::FromMicroseconds(2 * kEpsilonMicros));
388       EXPECT_EQ(MOJO_RESULT_OK, d_1->Close());
389     }  // Joins the thread.
390     elapsed_micros = stopwatch.Elapsed();
391     EXPECT_TRUE(did_wait);
392     EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
393     EXPECT_GT(elapsed_micros, (2-1) * kEpsilonMicros);
394     EXPECT_LT(elapsed_micros, (2+1) * kEpsilonMicros);
395
396     EXPECT_EQ(MOJO_RESULT_OK, d_0->Close());
397   }
398 }
399
400 // Stress test -----------------------------------------------------------------
401
// Largest message, in bytes, that the stress test's writer threads send; the
// writer and reader threads also size their local buffers with it.
const size_t kMaxMessageSize = 2000;
403
404 class WriterThread : public base::SimpleThread {
405  public:
406   // |*messages_written| and |*bytes_written| belong to the thread while it's
407   // alive.
408   WriterThread(scoped_refptr<Dispatcher> write_dispatcher,
409                size_t* messages_written, size_t* bytes_written)
410       : base::SimpleThread("writer_thread"),
411         write_dispatcher_(write_dispatcher),
412         messages_written_(messages_written),
413         bytes_written_(bytes_written) {
414     *messages_written_ = 0;
415     *bytes_written_ = 0;
416   }
417
418   virtual ~WriterThread() {
419     Join();
420   }
421
422  private:
423   virtual void Run() OVERRIDE {
424     // Make some data to write.
425     unsigned char buffer[kMaxMessageSize];
426     for (size_t i = 0; i < kMaxMessageSize; i++)
427       buffer[i] = static_cast<unsigned char>(i);
428
429     // Number of messages to write.
430     *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000));
431
432     // Write messages.
433     for (size_t i = 0; i < *messages_written_; i++) {
434       uint32_t bytes_to_write = static_cast<uint32_t>(
435           base::RandInt(1, static_cast<int>(kMaxMessageSize)));
436       EXPECT_EQ(MOJO_RESULT_OK,
437                 write_dispatcher_->WriteMessage(buffer, bytes_to_write,
438                                                 NULL, 0,
439                                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
440       *bytes_written_ += bytes_to_write;
441     }
442
443     // Write one last "quit" message.
444     EXPECT_EQ(MOJO_RESULT_OK,
445               write_dispatcher_->WriteMessage("quit", 4, NULL, 0,
446                                               MOJO_WRITE_MESSAGE_FLAG_NONE));
447   }
448
449   const scoped_refptr<Dispatcher> write_dispatcher_;
450   size_t* const messages_written_;
451   size_t* const bytes_written_;
452
453   DISALLOW_COPY_AND_ASSIGN(WriterThread);
454 };
455
456 class ReaderThread : public base::SimpleThread {
457  public:
458   // |*messages_read| and |*bytes_read| belong to the thread while it's alive.
459   ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,
460                size_t* messages_read, size_t* bytes_read)
461       : base::SimpleThread("reader_thread"),
462         read_dispatcher_(read_dispatcher),
463         messages_read_(messages_read),
464         bytes_read_(bytes_read) {
465     *messages_read_ = 0;
466     *bytes_read_ = 0;
467   }
468
469   virtual ~ReaderThread() {
470     Join();
471   }
472
473  private:
474   virtual void Run() OVERRIDE {
475     unsigned char buffer[kMaxMessageSize];
476     MojoResult result;
477     Waiter w;
478
479     // Read messages.
480     for (;;) {
481       // Wait for it to be readable.
482       w.Init();
483       result = read_dispatcher_->AddWaiter(&w, MOJO_WAIT_FLAG_READABLE, 0);
484       EXPECT_TRUE(result == MOJO_RESULT_OK ||
485                   result == MOJO_RESULT_ALREADY_EXISTS) << "result: " << result;
486       if (result == MOJO_RESULT_OK) {
487         // Actually need to wait.
488         EXPECT_EQ(0, w.Wait(MOJO_DEADLINE_INDEFINITE));
489         read_dispatcher_->RemoveWaiter(&w);
490       }
491
492       // Now, try to do the read.
493       // Clear the buffer so that we can check the result.
494       memset(buffer, 0, sizeof(buffer));
495       uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
496       result = read_dispatcher_->ReadMessage(buffer, &buffer_size, NULL, NULL,
497                                              MOJO_READ_MESSAGE_FLAG_NONE);
498       EXPECT_TRUE(result == MOJO_RESULT_OK ||
499                   result == MOJO_RESULT_NOT_FOUND) << "result: " << result;
500       // We're racing with others to read, so maybe we failed.
501       if (result == MOJO_RESULT_NOT_FOUND)
502         continue;  // In which case, try again.
503       // Check for quit.
504       if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0)
505         return;
506       EXPECT_GE(buffer_size, 1u);
507       EXPECT_LE(buffer_size, kMaxMessageSize);
508       EXPECT_TRUE(IsValidMessage(buffer, buffer_size));
509
510       (*messages_read_)++;
511       *bytes_read_ += buffer_size;
512     }
513   }
514
515   static bool IsValidMessage(const unsigned char* buffer,
516                              uint32_t message_size) {
517     size_t i;
518     for (i = 0; i < message_size; i++) {
519       if (buffer[i] != static_cast<unsigned char>(i))
520         return false;
521     }
522     // Check that the remaining bytes weren't stomped on.
523     for (; i < kMaxMessageSize; i++) {
524       if (buffer[i] != 0)
525         return false;
526     }
527     return true;
528   }
529
530   const scoped_refptr<Dispatcher> read_dispatcher_;
531   size_t* const messages_read_;
532   size_t* const bytes_read_;
533
534   DISALLOW_COPY_AND_ASSIGN(ReaderThread);
535 };
536
537 TEST(MessagePipeDispatcherTest, Stress) {
538   static const size_t kNumWriters = 30;
539   static const size_t kNumReaders = kNumWriters;
540
541   scoped_refptr<MessagePipeDispatcher> d_write(new MessagePipeDispatcher());
542   scoped_refptr<MessagePipeDispatcher> d_read(new MessagePipeDispatcher());
543   {
544     scoped_refptr<MessagePipe> mp(new MessagePipe());
545     d_write->Init(mp, 0);
546     d_read->Init(mp, 1);
547   }
548
549   size_t messages_written[kNumWriters];
550   size_t bytes_written[kNumWriters];
551   size_t messages_read[kNumReaders];
552   size_t bytes_read[kNumReaders];
553   {
554     // Make writers.
555     ScopedVector<WriterThread> writers;
556     for (size_t i = 0; i < kNumWriters; i++) {
557       writers.push_back(
558           new WriterThread(d_write, &messages_written[i], &bytes_written[i]));
559     }
560
561     // Make readers.
562     ScopedVector<ReaderThread> readers;
563     for (size_t i = 0; i < kNumReaders; i++) {
564       readers.push_back(
565           new ReaderThread(d_read, &messages_read[i], &bytes_read[i]));
566     }
567
568     // Start writers.
569     for (size_t i = 0; i < kNumWriters; i++)
570       writers[i]->Start();
571
572     // Start readers.
573     for (size_t i = 0; i < kNumReaders; i++)
574       readers[i]->Start();
575
576     // TODO(vtl): Maybe I should have an event that triggers all the threads to
577     // start doing stuff for real (so that the first ones created/started aren't
578     // advantaged).
579   }  // Joins all the threads.
580
581   size_t total_messages_written = 0;
582   size_t total_bytes_written = 0;
583   for (size_t i = 0; i < kNumWriters; i++) {
584     total_messages_written += messages_written[i];
585     total_bytes_written += bytes_written[i];
586   }
587   size_t total_messages_read = 0;
588   size_t total_bytes_read = 0;
589   for (size_t i = 0; i < kNumReaders; i++) {
590     total_messages_read += messages_read[i];
591     total_bytes_read += bytes_read[i];
592     // We'd have to be really unlucky to have read no messages on a thread.
593     EXPECT_GT(messages_read[i], 0u) << "reader: " << i;
594     EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i;
595   }
596   EXPECT_EQ(total_messages_written, total_messages_read);
597   EXPECT_EQ(total_bytes_written, total_bytes_read);
598
599   EXPECT_EQ(MOJO_RESULT_OK, d_write->Close());
600   EXPECT_EQ(MOJO_RESULT_OK, d_read->Close());
601 }
602
603 }  // namespace
604 }  // namespace system
605 }  // namespace mojo