Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / mojo / system / remote_message_pipe_unittest.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <stdint.h>
6 #include <stdio.h>
7 #include <string.h>
8
9 #include <vector>
10
11 #include "base/bind.h"
12 #include "base/files/file_path.h"
13 #include "base/files/file_util.h"
14 #include "base/files/scoped_file.h"
15 #include "base/files/scoped_temp_dir.h"
16 #include "base/location.h"
17 #include "base/logging.h"
18 #include "base/macros.h"
19 #include "base/message_loop/message_loop.h"
20 #include "base/test/test_io_thread.h"
21 #include "base/threading/platform_thread.h"  // For |Sleep()|.
22 #include "build/build_config.h"              // TODO(vtl): Remove this.
23 #include "mojo/common/test/test_utils.h"
24 #include "mojo/embedder/platform_channel_pair.h"
25 #include "mojo/embedder/platform_shared_buffer.h"
26 #include "mojo/embedder/scoped_platform_handle.h"
27 #include "mojo/embedder/simple_platform_support.h"
28 #include "mojo/system/channel.h"
29 #include "mojo/system/channel_endpoint.h"
30 #include "mojo/system/message_pipe.h"
31 #include "mojo/system/message_pipe_dispatcher.h"
32 #include "mojo/system/platform_handle_dispatcher.h"
33 #include "mojo/system/raw_channel.h"
34 #include "mojo/system/shared_buffer_dispatcher.h"
35 #include "mojo/system/test_utils.h"
36 #include "mojo/system/waiter.h"
37 #include "testing/gtest/include/gtest/gtest.h"
38
39 namespace mojo {
40 namespace system {
41 namespace {
42
43 class RemoteMessagePipeTest : public testing::Test {
   // Fixture for tests that run message pipes over a pair of |Channel|s. It owns
   // an auto-started I/O thread, two platform handles (the two ends of one
   // platform channel pair) and up to two |Channel|s. Every private
   // |...OnIOThread()| helper CHECKs that it is running on that I/O thread.
44  public:
45   RemoteMessagePipeTest() : io_thread_(base::TestIOThread::kAutoStart) {}
46   virtual ~RemoteMessagePipeTest() {}
47
     // Runs |SetUpOnIOThread()| on the I/O thread, which creates the platform
     // channel pair.
48   virtual void SetUp() OVERRIDE {
49     io_thread_.PostTaskAndWait(
50         FROM_HERE,
51         base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
52                    base::Unretained(this)));
53   }
54
     // Runs |TearDownOnIOThread()| on the I/O thread, which shuts down and drops
     // any channels that were created.
55   virtual void TearDown() OVERRIDE {
56     io_thread_.PostTaskAndWait(
57         FROM_HERE,
58         base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
59                    base::Unretained(this)));
60   }
61
62  protected:
63   // This connects the two given |ChannelEndpoint|s: |ep0| is attached to
     // channel 0 and |ep1| to channel 1 (the channels are created first if they
     // do not exist yet).
64   void ConnectChannelEndpoints(scoped_refptr<ChannelEndpoint> ep0,
65                                scoped_refptr<ChannelEndpoint> ep1) {
66     io_thread_.PostTaskAndWait(
67         FROM_HERE,
68         base::Bind(&RemoteMessagePipeTest::ConnectChannelEndpointsOnIOThread,
69                    base::Unretained(this),
70                    ep0,
71                    ep1));
72   }
73
74   // This bootstraps |ep| on |channels_[channel_index]|. It assumes/requires
75   // that this is the bootstrap case, i.e., that the endpoint IDs are both/will
76   // both be |Channel::kBootstrapEndpointId|. This returns *without* waiting for
77   // it to finish connecting.
     // NOTE(review): |base::Unretained(this)| is used with a non-waiting post;
     // presumably safe because |TearDown()| later posts-and-waits on the same
     // thread before the fixture is destroyed -- confirm.
78   void BootstrapChannelEndpointNoWait(unsigned channel_index,
79                                       scoped_refptr<ChannelEndpoint> ep) {
80     io_thread_.PostTask(
81         FROM_HERE,
82         base::Bind(&RemoteMessagePipeTest::BootstrapChannelEndpointOnIOThread,
83                    base::Unretained(this),
84                    channel_index,
85                    ep));
86   }
87
     // Shuts down any existing channels and creates a fresh platform channel
     // pair, i.e., the equivalent of |TearDown()| followed by |SetUp()|, done in
     // a single I/O-thread task.
88   void RestoreInitialState() {
89     io_thread_.PostTaskAndWait(
90         FROM_HERE,
91         base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread,
92                    base::Unretained(this)));
93   }
94
95   embedder::PlatformSupport* platform_support() { return &platform_support_; }
96   base::TestIOThread* io_thread() { return &io_thread_; }
97
98  private:
     // Creates a platform channel pair; the server handle is stored at index 0
     // and the client handle at index 1 of |platform_handles_|.
99   void SetUpOnIOThread() {
100     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
101
102     embedder::PlatformChannelPair channel_pair;
103     platform_handles_[0] = channel_pair.PassServerHandle();
104     platform_handles_[1] = channel_pair.PassClientHandle();
105   }
106
        // Shuts down and releases each channel that exists; channels that were
        // never created are skipped.
107   void TearDownOnIOThread() {
108     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
109
110     if (channels_[0].get()) {
111       channels_[0]->Shutdown();
112       channels_[0] = nullptr;
113     }
114     if (channels_[1].get()) {
115       channels_[1]->Shutdown();
116       channels_[1] = nullptr;
117     }
118   }
119
        // Creates |channels_[channel_index]| (which must not exist yet) and
        // initializes it with a |RawChannel| made from
        // |platform_handles_[channel_index]|. The handle is handed over via
        // |Pass()|, so each handle can back only one channel.
120   void CreateAndInitChannel(unsigned channel_index) {
121     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
122     CHECK(channel_index == 0 || channel_index == 1);
123     CHECK(!channels_[channel_index].get());
124
125     channels_[channel_index] = new Channel(&platform_support_);
126     CHECK(channels_[channel_index]->Init(
127         RawChannel::Create(platform_handles_[channel_index].Pass())));
128   }
129
        // Creates whichever channels are missing, attaches |ep0| to channel 0 and
        // |ep1| to channel 1, then runs each endpoint, pairing its own ID with
        // the ID that the other channel handed out.
130   void ConnectChannelEndpointsOnIOThread(scoped_refptr<ChannelEndpoint> ep0,
131                                          scoped_refptr<ChannelEndpoint> ep1) {
132     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
133
134     if (!channels_[0].get())
135       CreateAndInitChannel(0);
136     if (!channels_[1].get())
137       CreateAndInitChannel(1);
138
139     MessageInTransit::EndpointId local_id0 = channels_[0]->AttachEndpoint(ep0);
140     MessageInTransit::EndpointId local_id1 = channels_[1]->AttachEndpoint(ep1);
141
142     CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1));
143     CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0));
144   }
145
        // Creates the channel at |channel_index| (it must not exist yet), attaches
        // |ep| to it and runs it with the bootstrap ID on both sides.
