1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
11 #include "base/basictypes.h"
12 #include "base/bind.h"
13 #include "base/file_util.h"
14 #include "base/files/file_path.h"
15 #include "base/files/scoped_file.h"
16 #include "base/location.h"
17 #include "base/logging.h"
18 #include "base/message_loop/message_loop.h"
19 #include "base/threading/platform_thread.h" // For |Sleep()|.
20 #include "build/build_config.h" // TODO(vtl): Remove this.
21 #include "mojo/common/test/test_utils.h"
22 #include "mojo/embedder/platform_channel_pair.h"
23 #include "mojo/embedder/scoped_platform_handle.h"
24 #include "mojo/system/channel.h"
25 #include "mojo/system/local_message_pipe_endpoint.h"
26 #include "mojo/system/message_pipe.h"
27 #include "mojo/system/message_pipe_dispatcher.h"
28 #include "mojo/system/platform_handle_dispatcher.h"
29 #include "mojo/system/proxy_message_pipe_endpoint.h"
30 #include "mojo/system/raw_channel.h"
31 #include "mojo/system/shared_buffer_dispatcher.h"
32 #include "mojo/system/test_utils.h"
33 #include "mojo/system/waiter.h"
34 #include "testing/gtest/include/gtest/gtest.h"
// Test fixture for message pipes whose proxy endpoints are attached to
// |Channel|s. It owns a |test::TestIOThread|, two platform handles taken from
// a |PlatformChannelPair|, and up to two |Channel|s (index 0 and 1). The
// public helpers post the corresponding ...OnIOThread methods to the IO
// thread, and each of those methods CHECKs that it runs there.
// NOTE(review): this listing omits some lines of the original file (closing
// braces, access specifiers and a few statements), so several bodies below
// are visibly incomplete. Consult the full file before changing behavior.
40 class RemoteMessagePipeTest : public testing::Test {
// Constructs the IO thread in |kAutoStart| mode.
42 RemoteMessagePipeTest() : io_thread_(test::TestIOThread::kAutoStart) {}
43 virtual ~RemoteMessagePipeTest() {}
// Posts SetUpOnIOThread() to the IO thread via PostTaskAndWait().
// |base::Unretained(this)| is presumably safe because the call waits for the
// task before returning -- TODO confirm against |TestIOThread|.
45 virtual void SetUp() OVERRIDE {
46 io_thread_.PostTaskAndWait(
48 base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
49 base::Unretained(this)));
// Posts TearDownOnIOThread() to the IO thread via PostTaskAndWait().
52 virtual void TearDown() OVERRIDE {
53 io_thread_.PostTaskAndWait(
55 base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
56 base::Unretained(this)));
60 // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1,
61 // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP
62 // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s.
// The work itself is done by ConnectMessagePipesOnIOThread(), posted via
// PostTaskAndWait().
63 void ConnectMessagePipes(scoped_refptr<MessagePipe> mp0,
64 scoped_refptr<MessagePipe> mp1) {
65 io_thread_.PostTaskAndWait(
67 base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
68 base::Unretained(this), mp0, mp1));
71 // This connects |mp|'s port |channel_index ^ 1| to channel |channel_index|.
72 // It assumes/requires that this is the bootstrap case, i.e., that the
73 // endpoint IDs are both/will both be |Channel::kBootstrapEndpointId|. This
74 // returns *without* waiting for it to finish connecting.
// NOTE(review): the statement that posts the bound task below is not visible
// in this excerpt.
75 void BootstrapMessagePipeNoWait(unsigned channel_index,
76 scoped_refptr<MessagePipe> mp) {
79 base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread,
80 base::Unretained(this), channel_index, mp));
// Posts RestoreInitialStateOnIOThread() to the IO thread via
// PostTaskAndWait().
83 void RestoreInitialState() {
84 io_thread_.PostTaskAndWait(
86 base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread,
87 base::Unretained(this)));
// Accessor for the fixture's IO thread; used by the tests and by the thread
// checks below.
90 test::TestIOThread* io_thread() { return &io_thread_; }
// Creates a fresh |PlatformChannelPair| and stores its server handle in
// |platform_handles_[0]| and its client handle in |platform_handles_[1]|.
93 void SetUpOnIOThread() {
94 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
96 embedder::PlatformChannelPair channel_pair;
97 platform_handles_[0] = channel_pair.PassServerHandle();
98 platform_handles_[1] = channel_pair.PassClientHandle();
// Shuts down both channels.
// NOTE(review): the lines around each Shutdown() call are omitted from this
// excerpt; presumably they guard against a null channel and reset the
// pointer afterwards -- confirm in the full file.
101 void TearDownOnIOThread() {
102 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
105 channels_[0]->Shutdown();
109 channels_[1]->Shutdown();
// Creates |channels_[channel_index]| and initializes it with a |RawChannel|
// built from |platform_handles_[channel_index]|. The handle is handed over
// with Pass(). CHECKs that the index is 0 or 1 and that the channel does not
// exist yet.
114 void CreateAndInitChannel(unsigned channel_index) {
115 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
116 CHECK(channel_index == 0 || channel_index == 1);
117 CHECK(!channels_[channel_index]);
119 channels_[channel_index] = new Channel();
120 CHECK(channels_[channel_index]->Init(
121 RawChannel::Create(platform_handles_[channel_index].Pass())));
// Attaches |mp0| port 1 to channel 0 and |mp1| port 0 to channel 1, then runs
// each endpoint with its own ID as the local ID and the other ID as the
// remote ID.
// NOTE(review): the lines preceding each CreateAndInitChannel() call are
// omitted here. Since that helper CHECKs that the channel does not already
// exist and the Multiplex test connects two pipe pairs, a guard is
// presumably present in the full file -- confirm.
124 void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp0,
125 scoped_refptr<MessagePipe> mp1) {
126 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
129 CreateAndInitChannel(0);
131 CreateAndInitChannel(1);
133 MessageInTransit::EndpointId local_id0 =
134 channels_[0]->AttachMessagePipeEndpoint(mp0, 1);
135 MessageInTransit::EndpointId local_id1 =
136 channels_[1]->AttachMessagePipeEndpoint(mp1, 0);
138 CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1));
139 CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0));
// Creates channel |channel_index|, attaches port |channel_index ^ 1| of |mp|
// to it, and runs the endpoint with |Channel::kBootstrapEndpointId| as both
// the local and the remote ID.
142 void BootstrapMessagePipeOnIOThread(unsigned channel_index,
143 scoped_refptr<MessagePipe> mp) {
144 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
145 CHECK(channel_index == 0 || channel_index == 1);
147 unsigned port = channel_index ^ 1u;
149 CreateAndInitChannel(channel_index);
150 MessageInTransit::EndpointId endpoint_id =
151 channels_[channel_index]->AttachMessagePipeEndpoint(mp, port);
// NOTE(review): the statement controlled by this |if| is omitted from the
// excerpt (presumably an early return when attaching failed -- confirm).
152 if (endpoint_id == MessageInTransit::kInvalidEndpointId)
155 CHECK_EQ(endpoint_id, Channel::kBootstrapEndpointId);
156 CHECK(channels_[channel_index]->RunMessagePipeEndpoint(
157 Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId));
// Tears the channels down on the IO thread.
// NOTE(review): any statements following TearDownOnIOThread() are not
// visible in this excerpt.
160 void RestoreInitialStateOnIOThread() {
161 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
163 TearDownOnIOThread();
// IO thread on which all channel operations run.
167 test::TestIOThread io_thread_;
// Handles from the |PlatformChannelPair|: [0] is the server handle and [1] is
// the client handle. Each is passed to its channel in CreateAndInitChannel().
168 embedder::ScopedPlatformHandle platform_handles_[2];
// Channels 0 and 1, created on demand by CreateAndInitChannel().
169 scoped_refptr<Channel> channels_[2];
171 DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
// Round-trip test over a connected pair of message pipes: |kHello| travels
// from MP 0, port 0 to MP 1, port 1, then |kWorld| travels back. Each read is
// checked for size and content. Finally, per the existing comments, MP 0,
// port 0 is closed and waiting for readability on MP 1, port 1 is expected to
// end with MOJO_RESULT_FAILED_PRECONDITION.
// NOTE(review): this excerpt omits the declaration of |waiter|, the heads of
// the write and read calls, and the closing statements.
174 TEST_F(RemoteMessagePipeTest, Basic) {
175 static const char kHello[] = "hello";
176 static const char kWorld[] = "world!!!1!!!1!";
177 char buffer[100] = { 0 };
178 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
180 uint32_t context = 0;
182 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
183 // connected to MP 1, port 0, which will be attached to channel 1. This leaves
184 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
186 scoped_refptr<MessagePipe> mp0(new MessagePipe(
187 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
188 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
189 scoped_refptr<MessagePipe> mp1(new MessagePipe(
190 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
191 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
192 ConnectMessagePipes(mp0, mp1);
194 // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
196 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
197 // it later, it might already be readable.)
// The last argument (123) is the context value that Wait() is expected to
// report back below.
199 EXPECT_EQ(MOJO_RESULT_OK,
200 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
202 // Write to MP 0, port 0.
203 EXPECT_EQ(MOJO_RESULT_OK,
205 kHello, sizeof(kHello),
207 MOJO_WRITE_MESSAGE_FLAG_NONE));
210 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
211 EXPECT_EQ(123u, context);
212 mp1->RemoveWaiter(1, &waiter);
214 // Read from MP 1, port 1.
215 EXPECT_EQ(MOJO_RESULT_OK,
217 buffer, &buffer_size,
219 MOJO_READ_MESSAGE_FLAG_NONE));
// sizeof() includes the terminating NUL, so the NUL is sent as part of the
// message and EXPECT_STREQ can compare |buffer| as a C string.
220 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
221 EXPECT_STREQ(kHello, buffer);
223 // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
226 EXPECT_EQ(MOJO_RESULT_OK,
227 mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456));
229 EXPECT_EQ(MOJO_RESULT_OK,
231 kWorld, sizeof(kWorld),
233 MOJO_WRITE_MESSAGE_FLAG_NONE));
235 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
236 EXPECT_EQ(456u, context);
237 mp0->RemoveWaiter(0, &waiter);
// Reset the in/out size before reusing |buffer| for the second read.
239 buffer_size = static_cast<uint32_t>(sizeof(buffer));
240 EXPECT_EQ(MOJO_RESULT_OK,
242 buffer, &buffer_size,
244 MOJO_READ_MESSAGE_FLAG_NONE));
245 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
246 EXPECT_STREQ(kWorld, buffer);
248 // Close MP 0, port 0.
251 // Try to wait for MP 1, port 1 to become readable. This will eventually fail
252 // when it realizes that MP 0, port 0 has been closed. (It may also fail
// NOTE(review): the rest of the comment above and the declaration of
// |result| are omitted from this excerpt. Two outcomes are accepted: either
// AddWaiter() succeeds and the wait then fails, or AddWaiter() itself
// returns MOJO_RESULT_FAILED_PRECONDITION.
256 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789);
257 if (result == MOJO_RESULT_OK) {
258 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
259 waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
260 EXPECT_EQ(789u, context);
261 mp1->RemoveWaiter(1, &waiter);
263 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
// Connects two message pipe pairs (mp0/mp1 and mp2/mp3) through the fixture
// and checks that traffic on one pair does not show up on the other. After a
// write on one pair, the reads that the existing comments describe as
// targeting the other user-facing ports are expected to return
// MOJO_RESULT_SHOULD_WAIT, and only the intended receiver gets the message.
// NOTE(review): this excerpt omits the declaration of |waiter| and the heads
// of the write and read calls, so the port each call targets is known only
// from the comments.
270 TEST_F(RemoteMessagePipeTest, Multiplex) {
271 static const char kHello[] = "hello";
272 static const char kWorld[] = "world!!!1!!!1!";
273 char buffer[100] = { 0 };
274 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
276 uint32_t context = 0;
278 // Connect message pipes as in the |Basic| test.
280 scoped_refptr<MessagePipe> mp0(new MessagePipe(
281 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
282 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
283 scoped_refptr<MessagePipe> mp1(new MessagePipe(
284 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
285 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
286 ConnectMessagePipes(mp0, mp1);
288 // Now put another message pipe on the channel.
290 scoped_refptr<MessagePipe> mp2(new MessagePipe(
291 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
292 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
293 scoped_refptr<MessagePipe> mp3(new MessagePipe(
294 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
295 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
296 ConnectMessagePipes(mp2, mp3);
298 // Write: MP 2, port 0 -> MP 3, port 1.
// The waiter is added before the write, for the reason given in the |Basic|
// test: added later, the port might already be readable.
301 EXPECT_EQ(MOJO_RESULT_OK,
302 mp3->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789));
304 EXPECT_EQ(MOJO_RESULT_OK,
306 kHello, sizeof(kHello),
308 MOJO_WRITE_MESSAGE_FLAG_NONE));
310 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
311 EXPECT_EQ(789u, context);
312 mp3->RemoveWaiter(1, &waiter);
314 // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
// |buffer_size| is reset to the full buffer size before every read attempt.
315 buffer_size = static_cast<uint32_t>(sizeof(buffer));
316 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
318 buffer, &buffer_size,
320 MOJO_READ_MESSAGE_FLAG_NONE));
321 buffer_size = static_cast<uint32_t>(sizeof(buffer));
322 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
324 buffer, &buffer_size,
326 MOJO_READ_MESSAGE_FLAG_NONE));
327 buffer_size = static_cast<uint32_t>(sizeof(buffer));
328 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
330 buffer, &buffer_size,
332 MOJO_READ_MESSAGE_FLAG_NONE));
334 // Read from MP 3, port 1.
335 buffer_size = static_cast<uint32_t>(sizeof(buffer));
336 EXPECT_EQ(MOJO_RESULT_OK,
338 buffer, &buffer_size,
340 MOJO_READ_MESSAGE_FLAG_NONE));
341 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
342 EXPECT_STREQ(kHello, buffer);
344 // Write: MP 0, port 0 -> MP 1, port 1 again.
347 EXPECT_EQ(MOJO_RESULT_OK,
348 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
350 EXPECT_EQ(MOJO_RESULT_OK,
352 kWorld, sizeof(kWorld),
354 MOJO_WRITE_MESSAGE_FLAG_NONE));
356 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
357 EXPECT_EQ(123u, context);
358 mp1->RemoveWaiter(1, &waiter);
360 // Make sure there's nothing on the other ports.
361 buffer_size = static_cast<uint32_t>(sizeof(buffer));
362 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
364 buffer, &buffer_size,
366 MOJO_READ_MESSAGE_FLAG_NONE));
367 buffer_size = static_cast<uint32_t>(sizeof(buffer));
368 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
370 buffer, &buffer_size,
372 MOJO_READ_MESSAGE_FLAG_NONE));
373 buffer_size = static_cast<uint32_t>(sizeof(buffer));
374 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
376 buffer, &buffer_size,
378 MOJO_READ_MESSAGE_FLAG_NONE));
// Final read: expected to deliver |kWorld| (presumably from MP 1, port 1,
// the port that was just waited on -- the call head is not visible here).
380 buffer_size = static_cast<uint32_t>(sizeof(buffer));
381 EXPECT_EQ(MOJO_RESULT_OK,
383 buffer, &buffer_size,
385 MOJO_READ_MESSAGE_FLAG_NONE));
386 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
387 EXPECT_STREQ(kWorld, buffer);
// Checks that a message written before the channels are connected is still
// delivered. |kHello| is written to MP 0, port 0 before channel 0 is
// bootstrapped. Per the existing comment, that port is then closed before
// channel 1 is connected. The message is nevertheless expected to become
// readable on MP 1, port 1 once channel 1 is bootstrapped.
// NOTE(review): this excerpt omits the declaration of |waiter|, the heads of
// the write and read calls, and the close calls.
395 TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
396 static const char kHello[] = "hello";
397 char buffer[100] = { 0 };
398 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
400 uint32_t context = 0;
402 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
403 // connected to MP 1, port 0, which will be attached to channel 1. This leaves
404 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
406 scoped_refptr<MessagePipe> mp0(new MessagePipe(
407 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
408 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
410 // Write to MP 0, port 0.
411 EXPECT_EQ(MOJO_RESULT_OK,
413 kHello, sizeof(kHello),
415 MOJO_WRITE_MESSAGE_FLAG_NONE));
// Bootstrap only channel 0 for now; the helper does not wait for the
// connection to complete.
417 BootstrapMessagePipeNoWait(0, mp0);
420 // Close MP 0, port 0 before channel 1 is even connected.
423 scoped_refptr<MessagePipe> mp1(new MessagePipe(
424 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
425 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
427 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
428 // it later, it might already be readable.)
430 EXPECT_EQ(MOJO_RESULT_OK,
431 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
// Only now is the second channel brought up.
433 BootstrapMessagePipeNoWait(1, mp1);
436 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
437 EXPECT_EQ(123u, context);
438 mp1->RemoveWaiter(1, &waiter);
440 // Read from MP 1, port 1.
441 EXPECT_EQ(MOJO_RESULT_OK,
443 buffer, &buffer_size,
445 MOJO_READ_MESSAGE_FLAG_NONE));
446 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
447 EXPECT_STREQ(kHello, buffer);
// Sends a |MessagePipeDispatcher| (bound to port 0 of |local_mp|) across the
// remote pipe as an attachment to a |kHello| message. The dispatcher read on
// the receiving side is then exercised in both directions against port 1 of
// |local_mp|: a message written to |local_mp| port 1 must be readable from
// the dispatcher, and a message written to the dispatcher must be readable
// from |local_mp| port 1.
// NOTE(review): this excerpt omits the declaration of |waiter|, the
// declaration line of |transport|, and several closing statements.
453 TEST_F(RemoteMessagePipeTest, HandlePassing) {
454 static const char kHello[] = "hello";
456 uint32_t context = 0;
458 scoped_refptr<MessagePipe> mp0(new MessagePipe(
459 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
460 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
461 scoped_refptr<MessagePipe> mp1(new MessagePipe(
462 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
463 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
464 ConnectMessagePipes(mp0, mp1);
466 // We'll try to pass this dispatcher.
467 scoped_refptr<MessagePipeDispatcher> dispatcher(new MessagePipeDispatcher(
468 MessagePipeDispatcher::kDefaultCreateOptions));
469 scoped_refptr<MessagePipe> local_mp(new MessagePipe());
470 dispatcher->Init(local_mp, 0);
472 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
473 // it later, it might already be readable.)
475 EXPECT_EQ(MOJO_RESULT_OK,
476 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
478 // Write to MP 0, port 0.
// The dispatcher is handed to WriteMessage() through a transport obtained
// from test::DispatcherTryStartTransport().
481 transport(test::DispatcherTryStartTransport(dispatcher.get()));
482 EXPECT_TRUE(transport.is_valid());
484 std::vector<DispatcherTransport> transports;
485 transports.push_back(transport);
486 EXPECT_EQ(MOJO_RESULT_OK,
487 mp0->WriteMessage(0, kHello, sizeof(kHello), &transports,
488 MOJO_WRITE_MESSAGE_FLAG_NONE));
491 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
492 // |dispatcher| is destroyed.
493 EXPECT_TRUE(dispatcher->HasOneRef());
498 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
499 EXPECT_EQ(123u, context);
500 mp1->RemoveWaiter(1, &waiter);
502 // Read from MP 1, port 1.
503 char read_buffer[100] = { 0 };
504 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
505 DispatcherVector read_dispatchers;
506 uint32_t read_num_dispatchers = 10; // Maximum to get.
507 EXPECT_EQ(MOJO_RESULT_OK,
508 mp1->ReadMessage(1, read_buffer, &read_buffer_size,
509 &read_dispatchers, &read_num_dispatchers,
510 MOJO_READ_MESSAGE_FLAG_NONE));
511 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
512 EXPECT_STREQ(kHello, read_buffer);
// NOTE(review): the size check below is a non-fatal EXPECT, yet element 0 is
// indexed right after it. If the vector were empty, that access would be
// out of bounds; ASSERT_EQ would be the safer check.
513 EXPECT_EQ(1u, read_dispatchers.size());
514 EXPECT_EQ(1u, read_num_dispatchers);
515 ASSERT_TRUE(read_dispatchers[0]);
516 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
// The static_cast below relies on the type check on the preceding line.
518 EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
519 dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
521 // Write to "local_mp", port 1.
522 EXPECT_EQ(MOJO_RESULT_OK,
523 local_mp->WriteMessage(1, kHello, sizeof(kHello), NULL,
524 MOJO_WRITE_MESSAGE_FLAG_NONE));
526 // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
527 // here. (We don't crash if I sleep and then close.)
529 // Wait for the dispatcher to become readable.
531 EXPECT_EQ(MOJO_RESULT_OK,
532 dispatcher->AddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 456));
533 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
534 EXPECT_EQ(456u, context);
535 dispatcher->RemoveWaiter(&waiter);
537 // Read from the dispatcher.
// Clear the buffer first so that the string comparison below cannot pass on
// stale contents from the previous read.
538 memset(read_buffer, 0, sizeof(read_buffer));
539 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
540 EXPECT_EQ(MOJO_RESULT_OK,
541 dispatcher->ReadMessage(read_buffer, &read_buffer_size, 0, NULL,
542 MOJO_READ_MESSAGE_FLAG_NONE));
543 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
544 EXPECT_STREQ(kHello, read_buffer);
546 // Prepare to wait on "local_mp", port 1.
548 EXPECT_EQ(MOJO_RESULT_OK,
549 local_mp->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789));
551 // Write to the dispatcher.
552 EXPECT_EQ(MOJO_RESULT_OK,
553 dispatcher->WriteMessage(kHello, sizeof(kHello), NULL,
554 MOJO_WRITE_MESSAGE_FLAG_NONE));
557 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
558 EXPECT_EQ(789u, context);
559 local_mp->RemoveWaiter(1, &waiter);
561 // Read from "local_mp", port 1.
562 memset(read_buffer, 0, sizeof(read_buffer));
563 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
564 EXPECT_EQ(MOJO_RESULT_OK,
565 local_mp->ReadMessage(1, read_buffer, &read_buffer_size, NULL, NULL,
566 MOJO_READ_MESSAGE_FLAG_NONE));
567 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
568 EXPECT_STREQ(kHello, read_buffer);
570 // TODO(vtl): Also test that messages queued up before the handle was sent are
571 // delivered properly.
573 // Close everything that belongs to us.
576 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
577 // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
// Selects the test name: the real name under OS_POSIX, otherwise a DISABLED_
// name so that gtest does not run it.
// NOTE(review): the #else and #endif lines of this conditional are not
// visible in this excerpt.
581 #if defined(OS_POSIX)
582 #define MAYBE_SharedBufferPassing SharedBufferPassing
584 // Not yet implemented (on Windows).
585 #define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing
// Sends a |SharedBufferDispatcher| for a 100-byte buffer across the remote
// pipe as an attachment to a |kHello| message. A mapping made before sending
// (|mapping0|) and a mapping made from the received dispatcher (|mapping1|)
// are then checked to see the same contents, including writes made through
// either one after the transfer.
// NOTE(review): this excerpt omits the declaration of |waiter|, the trailing
// arguments of Create() and MapBuffer(), and several closing statements.
587 TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) {
588 static const char kHello[] = "hello";
590 uint32_t context = 0;
592 scoped_refptr<MessagePipe> mp0(new MessagePipe(
593 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
594 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
595 scoped_refptr<MessagePipe> mp1(new MessagePipe(
596 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
597 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
598 ConnectMessagePipes(mp0, mp1);
600 // We'll try to pass this dispatcher.
601 scoped_refptr<SharedBufferDispatcher> dispatcher;
602 EXPECT_EQ(MOJO_RESULT_OK,
603 SharedBufferDispatcher::Create(
604 SharedBufferDispatcher::kDefaultCreateOptions, 100,
606 ASSERT_TRUE(dispatcher);
// Map the whole buffer and plant marker bytes at the first, middle and last
// offsets. They are checked through the second mapping further down.
609 scoped_ptr<RawSharedBufferMapping> mapping0;
610 EXPECT_EQ(MOJO_RESULT_OK,
611 dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE,
613 ASSERT_TRUE(mapping0);
614 ASSERT_TRUE(mapping0->base());
615 ASSERT_EQ(100u, mapping0->length());
616 static_cast<char*>(mapping0->base())[0] = 'A';
617 static_cast<char*>(mapping0->base())[50] = 'B';
618 static_cast<char*>(mapping0->base())[99] = 'C';
620 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
621 // it later, it might already be readable.)
623 EXPECT_EQ(MOJO_RESULT_OK,
624 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
626 // Write to MP 0, port 0.
629 transport(test::DispatcherTryStartTransport(dispatcher.get()));
630 EXPECT_TRUE(transport.is_valid());
632 std::vector<DispatcherTransport> transports;
633 transports.push_back(transport);
634 EXPECT_EQ(MOJO_RESULT_OK,
635 mp0->WriteMessage(0, kHello, sizeof(kHello), &transports,
636 MOJO_WRITE_MESSAGE_FLAG_NONE));
639 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
640 // |dispatcher| is destroyed.
641 EXPECT_TRUE(dispatcher->HasOneRef());
646 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
647 EXPECT_EQ(123u, context);
648 mp1->RemoveWaiter(1, &waiter);
650 // Read from MP 1, port 1.
651 char read_buffer[100] = { 0 };
652 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
653 DispatcherVector read_dispatchers;
654 uint32_t read_num_dispatchers = 10; // Maximum to get.
655 EXPECT_EQ(MOJO_RESULT_OK,
656 mp1->ReadMessage(1, read_buffer, &read_buffer_size,
657 &read_dispatchers, &read_num_dispatchers,
658 MOJO_READ_MESSAGE_FLAG_NONE));
659 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
660 EXPECT_STREQ(kHello, read_buffer);
// NOTE(review): the size check below is a non-fatal EXPECT, yet element 0 is
// indexed right after it; ASSERT_EQ would be the safer check.
661 EXPECT_EQ(1u, read_dispatchers.size());
662 EXPECT_EQ(1u, read_num_dispatchers);
663 ASSERT_TRUE(read_dispatchers[0]);
664 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
666 EXPECT_EQ(Dispatcher::kTypeSharedBuffer, read_dispatchers[0]->GetType());
// NOTE(review): the left-hand side of this assignment is omitted from the
// excerpt; presumably |dispatcher| is re-pointed at the received
// dispatcher -- confirm.
668 static_cast<SharedBufferDispatcher*>(read_dispatchers[0].get());
670 // Make another mapping.
671 scoped_ptr<RawSharedBufferMapping> mapping1;
672 EXPECT_EQ(MOJO_RESULT_OK,
673 dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE,
675 ASSERT_TRUE(mapping1);
676 ASSERT_TRUE(mapping1->base());
677 ASSERT_EQ(100u, mapping1->length());
// The two mappings must live at different addresses yet show the same bytes.
678 EXPECT_NE(mapping1->base(), mapping0->base());
679 EXPECT_EQ('A', static_cast<char*>(mapping1->base())[0]);
680 EXPECT_EQ('B', static_cast<char*>(mapping1->base())[50]);
681 EXPECT_EQ('C', static_cast<char*>(mapping1->base())[99]);
683 // Write stuff either way.
684 static_cast<char*>(mapping1->base())[1] = 'x';
685 EXPECT_EQ('x', static_cast<char*>(mapping0->base())[1]);
686 static_cast<char*>(mapping0->base())[2] = 'y';
687 EXPECT_EQ('y', static_cast<char*>(mapping1->base())[2]);
689 // Kill the first mapping; the second should still be valid.
691 EXPECT_EQ('A', static_cast<char*>(mapping1->base())[0]);
693 // Close everything that belongs to us.
696 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
698 // The second mapping should still be good.
699 EXPECT_EQ('x', static_cast<char*>(mapping1->base())[1]);
// Selects the test name: the real name under OS_POSIX, otherwise a DISABLED_
// name so that gtest does not run it.
// NOTE(review): the #else and #endif lines of this conditional are not
// visible in this excerpt.
702 #if defined(OS_POSIX)
703 #define MAYBE_PlatformHandlePassing PlatformHandlePassing
705 // Not yet implemented (on Windows).
706 #define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing
// Writes |kHello| to a temporary file, wraps the FILE in a
// |PlatformHandleDispatcher| and sends it across the remote pipe attached to
// a |kWorld| message. On the receiving side the platform handle is turned
// back into a FILE, and |kHello| is expected to be read from it.
// NOTE(review): this excerpt omits the declaration of |waiter|, the
// declaration line of |transport|, and some statements before the final
// fread() (presumably a rewind -- confirm in the full file).
708 TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) {
709 static const char kHello[] = "hello";
710 static const char kWorld[] = "world";
712 uint32_t context = 0;
714 scoped_refptr<MessagePipe> mp0(new MessagePipe(
715 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
716 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
717 scoped_refptr<MessagePipe> mp1(new MessagePipe(
718 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
719 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
720 ConnectMessagePipes(mp0, mp1);
// The path of the temporary file is never used; only the open FILE matters.
722 base::FilePath unused;
723 base::ScopedFILE fp(CreateAndOpenTemporaryFile(&unused));
724 EXPECT_EQ(sizeof(kHello), fwrite(kHello, 1, sizeof(kHello), fp.get()));
725 // We'll try to pass this dispatcher, which will cause a |PlatformHandle| to
// NOTE(review): the continuation of the comment above is omitted from this
// excerpt.
727 scoped_refptr<PlatformHandleDispatcher> dispatcher(
728 new PlatformHandleDispatcher(
729 mojo::test::PlatformHandleFromFILE(fp.Pass())));
731 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
732 // it later, it might already be readable.)
734 EXPECT_EQ(MOJO_RESULT_OK,
735 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
737 // Write to MP 0, port 0.
740 transport(test::DispatcherTryStartTransport(dispatcher.get()));
741 EXPECT_TRUE(transport.is_valid());
743 std::vector<DispatcherTransport> transports;
744 transports.push_back(transport);
745 EXPECT_EQ(MOJO_RESULT_OK,
746 mp0->WriteMessage(0, kWorld, sizeof(kWorld), &transports,
747 MOJO_WRITE_MESSAGE_FLAG_NONE));
750 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
751 // |dispatcher| is destroyed.
752 EXPECT_TRUE(dispatcher->HasOneRef());
757 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
758 EXPECT_EQ(123u, context);
759 mp1->RemoveWaiter(1, &waiter);
761 // Read from MP 1, port 1.
762 char read_buffer[100] = { 0 };
763 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
764 DispatcherVector read_dispatchers;
765 uint32_t read_num_dispatchers = 10; // Maximum to get.
766 EXPECT_EQ(MOJO_RESULT_OK,
767 mp1->ReadMessage(1, read_buffer, &read_buffer_size,
768 &read_dispatchers, &read_num_dispatchers,
769 MOJO_READ_MESSAGE_FLAG_NONE));
// The message payload is |kWorld|; |kHello| lives only in the file.
770 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
771 EXPECT_STREQ(kWorld, read_buffer);
// NOTE(review): the size check below is a non-fatal EXPECT, yet element 0 is
// indexed right after it; ASSERT_EQ would be the safer check.
772 EXPECT_EQ(1u, read_dispatchers.size());
773 EXPECT_EQ(1u, read_num_dispatchers);
774 ASSERT_TRUE(read_dispatchers[0]);
775 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
777 EXPECT_EQ(Dispatcher::kTypePlatformHandle, read_dispatchers[0]->GetType());
// NOTE(review): the left-hand side of this assignment is omitted from the
// excerpt; presumably |dispatcher| is re-pointed at the received
// dispatcher -- confirm.
779 static_cast<PlatformHandleDispatcher*>(read_dispatchers[0].get());
// Take the platform handle out of the received dispatcher and reopen it as
// a FILE for reading. The handle is expected to be invalid once it has been
// passed on.
781 embedder::ScopedPlatformHandle h = dispatcher->PassPlatformHandle().Pass();
782 EXPECT_TRUE(h.is_valid());
784 fp = mojo::test::FILEFromPlatformHandle(h.Pass(), "rb").Pass();
785 EXPECT_FALSE(h.is_valid());
789 memset(read_buffer, 0, sizeof(read_buffer));
790 EXPECT_EQ(sizeof(kHello),
791 fread(read_buffer, 1, sizeof(read_buffer), fp.get()));
792 EXPECT_STREQ(kHello, read_buffer);
794 // Close everything that belongs to us.
797 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
800 // Test racing closes (on each end).
801 // Note: A flaky failure would almost certainly indicate a problem in the code
802 // itself (not in the test). Also, any logged warnings/errors would also
803 // probably be indicative of bugs.
// Runs 256 iterations. Each one bootstraps a fresh pipe on channel 0 and on
// channel 1 without waiting for the connections, and ends by calling
// RestoreInitialState(). In between, 5 ms sleeps are either posted to the IO
// thread or performed on the test thread.
// NOTE(review): the conditions that select those sleeps and the close calls
// between them are not visible in this excerpt.
804 TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
805 base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5);
807 for (unsigned i = 0; i < 256; i++) {
// Verbose-only separator that makes the iterations easy to tell apart in
// the log.
808 DVLOG(2) << "---------------------------------------- " << i;
809 scoped_refptr<MessagePipe> mp0(new MessagePipe(
810 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
811 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
812 BootstrapMessagePipeNoWait(0, mp0);
814 scoped_refptr<MessagePipe> mp1(new MessagePipe(
815 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
816 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
817 BootstrapMessagePipeNoWait(1, mp1);
// Posting Sleep() as a task makes the IO thread sleep for |delay|.
820 io_thread()->task_runner()->PostTask(
821 FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
// Sleeping directly makes the test thread sleep for |delay|.
824 base::PlatformThread::Sleep(delay);
829 io_thread()->task_runner()->PostTask(
830 FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
833 base::PlatformThread::Sleep(delay);
// Reset the fixture through RestoreInitialState() before the next
// iteration.
837 RestoreInitialState();
842 } // namespace system