1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
11 #include "base/bind.h"
12 #include "base/file_util.h"
13 #include "base/files/file_path.h"
14 #include "base/files/scoped_file.h"
15 #include "base/files/scoped_temp_dir.h"
16 #include "base/location.h"
17 #include "base/logging.h"
18 #include "base/macros.h"
19 #include "base/message_loop/message_loop.h"
20 #include "base/threading/platform_thread.h" // For |Sleep()|.
21 #include "build/build_config.h" // TODO(vtl): Remove this.
22 #include "mojo/common/test/test_utils.h"
23 #include "mojo/embedder/platform_channel_pair.h"
24 #include "mojo/embedder/platform_shared_buffer.h"
25 #include "mojo/embedder/scoped_platform_handle.h"
26 #include "mojo/embedder/simple_platform_support.h"
27 #include "mojo/system/channel.h"
28 #include "mojo/system/local_message_pipe_endpoint.h"
29 #include "mojo/system/message_pipe.h"
30 #include "mojo/system/message_pipe_dispatcher.h"
31 #include "mojo/system/platform_handle_dispatcher.h"
32 #include "mojo/system/proxy_message_pipe_endpoint.h"
33 #include "mojo/system/raw_channel.h"
34 #include "mojo/system/shared_buffer_dispatcher.h"
35 #include "mojo/system/test_utils.h"
36 #include "mojo/system/waiter.h"
37 #include "testing/gtest/include/gtest/gtest.h"
43 class RemoteMessagePipeTest : public testing::Test {
45 RemoteMessagePipeTest() : io_thread_(test::TestIOThread::kAutoStart) {}
46 virtual ~RemoteMessagePipeTest() {}
48 virtual void SetUp() OVERRIDE {
49 io_thread_.PostTaskAndWait(
51 base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
52 base::Unretained(this)));
55 virtual void TearDown() OVERRIDE {
56 io_thread_.PostTaskAndWait(
58 base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
59 base::Unretained(this)));
63 // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1,
64 // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP
65 // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s.
66 void ConnectMessagePipes(scoped_refptr<MessagePipe> mp0,
67 scoped_refptr<MessagePipe> mp1) {
68 io_thread_.PostTaskAndWait(
70 base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
71 base::Unretained(this),
76 // This connects |mp|'s port |channel_index ^ 1| to channel |channel_index|.
77 // It assumes/requires that this is the bootstrap case, i.e., that the
78 // endpoint IDs are both/will both be |Channel::kBootstrapEndpointId|. This
79 // returns *without* waiting for it to finish connecting.
80 void BootstrapMessagePipeNoWait(unsigned channel_index,
81 scoped_refptr<MessagePipe> mp) {
84 base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread,
85 base::Unretained(this),
90 void RestoreInitialState() {
91 io_thread_.PostTaskAndWait(
93 base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread,
94 base::Unretained(this)));
97 test::TestIOThread* io_thread() { return &io_thread_; }
100 void SetUpOnIOThread() {
101 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
103 embedder::PlatformChannelPair channel_pair;
104 platform_handles_[0] = channel_pair.PassServerHandle();
105 platform_handles_[1] = channel_pair.PassClientHandle();
108 void TearDownOnIOThread() {
109 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
112 channels_[0]->Shutdown();
116 channels_[1]->Shutdown();
121 void CreateAndInitChannel(unsigned channel_index) {
122 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
123 CHECK(channel_index == 0 || channel_index == 1);
124 CHECK(!channels_[channel_index]);
126 channels_[channel_index] = new Channel();
127 CHECK(channels_[channel_index]->Init(
128 RawChannel::Create(platform_handles_[channel_index].Pass())));
131 void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp0,
132 scoped_refptr<MessagePipe> mp1) {
133 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
136 CreateAndInitChannel(0);
138 CreateAndInitChannel(1);
140 MessageInTransit::EndpointId local_id0 =
141 channels_[0]->AttachMessagePipeEndpoint(mp0, 1);
142 MessageInTransit::EndpointId local_id1 =
143 channels_[1]->AttachMessagePipeEndpoint(mp1, 0);
145 CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1));
146 CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0));
149 void BootstrapMessagePipeOnIOThread(unsigned channel_index,
150 scoped_refptr<MessagePipe> mp) {
151 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
152 CHECK(channel_index == 0 || channel_index == 1);
154 unsigned port = channel_index ^ 1u;
156 CreateAndInitChannel(channel_index);
157 MessageInTransit::EndpointId endpoint_id =
158 channels_[channel_index]->AttachMessagePipeEndpoint(mp, port);
159 if (endpoint_id == MessageInTransit::kInvalidEndpointId)
162 CHECK_EQ(endpoint_id, Channel::kBootstrapEndpointId);
163 CHECK(channels_[channel_index]->RunMessagePipeEndpoint(
164 Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId));
167 void RestoreInitialStateOnIOThread() {
168 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
170 TearDownOnIOThread();
174 test::TestIOThread io_thread_;
175 embedder::ScopedPlatformHandle platform_handles_[2];
176 scoped_refptr<Channel> channels_[2];
178 DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
181 TEST_F(RemoteMessagePipeTest, Basic) {
182 static const char kHello[] = "hello";
183 static const char kWorld[] = "world!!!1!!!1!";
184 char buffer[100] = {0};
185 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
187 HandleSignalsState hss;
188 uint32_t context = 0;
190 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
191 // connected to MP 1, port 0, which will be attached to channel 1. This leaves
192 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
194 scoped_refptr<MessagePipe> mp0(new MessagePipe(
195 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
196 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
197 scoped_refptr<MessagePipe> mp1(new MessagePipe(
198 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
199 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
200 ConnectMessagePipes(mp0, mp1);
202 // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
204 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
205 // it later, it might already be readable.)
207 ASSERT_EQ(MOJO_RESULT_OK,
208 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
210 // Write to MP 0, port 0.
211 EXPECT_EQ(MOJO_RESULT_OK,
213 UserPointer<const void>(kHello),
216 MOJO_WRITE_MESSAGE_FLAG_NONE));
219 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
220 EXPECT_EQ(123u, context);
221 hss = HandleSignalsState();
222 mp1->RemoveWaiter(1, &waiter, &hss);
223 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
224 hss.satisfied_signals);
225 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
226 hss.satisfiable_signals);
228 // Read from MP 1, port 1.
229 EXPECT_EQ(MOJO_RESULT_OK,
231 UserPointer<void>(buffer),
232 MakeUserPointer(&buffer_size),
235 MOJO_READ_MESSAGE_FLAG_NONE));
236 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
237 EXPECT_STREQ(kHello, buffer);
239 // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
242 ASSERT_EQ(MOJO_RESULT_OK,
243 mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, NULL));
245 EXPECT_EQ(MOJO_RESULT_OK,
247 UserPointer<const void>(kWorld),
250 MOJO_WRITE_MESSAGE_FLAG_NONE));
252 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
253 EXPECT_EQ(456u, context);
254 hss = HandleSignalsState();
255 mp0->RemoveWaiter(0, &waiter, &hss);
256 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
257 hss.satisfied_signals);
258 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
259 hss.satisfiable_signals);
261 buffer_size = static_cast<uint32_t>(sizeof(buffer));
262 EXPECT_EQ(MOJO_RESULT_OK,
264 UserPointer<void>(buffer),
265 MakeUserPointer(&buffer_size),
268 MOJO_READ_MESSAGE_FLAG_NONE));
269 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
270 EXPECT_STREQ(kWorld, buffer);
272 // Close MP 0, port 0.
275 // Try to wait for MP 1, port 1 to become readable. This will eventually fail
276 // when it realizes that MP 0, port 0 has been closed. (It may also fail
279 hss = HandleSignalsState();
281 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, &hss);
282 if (result == MOJO_RESULT_OK) {
283 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
284 waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
285 EXPECT_EQ(789u, context);
286 hss = HandleSignalsState();
287 mp1->RemoveWaiter(1, &waiter, &hss);
289 EXPECT_EQ(0u, hss.satisfied_signals);
290 EXPECT_EQ(0u, hss.satisfiable_signals);
296 TEST_F(RemoteMessagePipeTest, Multiplex) {
297 static const char kHello[] = "hello";
298 static const char kWorld[] = "world!!!1!!!1!";
299 char buffer[100] = {0};
300 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
302 HandleSignalsState hss;
303 uint32_t context = 0;
305 // Connect message pipes as in the |Basic| test.
307 scoped_refptr<MessagePipe> mp0(new MessagePipe(
308 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
309 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
310 scoped_refptr<MessagePipe> mp1(new MessagePipe(
311 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
312 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
313 ConnectMessagePipes(mp0, mp1);
315 // Now put another message pipe on the channel.
317 scoped_refptr<MessagePipe> mp2(new MessagePipe(
318 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
319 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
320 scoped_refptr<MessagePipe> mp3(new MessagePipe(
321 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
322 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
323 ConnectMessagePipes(mp2, mp3);
325 // Write: MP 2, port 0 -> MP 3, port 1.
328 ASSERT_EQ(MOJO_RESULT_OK,
329 mp3->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, NULL));
331 EXPECT_EQ(MOJO_RESULT_OK,
333 UserPointer<const void>(kHello),
336 MOJO_WRITE_MESSAGE_FLAG_NONE));
338 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
339 EXPECT_EQ(789u, context);
340 hss = HandleSignalsState();
341 mp3->RemoveWaiter(1, &waiter, &hss);
342 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
343 hss.satisfied_signals);
344 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
345 hss.satisfiable_signals);
347 // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
348 buffer_size = static_cast<uint32_t>(sizeof(buffer));
349 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
351 UserPointer<void>(buffer),
352 MakeUserPointer(&buffer_size),
355 MOJO_READ_MESSAGE_FLAG_NONE));
356 buffer_size = static_cast<uint32_t>(sizeof(buffer));
357 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
359 UserPointer<void>(buffer),
360 MakeUserPointer(&buffer_size),
363 MOJO_READ_MESSAGE_FLAG_NONE));
364 buffer_size = static_cast<uint32_t>(sizeof(buffer));
365 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
367 UserPointer<void>(buffer),
368 MakeUserPointer(&buffer_size),
371 MOJO_READ_MESSAGE_FLAG_NONE));
373 // Read from MP 3, port 1.
374 buffer_size = static_cast<uint32_t>(sizeof(buffer));
375 EXPECT_EQ(MOJO_RESULT_OK,
377 UserPointer<void>(buffer),
378 MakeUserPointer(&buffer_size),
381 MOJO_READ_MESSAGE_FLAG_NONE));
382 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
383 EXPECT_STREQ(kHello, buffer);
385 // Write: MP 0, port 0 -> MP 1, port 1 again.
388 ASSERT_EQ(MOJO_RESULT_OK,
389 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
391 EXPECT_EQ(MOJO_RESULT_OK,
393 UserPointer<const void>(kWorld),
396 MOJO_WRITE_MESSAGE_FLAG_NONE));
398 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
399 EXPECT_EQ(123u, context);
400 hss = HandleSignalsState();
401 mp1->RemoveWaiter(1, &waiter, &hss);
402 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
403 hss.satisfied_signals);
404 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
405 hss.satisfiable_signals);
407 // Make sure there's nothing on the other ports.
408 buffer_size = static_cast<uint32_t>(sizeof(buffer));
409 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
411 UserPointer<void>(buffer),
412 MakeUserPointer(&buffer_size),
415 MOJO_READ_MESSAGE_FLAG_NONE));
416 buffer_size = static_cast<uint32_t>(sizeof(buffer));
417 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
419 UserPointer<void>(buffer),
420 MakeUserPointer(&buffer_size),
423 MOJO_READ_MESSAGE_FLAG_NONE));
424 buffer_size = static_cast<uint32_t>(sizeof(buffer));
425 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
427 UserPointer<void>(buffer),
428 MakeUserPointer(&buffer_size),
431 MOJO_READ_MESSAGE_FLAG_NONE));
433 buffer_size = static_cast<uint32_t>(sizeof(buffer));
434 EXPECT_EQ(MOJO_RESULT_OK,
436 UserPointer<void>(buffer),
437 MakeUserPointer(&buffer_size),
440 MOJO_READ_MESSAGE_FLAG_NONE));
441 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
442 EXPECT_STREQ(kWorld, buffer);
450 TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
451 static const char kHello[] = "hello";
452 char buffer[100] = {0};
453 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
455 HandleSignalsState hss;
456 uint32_t context = 0;
458 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
459 // connected to MP 1, port 0, which will be attached to channel 1. This leaves
460 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
462 scoped_refptr<MessagePipe> mp0(new MessagePipe(
463 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
464 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
466 // Write to MP 0, port 0.
467 EXPECT_EQ(MOJO_RESULT_OK,
469 UserPointer<const void>(kHello),
472 MOJO_WRITE_MESSAGE_FLAG_NONE));
474 BootstrapMessagePipeNoWait(0, mp0);
476 // Close MP 0, port 0 before channel 1 is even connected.
479 scoped_refptr<MessagePipe> mp1(new MessagePipe(
480 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
481 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
483 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
484 // it later, it might already be readable.)
486 ASSERT_EQ(MOJO_RESULT_OK,
487 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
489 BootstrapMessagePipeNoWait(1, mp1);
492 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
493 EXPECT_EQ(123u, context);
494 hss = HandleSignalsState();
495 // Note: MP 1, port 1 should definitely should be readable, but it may or may
496 // not appear as writable (there's a race, and it may not have noticed that
497 // the other side was closed yet -- e.g., inserting a sleep here would make it
498 // much more likely to notice that it's no longer writable).
499 mp1->RemoveWaiter(1, &waiter, &hss);
500 EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
501 EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
503 // Read from MP 1, port 1.
504 EXPECT_EQ(MOJO_RESULT_OK,
506 UserPointer<void>(buffer),
507 MakeUserPointer(&buffer_size),
510 MOJO_READ_MESSAGE_FLAG_NONE));
511 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
512 EXPECT_STREQ(kHello, buffer);
518 TEST_F(RemoteMessagePipeTest, HandlePassing) {
519 static const char kHello[] = "hello";
521 HandleSignalsState hss;
522 uint32_t context = 0;
524 scoped_refptr<MessagePipe> mp0(new MessagePipe(
525 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
526 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
527 scoped_refptr<MessagePipe> mp1(new MessagePipe(
528 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
529 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
530 ConnectMessagePipes(mp0, mp1);
532 // We'll try to pass this dispatcher.
533 scoped_refptr<MessagePipeDispatcher> dispatcher(
534 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
535 scoped_refptr<MessagePipe> local_mp(new MessagePipe());
536 dispatcher->Init(local_mp, 0);
538 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
539 // it later, it might already be readable.)
541 ASSERT_EQ(MOJO_RESULT_OK,
542 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
544 // Write to MP 0, port 0.
546 DispatcherTransport transport(
547 test::DispatcherTryStartTransport(dispatcher.get()));
548 EXPECT_TRUE(transport.is_valid());
550 std::vector<DispatcherTransport> transports;
551 transports.push_back(transport);
552 EXPECT_EQ(MOJO_RESULT_OK,
554 UserPointer<const void>(kHello),
557 MOJO_WRITE_MESSAGE_FLAG_NONE));
560 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
561 // |dispatcher| is destroyed.
562 EXPECT_TRUE(dispatcher->HasOneRef());
567 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
568 EXPECT_EQ(123u, context);
569 hss = HandleSignalsState();
570 mp1->RemoveWaiter(1, &waiter, &hss);
571 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
572 hss.satisfied_signals);
573 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
574 hss.satisfiable_signals);
576 // Read from MP 1, port 1.
577 char read_buffer[100] = {0};
578 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
579 DispatcherVector read_dispatchers;
580 uint32_t read_num_dispatchers = 10; // Maximum to get.
581 EXPECT_EQ(MOJO_RESULT_OK,
583 UserPointer<void>(read_buffer),
584 MakeUserPointer(&read_buffer_size),
586 &read_num_dispatchers,
587 MOJO_READ_MESSAGE_FLAG_NONE));
588 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
589 EXPECT_STREQ(kHello, read_buffer);
590 EXPECT_EQ(1u, read_dispatchers.size());
591 EXPECT_EQ(1u, read_num_dispatchers);
592 ASSERT_TRUE(read_dispatchers[0]);
593 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
595 EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
596 dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
598 // Add the waiter now, before it becomes readable to avoid a race.
602 dispatcher->AddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, NULL));
604 // Write to "local_mp", port 1.
605 EXPECT_EQ(MOJO_RESULT_OK,
606 local_mp->WriteMessage(1,
607 UserPointer<const void>(kHello),
610 MOJO_WRITE_MESSAGE_FLAG_NONE));
612 // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
613 // here. (We don't crash if I sleep and then close.)
615 // Wait for the dispatcher to become readable.
616 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
617 EXPECT_EQ(456u, context);
618 hss = HandleSignalsState();
619 dispatcher->RemoveWaiter(&waiter, &hss);
620 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
621 hss.satisfied_signals);
622 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
623 hss.satisfiable_signals);
625 // Read from the dispatcher.
626 memset(read_buffer, 0, sizeof(read_buffer));
627 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
628 EXPECT_EQ(MOJO_RESULT_OK,
629 dispatcher->ReadMessage(UserPointer<void>(read_buffer),
630 MakeUserPointer(&read_buffer_size),
633 MOJO_READ_MESSAGE_FLAG_NONE));
634 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
635 EXPECT_STREQ(kHello, read_buffer);
637 // Prepare to wait on "local_mp", port 1.
641 local_mp->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, NULL));
643 // Write to the dispatcher.
644 EXPECT_EQ(MOJO_RESULT_OK,
645 dispatcher->WriteMessage(UserPointer<const void>(kHello),
648 MOJO_WRITE_MESSAGE_FLAG_NONE));
651 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
652 EXPECT_EQ(789u, context);
653 hss = HandleSignalsState();
654 local_mp->RemoveWaiter(1, &waiter, &hss);
655 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
656 hss.satisfied_signals);
657 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
658 hss.satisfiable_signals);
660 // Read from "local_mp", port 1.
661 memset(read_buffer, 0, sizeof(read_buffer));
662 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
663 EXPECT_EQ(MOJO_RESULT_OK,
664 local_mp->ReadMessage(1,
665 UserPointer<void>(read_buffer),
666 MakeUserPointer(&read_buffer_size),
669 MOJO_READ_MESSAGE_FLAG_NONE));
670 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
671 EXPECT_STREQ(kHello, read_buffer);
673 // TODO(vtl): Also test that messages queued up before the handle was sent are
674 // delivered properly.
676 // Close everything that belongs to us.
679 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
680 // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
684 #if defined(OS_POSIX)
685 #define MAYBE_SharedBufferPassing SharedBufferPassing
687 // Not yet implemented (on Windows).
688 #define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing
690 TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) {
691 static const char kHello[] = "hello";
693 HandleSignalsState hss;
694 uint32_t context = 0;
696 scoped_refptr<MessagePipe> mp0(new MessagePipe(
697 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
698 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
699 scoped_refptr<MessagePipe> mp1(new MessagePipe(
700 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
701 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
702 ConnectMessagePipes(mp0, mp1);
704 // We'll try to pass this dispatcher.
705 embedder::SimplePlatformSupport platform_support;
706 scoped_refptr<SharedBufferDispatcher> dispatcher;
707 EXPECT_EQ(MOJO_RESULT_OK,
708 SharedBufferDispatcher::Create(
710 SharedBufferDispatcher::kDefaultCreateOptions,
713 ASSERT_TRUE(dispatcher);
716 scoped_ptr<embedder::PlatformSharedBufferMapping> mapping0;
719 dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping0));
720 ASSERT_TRUE(mapping0);
721 ASSERT_TRUE(mapping0->GetBase());
722 ASSERT_EQ(100u, mapping0->GetLength());
723 static_cast<char*>(mapping0->GetBase())[0] = 'A';
724 static_cast<char*>(mapping0->GetBase())[50] = 'B';
725 static_cast<char*>(mapping0->GetBase())[99] = 'C';
727 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
728 // it later, it might already be readable.)
730 ASSERT_EQ(MOJO_RESULT_OK,
731 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
733 // Write to MP 0, port 0.
735 DispatcherTransport transport(
736 test::DispatcherTryStartTransport(dispatcher.get()));
737 EXPECT_TRUE(transport.is_valid());
739 std::vector<DispatcherTransport> transports;
740 transports.push_back(transport);
741 EXPECT_EQ(MOJO_RESULT_OK,
743 UserPointer<const void>(kHello),
746 MOJO_WRITE_MESSAGE_FLAG_NONE));
749 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
750 // |dispatcher| is destroyed.
751 EXPECT_TRUE(dispatcher->HasOneRef());
756 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
757 EXPECT_EQ(123u, context);
758 hss = HandleSignalsState();
759 mp1->RemoveWaiter(1, &waiter, &hss);
760 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
761 hss.satisfied_signals);
762 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
763 hss.satisfiable_signals);
765 // Read from MP 1, port 1.
766 char read_buffer[100] = {0};
767 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
768 DispatcherVector read_dispatchers;
769 uint32_t read_num_dispatchers = 10; // Maximum to get.
770 EXPECT_EQ(MOJO_RESULT_OK,
772 UserPointer<void>(read_buffer),
773 MakeUserPointer(&read_buffer_size),
775 &read_num_dispatchers,
776 MOJO_READ_MESSAGE_FLAG_NONE));
777 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
778 EXPECT_STREQ(kHello, read_buffer);
779 EXPECT_EQ(1u, read_dispatchers.size());
780 EXPECT_EQ(1u, read_num_dispatchers);
781 ASSERT_TRUE(read_dispatchers[0]);
782 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
784 EXPECT_EQ(Dispatcher::kTypeSharedBuffer, read_dispatchers[0]->GetType());
785 dispatcher = static_cast<SharedBufferDispatcher*>(read_dispatchers[0].get());
787 // Make another mapping.
788 scoped_ptr<embedder::PlatformSharedBufferMapping> mapping1;
791 dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping1));
792 ASSERT_TRUE(mapping1);
793 ASSERT_TRUE(mapping1->GetBase());
794 ASSERT_EQ(100u, mapping1->GetLength());
795 EXPECT_NE(mapping1->GetBase(), mapping0->GetBase());
796 EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
797 EXPECT_EQ('B', static_cast<char*>(mapping1->GetBase())[50]);
798 EXPECT_EQ('C', static_cast<char*>(mapping1->GetBase())[99]);
800 // Write stuff either way.
801 static_cast<char*>(mapping1->GetBase())[1] = 'x';
802 EXPECT_EQ('x', static_cast<char*>(mapping0->GetBase())[1]);
803 static_cast<char*>(mapping0->GetBase())[2] = 'y';
804 EXPECT_EQ('y', static_cast<char*>(mapping1->GetBase())[2]);
806 // Kill the first mapping; the second should still be valid.
808 EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
810 // Close everything that belongs to us.
813 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
815 // The second mapping should still be good.
816 EXPECT_EQ('x', static_cast<char*>(mapping1->GetBase())[1]);
819 #if defined(OS_POSIX)
820 #define MAYBE_PlatformHandlePassing PlatformHandlePassing
822 // Not yet implemented (on Windows).
823 #define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing
825 TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) {
826 base::ScopedTempDir temp_dir;
827 ASSERT_TRUE(temp_dir.CreateUniqueTempDir());
829 static const char kHello[] = "hello";
830 static const char kWorld[] = "world";
832 uint32_t context = 0;
833 HandleSignalsState hss;
835 scoped_refptr<MessagePipe> mp0(new MessagePipe(
836 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
837 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
838 scoped_refptr<MessagePipe> mp1(new MessagePipe(
839 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
840 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
841 ConnectMessagePipes(mp0, mp1);
843 base::FilePath unused;
845 CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused));
846 EXPECT_EQ(sizeof(kHello), fwrite(kHello, 1, sizeof(kHello), fp.get()));
847 // We'll try to pass this dispatcher, which will cause a |PlatformHandle| to
849 scoped_refptr<PlatformHandleDispatcher> dispatcher(
850 new PlatformHandleDispatcher(
851 mojo::test::PlatformHandleFromFILE(fp.Pass())));
853 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
854 // it later, it might already be readable.)
856 ASSERT_EQ(MOJO_RESULT_OK,
857 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
859 // Write to MP 0, port 0.
861 DispatcherTransport transport(
862 test::DispatcherTryStartTransport(dispatcher.get()));
863 EXPECT_TRUE(transport.is_valid());
865 std::vector<DispatcherTransport> transports;
866 transports.push_back(transport);
867 EXPECT_EQ(MOJO_RESULT_OK,
869 UserPointer<const void>(kWorld),
872 MOJO_WRITE_MESSAGE_FLAG_NONE));
875 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
876 // |dispatcher| is destroyed.
877 EXPECT_TRUE(dispatcher->HasOneRef());
882 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
883 EXPECT_EQ(123u, context);
884 hss = HandleSignalsState();
885 mp1->RemoveWaiter(1, &waiter, &hss);
886 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
887 hss.satisfied_signals);
888 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
889 hss.satisfiable_signals);
891 // Read from MP 1, port 1.
892 char read_buffer[100] = {0};
893 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
894 DispatcherVector read_dispatchers;
895 uint32_t read_num_dispatchers = 10; // Maximum to get.
896 EXPECT_EQ(MOJO_RESULT_OK,
898 UserPointer<void>(read_buffer),
899 MakeUserPointer(&read_buffer_size),
901 &read_num_dispatchers,
902 MOJO_READ_MESSAGE_FLAG_NONE));
903 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
904 EXPECT_STREQ(kWorld, read_buffer);
905 EXPECT_EQ(1u, read_dispatchers.size());
906 EXPECT_EQ(1u, read_num_dispatchers);
907 ASSERT_TRUE(read_dispatchers[0]);
908 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
910 EXPECT_EQ(Dispatcher::kTypePlatformHandle, read_dispatchers[0]->GetType());
912 static_cast<PlatformHandleDispatcher*>(read_dispatchers[0].get());
914 embedder::ScopedPlatformHandle h = dispatcher->PassPlatformHandle().Pass();
915 EXPECT_TRUE(h.is_valid());
917 fp = mojo::test::FILEFromPlatformHandle(h.Pass(), "rb").Pass();
918 EXPECT_FALSE(h.is_valid());
922 memset(read_buffer, 0, sizeof(read_buffer));
923 EXPECT_EQ(sizeof(kHello),
924 fread(read_buffer, 1, sizeof(read_buffer), fp.get()));
925 EXPECT_STREQ(kHello, read_buffer);
927 // Close everything that belongs to us.
930 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
933 // Test racing closes (on each end).
934 // Note: A flaky failure would almost certainly indicate a problem in the code
935 // itself (not in the test). Also, any logged warnings/errors would also
936 // probably be indicative of bugs.
937 TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
938 base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5);
940 for (unsigned i = 0; i < 256; i++) {
941 DVLOG(2) << "---------------------------------------- " << i;
942 scoped_refptr<MessagePipe> mp0(new MessagePipe(
943 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
944 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
945 BootstrapMessagePipeNoWait(0, mp0);
947 scoped_refptr<MessagePipe> mp1(new MessagePipe(
948 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
949 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
950 BootstrapMessagePipeNoWait(1, mp1);
953 io_thread()->task_runner()->PostTask(
954 FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
957 base::PlatformThread::Sleep(delay);
962 io_thread()->task_runner()->PostTask(
963 FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
966 base::PlatformThread::Sleep(delay);
970 RestoreInitialState();
975 } // namespace system