146   void BootstrapChannelEndpointOnIOThread(unsigned channel_index,
147                                           scoped_refptr<ChannelEndpoint> ep) {
148     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
149     CHECK(channel_index == 0 || channel_index == 1);
150
151     CreateAndInitChannel(channel_index);
152     MessageInTransit::EndpointId endpoint_id =
153         channels_[channel_index]->AttachEndpoint(ep);
        // Attaching is allowed to fail here; nothing further is done in that case.
        // NOTE(review): presumably this happens when the endpoint was already
        // closed before this task ran (see |CloseBeforeConnect|) -- confirm.
154     if (endpoint_id == MessageInTransit::kInvalidEndpointId)
155       return;
156
157     CHECK_EQ(endpoint_id, Channel::kBootstrapEndpointId);
158     CHECK(channels_[channel_index]->RunMessagePipeEndpoint(
159         Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId));
160   }
161
162   void RestoreInitialStateOnIOThread() {
163     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
164
165     TearDownOnIOThread();
166     SetUpOnIOThread();
167   }
168
169   embedder::SimplePlatformSupport platform_support_;
170   base::TestIOThread io_thread_;
        // Index 0 holds the server handle, index 1 the client handle.
171   embedder::ScopedPlatformHandle platform_handles_[2];
        // Null until created by |CreateAndInitChannel()|.
172   scoped_refptr<Channel> channels_[2];
173
174   DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
175 };
176
177 TEST_F(RemoteMessagePipeTest, Basic) {
      // Sends one message in each direction across the two channels, then closes
      // one user-facing end and checks that the other end can no longer become
      // readable.
178   static const char kHello[] = "hello";
179   static const char kWorld[] = "world!!!1!!!1!";
180   char buffer[100] = {0};
181   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
182   Waiter waiter;
183   HandleSignalsState hss;
184   uint32_t context = 0;
185
186   // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
187   // connected to MP 1, port 0, which will be attached to channel 1. This leaves
188   // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
189
190   scoped_refptr<ChannelEndpoint> ep0;
191   scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
192   scoped_refptr<ChannelEndpoint> ep1;
193   scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
194   ConnectChannelEndpoints(ep0, ep1);
195
196   // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
197
198   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
199   // it later, it might already be readable.)
200   waiter.Init();
201   ASSERT_EQ(
202       MOJO_RESULT_OK,
203       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
204
205   // Write to MP 0, port 0. |sizeof(kHello)| includes the terminating NUL, so
      // the received bytes can be compared as a C string below.
206   EXPECT_EQ(MOJO_RESULT_OK,
207             mp0->WriteMessage(0,
208                               UserPointer<const void>(kHello),
209                               sizeof(kHello),
210                               nullptr,
211                               MOJO_WRITE_MESSAGE_FLAG_NONE));
212
213   // Wait. The context passed to |AddWaiter()| above (123) is handed back.
214   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
215   EXPECT_EQ(123u, context);
      // Reset |hss| so that the values checked below are the ones written by
      // |RemoveWaiter()|.
216   hss = HandleSignalsState();
217   mp1->RemoveWaiter(1, &waiter, &hss);
218   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
219             hss.satisfied_signals);
220   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
221             hss.satisfiable_signals);
222
223   // Read from MP 1, port 1.
224   EXPECT_EQ(MOJO_RESULT_OK,
225             mp1->ReadMessage(1,
226                              UserPointer<void>(buffer),
227                              MakeUserPointer(&buffer_size),
228                              nullptr,
229                              nullptr,
230                              MOJO_READ_MESSAGE_FLAG_NONE));
231   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
232   EXPECT_STREQ(kHello, buffer);
233
234   // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
235
      // As before, the waiter is added before writing.
236   waiter.Init();
237   ASSERT_EQ(
238       MOJO_RESULT_OK,
239       mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));
240
241   EXPECT_EQ(MOJO_RESULT_OK,
242             mp1->WriteMessage(1,
243                               UserPointer<const void>(kWorld),
244                               sizeof(kWorld),
245                               nullptr,
246                               MOJO_WRITE_MESSAGE_FLAG_NONE));
247
248   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
249   EXPECT_EQ(456u, context);
250   hss = HandleSignalsState();
251   mp0->RemoveWaiter(0, &waiter, &hss);
252   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
253             hss.satisfied_signals);
254   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
255             hss.satisfiable_signals);
256
      // |buffer_size| was overwritten by the previous read; restore the capacity
      // before reading again.
257   buffer_size = static_cast<uint32_t>(sizeof(buffer));
258   EXPECT_EQ(MOJO_RESULT_OK,
259             mp0->ReadMessage(0,
260                              UserPointer<void>(buffer),
261                              MakeUserPointer(&buffer_size),
262                              nullptr,
263                              nullptr,
264                              MOJO_READ_MESSAGE_FLAG_NONE));
265   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
266   EXPECT_STREQ(kWorld, buffer);
267
268   // Close MP 0, port 0.
269   mp0->Close(0);
270
271   // Try to wait for MP 1, port 1 to become readable. This will eventually fail
272   // when it realizes that MP 0, port 0 has been closed. (It may also fail
273   // immediately.)
274   waiter.Init();
275   hss = HandleSignalsState();
276   MojoResult result =
277       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, &hss);
      // If the waiter was added, the failure arrives through |Wait()| and |hss|
      // is refreshed by |RemoveWaiter()|. Otherwise |hss| keeps whatever
      // |AddWaiter()| stored in it (presumably the state at the time of the
      // immediate failure).
278   if (result == MOJO_RESULT_OK) {
279     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
280               waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
281     EXPECT_EQ(789u, context);
282     hss = HandleSignalsState();
283     mp1->RemoveWaiter(1, &waiter, &hss);
284   }
285   EXPECT_EQ(0u, hss.satisfied_signals);
286   EXPECT_EQ(0u, hss.satisfiable_signals);
287
288   // And MP 1, port 1.
289   mp1->Close(1);
290 }
291
292 TEST_F(RemoteMessagePipeTest, Multiplex) {
      // Runs two message pipes over the same pair of channels and checks that a
      // message written on one pipe shows up only at that pipe's other end.
293   static const char kHello[] = "hello";
294   static const char kWorld[] = "world!!!1!!!1!";
295   char buffer[100] = {0};
296   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
297   Waiter waiter;
298   HandleSignalsState hss;
299   uint32_t context = 0;
300
301   // Connect message pipes as in the |Basic| test.
302
303   scoped_refptr<ChannelEndpoint> ep0;
304   scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
305   scoped_refptr<ChannelEndpoint> ep1;
306   scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
307   ConnectChannelEndpoints(ep0, ep1);
308
309   // Now put another message pipe on the channel. (The channels already exist
      // at this point, so this call only attaches the new endpoints.)
310
311   scoped_refptr<ChannelEndpoint> ep2;
312   scoped_refptr<MessagePipe> mp2(MessagePipe::CreateLocalProxy(&ep2));
313   scoped_refptr<ChannelEndpoint> ep3;
314   scoped_refptr<MessagePipe> mp3(MessagePipe::CreateProxyLocal(&ep3));
315   ConnectChannelEndpoints(ep2, ep3);
316
317   // Write: MP 2, port 0 -> MP 3, port 1.
318
      // Add the waiter before writing so the readable signal cannot be missed.
319   waiter.Init();
320   ASSERT_EQ(
321       MOJO_RESULT_OK,
322       mp3->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
323
324   EXPECT_EQ(MOJO_RESULT_OK,
325             mp2->WriteMessage(0,
326                               UserPointer<const void>(kHello),
327                               sizeof(kHello),
328                               nullptr,
329                               MOJO_WRITE_MESSAGE_FLAG_NONE));
330
331   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
332   EXPECT_EQ(789u, context);
333   hss = HandleSignalsState();
334   mp3->RemoveWaiter(1, &waiter, &hss);
335   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
336             hss.satisfied_signals);
337   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
338             hss.satisfiable_signals);
339
340   // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
      // (|buffer_size| is reset to the full capacity before every read.)
341   buffer_size = static_cast<uint32_t>(sizeof(buffer));
342   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
343             mp0->ReadMessage(0,
344                              UserPointer<void>(buffer),
345                              MakeUserPointer(&buffer_size),
346                              nullptr,
347                              nullptr,
348                              MOJO_READ_MESSAGE_FLAG_NONE));
349   buffer_size = static_cast<uint32_t>(sizeof(buffer));
350   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
351             mp1->ReadMessage(1,
352                              UserPointer<void>(buffer),
353                              MakeUserPointer(&buffer_size),
354                              nullptr,
355                              nullptr,
356                              MOJO_READ_MESSAGE_FLAG_NONE));
357   buffer_size = static_cast<uint32_t>(sizeof(buffer));
358   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
359             mp2->ReadMessage(0,
360                              UserPointer<void>(buffer),
361                              MakeUserPointer(&buffer_size),
362                              nullptr,
363                              nullptr,
364                              MOJO_READ_MESSAGE_FLAG_NONE));
365
366   // Read from MP 3, port 1.
367   buffer_size = static_cast<uint32_t>(sizeof(buffer));
368   EXPECT_EQ(MOJO_RESULT_OK,
369             mp3->ReadMessage(1,
370                              UserPointer<void>(buffer),
371                              MakeUserPointer(&buffer_size),
372                              nullptr,
373                              nullptr,
374                              MOJO_READ_MESSAGE_FLAG_NONE));
375   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
376   EXPECT_STREQ(kHello, buffer);
377
378   // Write: MP 0, port 0 -> MP 1, port 1 again.
379
380   waiter.Init();
381   ASSERT_EQ(
382       MOJO_RESULT_OK,
383       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
384
385   EXPECT_EQ(MOJO_RESULT_OK,
386             mp0->WriteMessage(0,
387                               UserPointer<const void>(kWorld),
388                               sizeof(kWorld),
389                               nullptr,
390                               MOJO_WRITE_MESSAGE_FLAG_NONE));
391
392   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
393   EXPECT_EQ(123u, context);
394   hss = HandleSignalsState();
395   mp1->RemoveWaiter(1, &waiter, &hss);
396   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
397             hss.satisfied_signals);
398   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
399             hss.satisfiable_signals);
400
401   // Make sure there's nothing on the other ports.
402   buffer_size = static_cast<uint32_t>(sizeof(buffer));
403   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
404             mp0->ReadMessage(0,
405                              UserPointer<void>(buffer),
406                              MakeUserPointer(&buffer_size),
407                              nullptr,
408                              nullptr,
409                              MOJO_READ_MESSAGE_FLAG_NONE));
410   buffer_size = static_cast<uint32_t>(sizeof(buffer));
411   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
412             mp2->ReadMessage(0,
413                              UserPointer<void>(buffer),
414                              MakeUserPointer(&buffer_size),
415                              nullptr,
416                              nullptr,
417                              MOJO_READ_MESSAGE_FLAG_NONE));
418   buffer_size = static_cast<uint32_t>(sizeof(buffer));
419   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
420             mp3->ReadMessage(1,
421                              UserPointer<void>(buffer),
422                              MakeUserPointer(&buffer_size),
423                              nullptr,
424                              nullptr,
425                              MOJO_READ_MESSAGE_FLAG_NONE));
426
      // Read from MP 1, port 1.
427   buffer_size = static_cast<uint32_t>(sizeof(buffer));
428   EXPECT_EQ(MOJO_RESULT_OK,
429             mp1->ReadMessage(1,
430                              UserPointer<void>(buffer),
431                              MakeUserPointer(&buffer_size),
432                              nullptr,
433                              nullptr,
434                              MOJO_READ_MESSAGE_FLAG_NONE));
435   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
436   EXPECT_STREQ(kWorld, buffer);
437
      // Close all four user-facing endpoints.
438   mp0->Close(0);
439   mp1->Close(1);
440   mp2->Close(0);
441   mp3->Close(1);
442 }
443
444 TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
      // Writes a message to MP 0, port 0 and closes that port before the other
      // side's channel has been bootstrapped, then checks that the message is
      // still delivered to MP 1, port 1.
445   static const char kHello[] = "hello";
446   char buffer[100] = {0};
447   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
448   Waiter waiter;
449   HandleSignalsState hss;
450   uint32_t context = 0;
451
452   // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
453   // connected to MP 1, port 0, which will be attached to channel 1. This leaves
454   // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
455
456   scoped_refptr<ChannelEndpoint> ep0;
457   scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
458
459   // Write to MP 0, port 0. (Nothing has been attached to a channel yet.)
460   EXPECT_EQ(MOJO_RESULT_OK,
461             mp0->WriteMessage(0,
462                               UserPointer<const void>(kHello),
463                               sizeof(kHello),
464                               nullptr,
465                               MOJO_WRITE_MESSAGE_FLAG_NONE));
466
      // This only posts the bootstrap task; it does not wait for it to run.
467   BootstrapChannelEndpointNoWait(0, ep0);
468
469   // Close MP 0, port 0 before channel 1 is even connected.
470   mp0->Close(0);
471
472   scoped_refptr<ChannelEndpoint> ep1;
473   scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
474
475   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
476   // it later, it might already be readable.)
477   waiter.Init();
478   ASSERT_EQ(
479       MOJO_RESULT_OK,
480       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
481
482   BootstrapChannelEndpointNoWait(1, ep1);
483
484   // Wait.
485   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
486   EXPECT_EQ(123u, context);
487   hss = HandleSignalsState();
488   // Note: MP 1, port 1 should definitely be readable, but it may or may
489   // not appear as writable (there's a race, and it may not have noticed that
490   // the other side was closed yet -- e.g., inserting a sleep here would make it
491   // much more likely to notice that it's no longer writable).
      // Hence only the READABLE bit is checked below, not the exact signal set.
492   mp1->RemoveWaiter(1, &waiter, &hss);
493   EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
494   EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
495
496   // Read from MP 1, port 1.
497   EXPECT_EQ(MOJO_RESULT_OK,
498             mp1->ReadMessage(1,
499                              UserPointer<void>(buffer),
500                              MakeUserPointer(&buffer_size),
501                              nullptr,
502                              nullptr,
503                              MOJO_READ_MESSAGE_FLAG_NONE));
504   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
505   EXPECT_STREQ(kHello, buffer);
506
507   // Close MP 1, port 1. (MP 0, port 0 was already closed above.)
508   mp1->Close(1);
509 }
510
511 TEST_F(RemoteMessagePipeTest, HandlePassing) {
512   static const char kHello[] = "hello";
513   Waiter waiter;
514   HandleSignalsState hss;
515   uint32_t context = 0;
516
517   scoped_refptr<ChannelEndpoint> ep0;
518   scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
519   scoped_refptr<ChannelEndpoint> ep1;
520   scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
521   ConnectChannelEndpoints(ep0, ep1);
522
523   // We'll try to pass this dispatcher.
524   scoped_refptr<MessagePipeDispatcher> dispatcher(
525       new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
526   scoped_refptr<MessagePipe> local_mp(MessagePipe::CreateLocalLocal());
527   dispatcher->Init(local_mp, 0);
528
529   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
530   // it later, it might already be readable.)
531   waiter.Init();
532   ASSERT_EQ(
533       MOJO_RESULT_OK,
534       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
535
536   // Write to MP 0, port 0.
537   {
538     DispatcherTransport transport(
539         test::DispatcherTryStartTransport(dispatcher.get()));
540     EXPECT_TRUE(transport.is_valid());
541
542     std::vector<DispatcherTransport> transports;
543     transports.push_back(transport);
544     EXPECT_EQ(MOJO_RESULT_OK,
545               mp0->WriteMessage(0,
546                                 UserPointer<const void>(kHello),
547                                 sizeof(kHello),
548                                 &transports,
549                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
550     transport.End();
551
552     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
553     // |dispatcher| is destroyed.
554     EXPECT_TRUE(dispatcher->HasOneRef());
555     dispatcher = nullptr;
556   }
557
558   // Wait.
559   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
560   EXPECT_EQ(123u, context);
561   hss = HandleSignalsState();
562   mp1->RemoveWaiter(1, &waiter, &hss);
563   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
564             hss.satisfied_signals);
565   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
566             hss.satisfiable_signals);
567
568   // Read from MP 1, port 1.
569   char read_buffer[100] = {0};
570   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
571   DispatcherVector read_dispatchers;
572   uint32_t read_num_dispatchers = 10;  // Maximum to get.
573   EXPECT_EQ(MOJO_RESULT_OK,
574             mp1->ReadMessage(1,
575                              UserPointer<void>(read_buffer),
576                              MakeUserPointer(&read_buffer_size),
577                              &read_dispatchers,
578                              &read_num_dispatchers,
579                              MOJO_READ_MESSAGE_FLAG_NONE));
580   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
581   EXPECT_STREQ(kHello, read_buffer);
582   EXPECT_EQ(1u, read_dispatchers.size());
583   EXPECT_EQ(1u, read_num_dispatchers);
584   ASSERT_TRUE(read_dispatchers[0].get());
585   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
586
587   EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
588   dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
589
590   // Add the waiter now, before it becomes readable to avoid a race.
591   waiter.Init();
592   ASSERT_EQ(MOJO_RESULT_OK,
593             dispatcher->AddWaiter(
594                 &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));
595
596   // Write to "local_mp", port 1.
597   EXPECT_EQ(MOJO_RESULT_OK,
598             local_mp->WriteMessage(1,
599                                    UserPointer<const void>(kHello),
600                                    sizeof(kHello),
601                                    nullptr,
602                                    MOJO_WRITE_MESSAGE_FLAG_NONE));
603
604   // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
605   // here. (We don't crash if I sleep and then close.)
606
607   // Wait for the dispatcher to become readable.
608   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
609   EXPECT_EQ(456u, context);
610   hss = HandleSignalsState();
611   dispatcher->RemoveWaiter(&waiter, &hss);
612   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
613             hss.satisfied_signals);
614   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
615             hss.satisfiable_signals);
616
617   // Read from the dispatcher.
618   memset(read_buffer, 0, sizeof(read_buffer));
619   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
620   EXPECT_EQ(MOJO_RESULT_OK,
621             dispatcher->ReadMessage(UserPointer<void>(read_buffer),
622                                     MakeUserPointer(&read_buffer_size),
623                                     0,
624                                     nullptr,
625                                     MOJO_READ_MESSAGE_FLAG_NONE));
626   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
627   EXPECT_STREQ(kHello, read_buffer);
628
629   // Prepare to wait on "local_mp", port 1.
630   waiter.Init();
631   ASSERT_EQ(MOJO_RESULT_OK,
632             local_mp->AddWaiter(
633                 1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
634
635   // Write to the dispatcher.
636   EXPECT_EQ(MOJO_RESULT_OK,
637             dispatcher->WriteMessage(UserPointer<const void>(kHello),
638                                      sizeof(kHello),
639                                      nullptr,
640                                      MOJO_WRITE_MESSAGE_FLAG_NONE));
641
642   // Wait.
643   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
644   EXPECT_EQ(789u, context);
645   hss = HandleSignalsState();
646   local_mp->RemoveWaiter(1, &waiter, &hss);
647   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
648             hss.satisfied_signals);
649   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
650             hss.satisfiable_signals);
651
652   // Read from "local_mp", port 1.
653   memset(read_buffer, 0, sizeof(read_buffer));
654   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
655   EXPECT_EQ(MOJO_RESULT_OK,
656             local_mp->ReadMessage(1,
657                                   UserPointer<void>(read_buffer),
658                                   MakeUserPointer(&read_buffer_size),
659                                   nullptr,
660                                   nullptr,
661                                   MOJO_READ_MESSAGE_FLAG_NONE));
662   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
663   EXPECT_STREQ(kHello, read_buffer);
664
665   // TODO(vtl): Also test that messages queued up before the handle was sent are
666   // delivered properly.
667
668   // Close everything that belongs to us.
669   mp0->Close(0);
670   mp1->Close(1);
671   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
672   // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
673   local_mp->Close(1);
674 }
675
676 #if defined(OS_POSIX)
677 #define MAYBE_SharedBufferPassing SharedBufferPassing
678 #else
679 // Not yet implemented (on Windows).
680 #define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing
681 #endif
682 TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) {
683   static const char kHello[] = "hello";
684   Waiter waiter;
685   HandleSignalsState hss;
686   uint32_t context = 0;
687
688   scoped_refptr<ChannelEndpoint> ep0;
689   scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
690   scoped_refptr<ChannelEndpoint> ep1;
691   scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
692   ConnectChannelEndpoints(ep0, ep1);
693
694   // We'll try to pass this dispatcher.
695   scoped_refptr<SharedBufferDispatcher> dispatcher;
696   EXPECT_EQ(MOJO_RESULT_OK,
697             SharedBufferDispatcher::Create(
698                 platform_support(),
699                 SharedBufferDispatcher::kDefaultCreateOptions,
700                 100,
701                 &dispatcher));
702   ASSERT_TRUE(dispatcher.get());
703
704   // Make a mapping.
705   scoped_ptr<embedder::PlatformSharedBufferMapping> mapping0;
706   EXPECT_EQ(
707       MOJO_RESULT_OK,
708       dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping0));
709   ASSERT_TRUE(mapping0);
710   ASSERT_TRUE(mapping0->GetBase());
711   ASSERT_EQ(100u, mapping0->GetLength());
712   static_cast<char*>(mapping0->GetBase())[0] = 'A';
713   static_cast<char*>(mapping0->GetBase())[50] = 'B';
714   static_cast<char*>(mapping0->GetBase())[99] = 'C';
715
716   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
717   // it later, it might already be readable.)
718   waiter.Init();
719   ASSERT_EQ(
720       MOJO_RESULT_OK,
721       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
722
723   // Write to MP 0, port 0.
724   {
725     DispatcherTransport transport(
726         test::DispatcherTryStartTransport(dispatcher.get()));
727     EXPECT_TRUE(transport.is_valid());
728
729     std::vector<DispatcherTransport> transports;
730     transports.push_back(transport);
731     EXPECT_EQ(MOJO_RESULT_OK,
732               mp0->WriteMessage(0,
733                                 UserPointer<const void>(kHello),
734                                 sizeof(kHello),
735                                 &transports,
736                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
737     transport.End();
738
739     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
740     // |dispatcher| is destroyed.
741     EXPECT_TRUE(dispatcher->HasOneRef());
742     dispatcher = nullptr;
743   }
744
745   // Wait.
746   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
747   EXPECT_EQ(123u, context);
748   hss = HandleSignalsState();
749   mp1->RemoveWaiter(1, &waiter, &hss);
750   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
751             hss.satisfied_signals);
752   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
753             hss.satisfiable_signals);
754
755   // Read from MP 1, port 1.
756   char read_buffer[100] = {0};
757   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
758   DispatcherVector read_dispatchers;
759   uint32_t read_num_dispatchers = 10;  // Maximum to get.
760   EXPECT_EQ(MOJO_RESULT_OK,
761             mp1->ReadMessage(1,
762                              UserPointer<void>(read_buffer),
763                              MakeUserPointer(&read_buffer_size),
764                              &read_dispatchers,
765                              &read_num_dispatchers,
766                              MOJO_READ_MESSAGE_FLAG_NONE));
767   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
768   EXPECT_STREQ(kHello, read_buffer);
769   EXPECT_EQ(1u, read_dispatchers.size());
770   EXPECT_EQ(1u, read_num_dispatchers);
771   ASSERT_TRUE(read_dispatchers[0].get());
772   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
773
774   EXPECT_EQ(Dispatcher::kTypeSharedBuffer, read_dispatchers[0]->GetType());
775   dispatcher = static_cast<SharedBufferDispatcher*>(read_dispatchers[0].get());
776
777   // Make another mapping.
778   scoped_ptr<embedder::PlatformSharedBufferMapping> mapping1;
779   EXPECT_EQ(
780       MOJO_RESULT_OK,
781       dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping1));
782   ASSERT_TRUE(mapping1);
783   ASSERT_TRUE(mapping1->GetBase());
784   ASSERT_EQ(100u, mapping1->GetLength());
785   EXPECT_NE(mapping1->GetBase(), mapping0->GetBase());
786   EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
787   EXPECT_EQ('B', static_cast<char*>(mapping1->GetBase())[50]);
788   EXPECT_EQ('C', static_cast<char*>(mapping1->GetBase())[99]);
789
790   // Write stuff either way.
791   static_cast<char*>(mapping1->GetBase())[1] = 'x';
792   EXPECT_EQ('x', static_cast<char*>(mapping0->GetBase())[1]);
793   static_cast<char*>(mapping0->GetBase())[2] = 'y';
794   EXPECT_EQ('y', static_cast<char*>(mapping1->GetBase())[2]);
795
796   // Kill the first mapping; the second should still be valid.
797   mapping0.reset();
798   EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
799
800   // Close everything that belongs to us.
801   mp0->Close(0);
802   mp1->Close(1);
803   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
804
805   // The second mapping should still be good.
806   EXPECT_EQ('x', static_cast<char*>(mapping1->GetBase())[1]);
807 }
808
// Selects the registered name of the platform-handle passing test. Everywhere
// except POSIX the test gets the DISABLED_ prefix, since platform handle
// passing is not yet implemented there (i.e., on Windows).
#if !defined(OS_POSIX)
#define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing
#else
#define MAYBE_PlatformHandlePassing PlatformHandlePassing
#endif
815 TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) {
816   base::ScopedTempDir temp_dir;
817   ASSERT_TRUE(temp_dir.CreateUniqueTempDir());
818
819   static const char kHello[] = "hello";
820   static const char kWorld[] = "world";
821   Waiter waiter;
822   uint32_t context = 0;
823   HandleSignalsState hss;
824
825   scoped_refptr<ChannelEndpoint> ep0;
826   scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
827   scoped_refptr<ChannelEndpoint> ep1;
828   scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
829   ConnectChannelEndpoints(ep0, ep1);
830
831   base::FilePath unused;
832   base::ScopedFILE fp(
833       CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused));
834   EXPECT_EQ(sizeof(kHello), fwrite(kHello, 1, sizeof(kHello), fp.get()));
835   // We'll try to pass this dispatcher, which will cause a |PlatformHandle| to
836   // be passed.
837   scoped_refptr<PlatformHandleDispatcher> dispatcher(
838       new PlatformHandleDispatcher(
839           mojo::test::PlatformHandleFromFILE(fp.Pass())));
840
841   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
842   // it later, it might already be readable.)
843   waiter.Init();
844   ASSERT_EQ(
845       MOJO_RESULT_OK,
846       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
847
848   // Write to MP 0, port 0.
849   {
850     DispatcherTransport transport(
851         test::DispatcherTryStartTransport(dispatcher.get()));
852     EXPECT_TRUE(transport.is_valid());
853
854     std::vector<DispatcherTransport> transports;
855     transports.push_back(transport);
856     EXPECT_EQ(MOJO_RESULT_OK,
857               mp0->WriteMessage(0,
858                                 UserPointer<const void>(kWorld),
859                                 sizeof(kWorld),
860                                 &transports,
861                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
862     transport.End();
863
864     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
865     // |dispatcher| is destroyed.
866     EXPECT_TRUE(dispatcher->HasOneRef());
867     dispatcher = nullptr;
868   }
869
870   // Wait.
871   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
872   EXPECT_EQ(123u, context);
873   hss = HandleSignalsState();
874   mp1->RemoveWaiter(1, &waiter, &hss);
875   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
876             hss.satisfied_signals);
877   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
878             hss.satisfiable_signals);
879
880   // Read from MP 1, port 1.
881   char read_buffer[100] = {0};
882   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
883   DispatcherVector read_dispatchers;
884   uint32_t read_num_dispatchers = 10;  // Maximum to get.
885   EXPECT_EQ(MOJO_RESULT_OK,
886             mp1->ReadMessage(1,
887                              UserPointer<void>(read_buffer),
888                              MakeUserPointer(&read_buffer_size),
889                              &read_dispatchers,
890                              &read_num_dispatchers,
891                              MOJO_READ_MESSAGE_FLAG_NONE));
892   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
893   EXPECT_STREQ(kWorld, read_buffer);
894   EXPECT_EQ(1u, read_dispatchers.size());
895   EXPECT_EQ(1u, read_num_dispatchers);
896   ASSERT_TRUE(read_dispatchers[0].get());
897   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
898
899   EXPECT_EQ(Dispatcher::kTypePlatformHandle, read_dispatchers[0]->GetType());
900   dispatcher =
901       static_cast<PlatformHandleDispatcher*>(read_dispatchers[0].get());
902
903   embedder::ScopedPlatformHandle h = dispatcher->PassPlatformHandle().Pass();
904   EXPECT_TRUE(h.is_valid());
905
906   fp = mojo::test::FILEFromPlatformHandle(h.Pass(), "rb").Pass();
907   EXPECT_FALSE(h.is_valid());
908   EXPECT_TRUE(fp);
909
910   rewind(fp.get());
911   memset(read_buffer, 0, sizeof(read_buffer));
912   EXPECT_EQ(sizeof(kHello),
913             fread(read_buffer, 1, sizeof(read_buffer), fp.get()));
914   EXPECT_STREQ(kHello, read_buffer);
915
916   // Close everything that belongs to us.
917   mp0->Close(0);
918   mp1->Close(1);
919   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
920 }
921
922 // Test racing closes (on each end).
923 // Note: A flaky failure would almost certainly indicate a problem in the code
924 // itself (not in the test). Also, any logged warnings/errors would also
925 // probably be indicative of bugs.
926 TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
927   base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5);
928
929   for (unsigned i = 0; i < 256; i++) {
930     DVLOG(2) << "---------------------------------------- " << i;
931     scoped_refptr<ChannelEndpoint> ep0;
932     scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
933     BootstrapChannelEndpointNoWait(0, ep0);
934
935     scoped_refptr<ChannelEndpoint> ep1;
936     scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
937     BootstrapChannelEndpointNoWait(1, ep1);
938
939     if (i & 1u) {
940       io_thread()->task_runner()->PostTask(
941           FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
942     }
943     if (i & 2u)
944       base::PlatformThread::Sleep(delay);
945
946     mp0->Close(0);
947
948     if (i & 4u) {
949       io_thread()->task_runner()->PostTask(
950           FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
951     }
952     if (i & 8u)
953       base::PlatformThread::Sleep(delay);
954
955     mp1->Close(1);
956
957     RestoreInitialState();
958   }
959 }
960
961 // Tests passing an end of a message pipe over a remote message pipe, and then
962 // passing that end back.
963 // TODO(vtl): Also test passing a message pipe across two remote message pipes.
964 TEST_F(RemoteMessagePipeTest, PassMessagePipeHandleAcrossAndBack) {
965   static const char kHello[] = "hello";
966   static const char kWorld[] = "world";
967   Waiter waiter;
968   HandleSignalsState hss;
969   uint32_t context = 0;
970
971   scoped_refptr<ChannelEndpoint> ep0;
972   scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
973   scoped_refptr<ChannelEndpoint> ep1;
974   scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
975   ConnectChannelEndpoints(ep0, ep1);
976
977   // We'll try to pass this dispatcher.
978   scoped_refptr<MessagePipeDispatcher> dispatcher(
979       new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
980   scoped_refptr<MessagePipe> local_mp(MessagePipe::CreateLocalLocal());
981   dispatcher->Init(local_mp, 0);
982
983   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
984   // it later, it might already be readable.)
985   waiter.Init();
986   ASSERT_EQ(
987       MOJO_RESULT_OK,
988       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));
989
990   // Write to MP 0, port 0.
991   {
992     DispatcherTransport transport(
993         test::DispatcherTryStartTransport(dispatcher.get()));
994     EXPECT_TRUE(transport.is_valid());
995
996     std::vector<DispatcherTransport> transports;
997     transports.push_back(transport);
998     EXPECT_EQ(MOJO_RESULT_OK,
999               mp0->WriteMessage(0,
1000                                 UserPointer<const void>(kHello),
1001                                 sizeof(kHello),
1002                                 &transports,
1003                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
1004     transport.End();
1005
1006     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
1007     // |dispatcher| is destroyed.
1008     EXPECT_TRUE(dispatcher->HasOneRef());
1009     dispatcher = nullptr;
1010   }
1011
1012   // Wait.
1013   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
1014   EXPECT_EQ(123u, context);
1015   hss = HandleSignalsState();
1016   mp1->RemoveWaiter(1, &waiter, &hss);
1017   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1018             hss.satisfied_signals);
1019   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1020             hss.satisfiable_signals);
1021
1022   // Read from MP 1, port 1.
1023   char read_buffer[100] = {0};
1024   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
1025   DispatcherVector read_dispatchers;
1026   uint32_t read_num_dispatchers = 10;  // Maximum to get.
1027   EXPECT_EQ(MOJO_RESULT_OK,
1028             mp1->ReadMessage(1,
1029                              UserPointer<void>(read_buffer),
1030                              MakeUserPointer(&read_buffer_size),
1031                              &read_dispatchers,
1032                              &read_num_dispatchers,
1033                              MOJO_READ_MESSAGE_FLAG_NONE));
1034   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
1035   EXPECT_STREQ(kHello, read_buffer);
1036   EXPECT_EQ(1u, read_dispatchers.size());
1037   EXPECT_EQ(1u, read_num_dispatchers);
1038   ASSERT_TRUE(read_dispatchers[0].get());
1039   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
1040
1041   EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
1042   dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
1043   read_dispatchers.clear();
1044
1045   // Now pass it back.
1046
1047   // Prepare to wait on MP 0, port 0. (Add the waiter now. Otherwise, if we do
1048   // it later, it might already be readable.)
1049   waiter.Init();
1050   ASSERT_EQ(
1051       MOJO_RESULT_OK,
1052       mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));
1053
1054   // Write to MP 1, port 1.
1055   {
1056     DispatcherTransport transport(
1057         test::DispatcherTryStartTransport(dispatcher.get()));
1058     EXPECT_TRUE(transport.is_valid());
1059
1060     std::vector<DispatcherTransport> transports;
1061     transports.push_back(transport);
1062     EXPECT_EQ(MOJO_RESULT_OK,
1063               mp1->WriteMessage(1,
1064                                 UserPointer<const void>(kWorld),
1065                                 sizeof(kWorld),
1066                                 &transports,
1067                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
1068     transport.End();
1069
1070     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
1071     // |dispatcher| is destroyed.
1072     EXPECT_TRUE(dispatcher->HasOneRef());
1073     dispatcher = nullptr;
1074   }
1075
1076   // Wait.
1077   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
1078   EXPECT_EQ(456u, context);
1079   hss = HandleSignalsState();
1080   mp0->RemoveWaiter(0, &waiter, &hss);
1081   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1082             hss.satisfied_signals);
1083   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1084             hss.satisfiable_signals);
1085
1086   // Read from MP 0, port 0.
1087   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
1088   read_num_dispatchers = 10;  // Maximum to get.
1089   EXPECT_EQ(MOJO_RESULT_OK,
1090             mp0->ReadMessage(0,
1091                              UserPointer<void>(read_buffer),
1092                              MakeUserPointer(&read_buffer_size),
1093                              &read_dispatchers,
1094                              &read_num_dispatchers,
1095                              MOJO_READ_MESSAGE_FLAG_NONE));
1096   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
1097   EXPECT_STREQ(kWorld, read_buffer);
1098   EXPECT_EQ(1u, read_dispatchers.size());
1099   EXPECT_EQ(1u, read_num_dispatchers);
1100   ASSERT_TRUE(read_dispatchers[0].get());
1101   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
1102
1103   EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
1104   dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
1105   read_dispatchers.clear();
1106
1107   // Add the waiter now, before it becomes readable to avoid a race.
1108   waiter.Init();
1109   ASSERT_EQ(MOJO_RESULT_OK,
1110             dispatcher->AddWaiter(
1111                 &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
1112
1113   // Write to "local_mp", port 1.
1114   EXPECT_EQ(MOJO_RESULT_OK,
1115             local_mp->WriteMessage(1,
1116                                    UserPointer<const void>(kHello),
1117                                    sizeof(kHello),
1118                                    nullptr,
1119                                    MOJO_WRITE_MESSAGE_FLAG_NONE));
1120
1121   // Wait for the dispatcher to become readable.
1122   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
1123   EXPECT_EQ(789u, context);
1124   hss = HandleSignalsState();
1125   dispatcher->RemoveWaiter(&waiter, &hss);
1126   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1127             hss.satisfied_signals);
1128   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1129             hss.satisfiable_signals);
1130
1131   // Read from the dispatcher.
1132   memset(read_buffer, 0, sizeof(read_buffer));
1133   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
1134   EXPECT_EQ(MOJO_RESULT_OK,
1135             dispatcher->ReadMessage(UserPointer<void>(read_buffer),
1136                                     MakeUserPointer(&read_buffer_size),
1137                                     0,
1138                                     nullptr,
1139                                     MOJO_READ_MESSAGE_FLAG_NONE));
1140   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
1141   EXPECT_STREQ(kHello, read_buffer);
1142
1143   // Prepare to wait on "local_mp", port 1.
1144   waiter.Init();
1145   ASSERT_EQ(MOJO_RESULT_OK,
1146             local_mp->AddWaiter(
1147                 1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));
1148
1149   // Write to the dispatcher.
1150   EXPECT_EQ(MOJO_RESULT_OK,
1151             dispatcher->WriteMessage(UserPointer<const void>(kHello),
1152                                      sizeof(kHello),
1153                                      nullptr,
1154                                      MOJO_WRITE_MESSAGE_FLAG_NONE));
1155
1156   // Wait.
1157   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
1158   EXPECT_EQ(789u, context);
1159   hss = HandleSignalsState();
1160   local_mp->RemoveWaiter(1, &waiter, &hss);
1161   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1162             hss.satisfied_signals);
1163   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
1164             hss.satisfiable_signals);
1165
1166   // Read from "local_mp", port 1.
1167   memset(read_buffer, 0, sizeof(read_buffer));
1168   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
1169   EXPECT_EQ(MOJO_RESULT_OK,
1170             local_mp->ReadMessage(1,
1171                                   UserPointer<void>(read_buffer),
1172                                   MakeUserPointer(&read_buffer_size),
1173                                   nullptr,
1174                                   nullptr,
1175                                   MOJO_READ_MESSAGE_FLAG_NONE));
1176   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
1177   EXPECT_STREQ(kHello, read_buffer);
1178
1179   // TODO(vtl): Also test the cases where messages are written and read (at
1180   // various points) on the message pipe being passed around.
1181
1182   // Close everything that belongs to us.
1183   mp0->Close(0);
1184   mp1->Close(1);
1185   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
1186   // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
1187   local_mp->Close(1);
1188 }
1189
1190 }  // namespace
1191 }  // namespace system
1192 }  // namespace mojo