Upstream version 8.37.180.0
[platform/framework/web/crosswalk.git] / src / mojo / system / remote_message_pipe_unittest.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <stdint.h>
6 #include <stdio.h>
7 #include <string.h>
8
9 #include <vector>
10
11 #include "base/basictypes.h"
12 #include "base/bind.h"
13 #include "base/file_util.h"
14 #include "base/files/file_path.h"
15 #include "base/files/scoped_file.h"
16 #include "base/location.h"
17 #include "base/logging.h"
18 #include "base/message_loop/message_loop.h"
19 #include "base/threading/platform_thread.h"  // For |Sleep()|.
20 #include "build/build_config.h"  // TODO(vtl): Remove this.
21 #include "mojo/common/test/test_utils.h"
22 #include "mojo/embedder/platform_channel_pair.h"
23 #include "mojo/embedder/scoped_platform_handle.h"
24 #include "mojo/system/channel.h"
25 #include "mojo/system/local_message_pipe_endpoint.h"
26 #include "mojo/system/message_pipe.h"
27 #include "mojo/system/message_pipe_dispatcher.h"
28 #include "mojo/system/platform_handle_dispatcher.h"
29 #include "mojo/system/proxy_message_pipe_endpoint.h"
30 #include "mojo/system/raw_channel.h"
31 #include "mojo/system/shared_buffer_dispatcher.h"
32 #include "mojo/system/test_utils.h"
33 #include "mojo/system/waiter.h"
34 #include "testing/gtest/include/gtest/gtest.h"
35
36 namespace mojo {
37 namespace system {
38 namespace {
39
40 class RemoteMessagePipeTest : public testing::Test {
41  public:
42   RemoteMessagePipeTest() : io_thread_(test::TestIOThread::kAutoStart) {}
43   virtual ~RemoteMessagePipeTest() {}
44
45   virtual void SetUp() OVERRIDE {
46     io_thread_.PostTaskAndWait(
47         FROM_HERE,
48         base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
49                    base::Unretained(this)));
50   }
51
52   virtual void TearDown() OVERRIDE {
53     io_thread_.PostTaskAndWait(
54         FROM_HERE,
55         base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
56                    base::Unretained(this)));
57   }
58
59  protected:
60   // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1,
61   // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP
62   // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s.
63   void ConnectMessagePipes(scoped_refptr<MessagePipe> mp0,
64                            scoped_refptr<MessagePipe> mp1) {
65     io_thread_.PostTaskAndWait(
66         FROM_HERE,
67         base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
68                    base::Unretained(this), mp0, mp1));
69   }
70
71   // This connects |mp|'s port |channel_index ^ 1| to channel |channel_index|.
72   // It assumes/requires that this is the bootstrap case, i.e., that the
73   // endpoint IDs are both/will both be |Channel::kBootstrapEndpointId|. This
74   // returns *without* waiting for it to finish connecting.
75   void BootstrapMessagePipeNoWait(unsigned channel_index,
76                                   scoped_refptr<MessagePipe> mp) {
77     io_thread_.PostTask(
78         FROM_HERE,
79         base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread,
80                    base::Unretained(this), channel_index, mp));
81   }
82
83   void RestoreInitialState() {
84     io_thread_.PostTaskAndWait(
85         FROM_HERE,
86         base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread,
87                    base::Unretained(this)));
88   }
89
90   test::TestIOThread* io_thread() { return &io_thread_; }
91
92  private:
93   void SetUpOnIOThread() {
94     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
95
96     embedder::PlatformChannelPair channel_pair;
97     platform_handles_[0] = channel_pair.PassServerHandle();
98     platform_handles_[1] = channel_pair.PassClientHandle();
99   }
100
101   void TearDownOnIOThread() {
102     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
103
104     if (channels_[0]) {
105       channels_[0]->Shutdown();
106       channels_[0] = NULL;
107     }
108     if (channels_[1]) {
109       channels_[1]->Shutdown();
110       channels_[1] = NULL;
111     }
112   }
113
114   void CreateAndInitChannel(unsigned channel_index) {
115     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
116     CHECK(channel_index == 0 || channel_index == 1);
117     CHECK(!channels_[channel_index]);
118
119     channels_[channel_index] = new Channel();
120     CHECK(channels_[channel_index]->Init(
121         RawChannel::Create(platform_handles_[channel_index].Pass())));
122   }
123
124   void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp0,
125                                      scoped_refptr<MessagePipe> mp1) {
126     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
127
128     if (!channels_[0])
129       CreateAndInitChannel(0);
130     if (!channels_[1])
131       CreateAndInitChannel(1);
132
133     MessageInTransit::EndpointId local_id0 =
134         channels_[0]->AttachMessagePipeEndpoint(mp0, 1);
135     MessageInTransit::EndpointId local_id1 =
136         channels_[1]->AttachMessagePipeEndpoint(mp1, 0);
137
138     CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1));
139     CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0));
140   }
141
142   void BootstrapMessagePipeOnIOThread(unsigned channel_index,
143                                       scoped_refptr<MessagePipe> mp) {
144     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
145     CHECK(channel_index == 0 || channel_index == 1);
146
147     unsigned port = channel_index ^ 1u;
148
149     CreateAndInitChannel(channel_index);
150     MessageInTransit::EndpointId endpoint_id =
151         channels_[channel_index]->AttachMessagePipeEndpoint(mp, port);
152     if (endpoint_id == MessageInTransit::kInvalidEndpointId)
153       return;
154
155     CHECK_EQ(endpoint_id, Channel::kBootstrapEndpointId);
156     CHECK(channels_[channel_index]->RunMessagePipeEndpoint(
157         Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId));
158   }
159
160   void RestoreInitialStateOnIOThread() {
161     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
162
163     TearDownOnIOThread();
164     SetUpOnIOThread();
165   }
166
167   test::TestIOThread io_thread_;
168   embedder::ScopedPlatformHandle platform_handles_[2];
169   scoped_refptr<Channel> channels_[2];
170
171   DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
172 };
173
// Sends one message in each direction over a single remote message pipe
// (between the user-facing endpoints MP 0, port 0 and MP 1, port 1), then
// closes MP 0, port 0 and checks that waiting for MP 1, port 1 to become
// readable fails with |MOJO_RESULT_FAILED_PRECONDITION|.
174 TEST_F(RemoteMessagePipeTest, Basic) {
175   static const char kHello[] = "hello";
176   static const char kWorld[] = "world!!!1!!!1!";
177   char buffer[100] = { 0 };
178   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
179   Waiter waiter;
180   uint32_t context = 0;
181
182   // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
183   // connected to MP 1, port 0, which will be attached to channel 1. This leaves
184   // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
185
186   scoped_refptr<MessagePipe> mp0(new MessagePipe(
187       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
188       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
189   scoped_refptr<MessagePipe> mp1(new MessagePipe(
190       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
191       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
192   ConnectMessagePipes(mp0, mp1);
193
194   // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
195
196   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
197   // it later, it might already be readable.)
      // The last argument (123) is the context value that |Wait()| is expected
      // to hand back below.
198   waiter.Init();
199   EXPECT_EQ(MOJO_RESULT_OK,
200             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
201
202   // Write to MP 0, port 0.
      // |sizeof(kHello)| includes the terminating NUL, so the received bytes
      // can be compared as a C string.
203   EXPECT_EQ(MOJO_RESULT_OK,
204             mp0->WriteMessage(0,
205                               kHello, sizeof(kHello),
206                               NULL,
207                               MOJO_WRITE_MESSAGE_FLAG_NONE));
208
209   // Wait.
210   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
211   EXPECT_EQ(123u, context);
212   mp1->RemoveWaiter(1, &waiter);
213
214   // Read from MP 1, port 1.
215   EXPECT_EQ(MOJO_RESULT_OK,
216             mp1->ReadMessage(1,
217                              buffer, &buffer_size,
218                              NULL, NULL,
219                              MOJO_READ_MESSAGE_FLAG_NONE));
220   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
221   EXPECT_STREQ(kHello, buffer);
222
223   // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
224
      // As above, the waiter is added before the write.
225   waiter.Init();
226   EXPECT_EQ(MOJO_RESULT_OK,
227             mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456));
228
229   EXPECT_EQ(MOJO_RESULT_OK,
230             mp1->WriteMessage(1,
231                               kWorld, sizeof(kWorld),
232                               NULL,
233                               MOJO_WRITE_MESSAGE_FLAG_NONE));
234
235   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
236   EXPECT_EQ(456u, context);
237   mp0->RemoveWaiter(0, &waiter);
238
      // |buffer_size| is in/out: reset it to the buffer's capacity, since the
      // previous read is expected to have set it to the message size.
239   buffer_size = static_cast<uint32_t>(sizeof(buffer));
240   EXPECT_EQ(MOJO_RESULT_OK,
241             mp0->ReadMessage(0,
242                              buffer, &buffer_size,
243                              NULL, NULL,
244                              MOJO_READ_MESSAGE_FLAG_NONE));
245   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
246   EXPECT_STREQ(kWorld, buffer);
247
248   // Close MP 0, port 0.
249   mp0->Close(0);
250
251   // Try to wait for MP 1, port 1 to become readable. This will eventually fail
252   // when it realizes that MP 0, port 0 has been closed. (It may also fail
253   // immediately.)
254   waiter.Init();
255   MojoResult result =
256       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789);
257   if (result == MOJO_RESULT_OK) {
        // The close had not been noticed yet when the waiter was added, so the
        // failure is reported through the wait instead.
258     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
259               waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
260     EXPECT_EQ(789u, context);
261     mp1->RemoveWaiter(1, &waiter);
262   } else {
        // The close was already known; adding the waiter itself must have
        // failed with the same error.
263     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
264   }
265
266   // Close MP 1, port 1 as well.
267   mp1->Close(1);
268 }
269
// Puts two message pipes (MP 0 <-> MP 1 and MP 2 <-> MP 3) on the same pair of
// channels and checks that a message written on one pipe shows up only at
// that pipe's far end, and on none of the other user-facing endpoints.
270 TEST_F(RemoteMessagePipeTest, Multiplex) {
271   static const char kHello[] = "hello";
272   static const char kWorld[] = "world!!!1!!!1!";
273   char buffer[100] = { 0 };
274   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
275   Waiter waiter;
276   uint32_t context = 0;
277
278   // Connect message pipes as in the |Basic| test.
279
280   scoped_refptr<MessagePipe> mp0(new MessagePipe(
281       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
282       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
283   scoped_refptr<MessagePipe> mp1(new MessagePipe(
284       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
285       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
286   ConnectMessagePipes(mp0, mp1);
287
288   // Now put another message pipe on the channel.
      // (MP 2, port 0 and MP 3, port 1 are its user-facing endpoints.)
289
290   scoped_refptr<MessagePipe> mp2(new MessagePipe(
291       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
292       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
293   scoped_refptr<MessagePipe> mp3(new MessagePipe(
294       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
295       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
296   ConnectMessagePipes(mp2, mp3);
297
298   // Write: MP 2, port 0 -> MP 3, port 1.
299
      // Add the waiter before writing, so the signal cannot be missed.
300   waiter.Init();
301   EXPECT_EQ(MOJO_RESULT_OK,
302             mp3->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789));
303
304   EXPECT_EQ(MOJO_RESULT_OK,
305             mp2->WriteMessage(0,
306                               kHello, sizeof(kHello),
307                               NULL,
308                               MOJO_WRITE_MESSAGE_FLAG_NONE));
309
310   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
311   EXPECT_EQ(789u, context);
312   mp3->RemoveWaiter(1, &waiter);
313
314   // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
      // (|buffer_size| is reset to the buffer's capacity before every read.)
315   buffer_size = static_cast<uint32_t>(sizeof(buffer));
316   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
317             mp0->ReadMessage(0,
318                              buffer, &buffer_size,
319                              NULL, NULL,
320                              MOJO_READ_MESSAGE_FLAG_NONE));
321   buffer_size = static_cast<uint32_t>(sizeof(buffer));
322   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
323             mp1->ReadMessage(1,
324                              buffer, &buffer_size,
325                              NULL, NULL,
326                              MOJO_READ_MESSAGE_FLAG_NONE));
327   buffer_size = static_cast<uint32_t>(sizeof(buffer));
328   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
329             mp2->ReadMessage(0,
330                              buffer, &buffer_size,
331                              NULL, NULL,
332                              MOJO_READ_MESSAGE_FLAG_NONE));
333
334   // Read from MP 3, port 1.
335   buffer_size = static_cast<uint32_t>(sizeof(buffer));
336   EXPECT_EQ(MOJO_RESULT_OK,
337             mp3->ReadMessage(1,
338                              buffer, &buffer_size,
339                              NULL, NULL,
340                              MOJO_READ_MESSAGE_FLAG_NONE));
341   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
342   EXPECT_STREQ(kHello, buffer);
343
344   // Now write on the first pipe: MP 0, port 0 -> MP 1, port 1.
345
346   waiter.Init();
347   EXPECT_EQ(MOJO_RESULT_OK,
348             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
349
350   EXPECT_EQ(MOJO_RESULT_OK,
351             mp0->WriteMessage(0,
352                               kWorld, sizeof(kWorld),
353                               NULL,
354                               MOJO_WRITE_MESSAGE_FLAG_NONE));
355
356   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
357   EXPECT_EQ(123u, context);
358   mp1->RemoveWaiter(1, &waiter);
359
360   // Make sure there's nothing on the other ports.
      // (That is: MP 0, port 0; MP 2, port 0; and MP 3, port 1.)
361   buffer_size = static_cast<uint32_t>(sizeof(buffer));
362   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
363             mp0->ReadMessage(0,
364                              buffer, &buffer_size,
365                              NULL, NULL,
366                              MOJO_READ_MESSAGE_FLAG_NONE));
367   buffer_size = static_cast<uint32_t>(sizeof(buffer));
368   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
369             mp2->ReadMessage(0,
370                              buffer, &buffer_size,
371                              NULL, NULL,
372                              MOJO_READ_MESSAGE_FLAG_NONE));
373   buffer_size = static_cast<uint32_t>(sizeof(buffer));
374   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
375             mp3->ReadMessage(1,
376                              buffer, &buffer_size,
377                              NULL, NULL,
378                              MOJO_READ_MESSAGE_FLAG_NONE));
379
      // Read from MP 1, port 1.
380   buffer_size = static_cast<uint32_t>(sizeof(buffer));
381   EXPECT_EQ(MOJO_RESULT_OK,
382             mp1->ReadMessage(1,
383                              buffer, &buffer_size,
384                              NULL, NULL,
385                              MOJO_READ_MESSAGE_FLAG_NONE));
386   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
387   EXPECT_STREQ(kWorld, buffer);
388
      // Close all four user-facing endpoints.
389   mp0->Close(0);
390   mp1->Close(1);
391   mp2->Close(0);
392   mp3->Close(1);
393 }
394
// Writes a message to MP 0, port 0 and closes that port before MP 1 has even
// been created or attached to channel 1, then checks that the message is
// nevertheless delivered to MP 1, port 1.
395 TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
396   static const char kHello[] = "hello";
397   char buffer[100] = { 0 };
398   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
399   Waiter waiter;
400   uint32_t context = 0;
401
402   // Create only MP 0 for now. Its port 1 is attached to channel 0 further
403   // down; MP 1 (whose port 0 goes on channel 1) is not created until after
404   // MP 0, port 0 has been written to and closed.
405
406   scoped_refptr<MessagePipe> mp0(new MessagePipe(
407       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
408       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
409
410   // Write to MP 0, port 0 (before MP 0 has been attached to any channel).
411   EXPECT_EQ(MOJO_RESULT_OK,
412             mp0->WriteMessage(0,
413                               kHello, sizeof(kHello),
414                               NULL,
415                               MOJO_WRITE_MESSAGE_FLAG_NONE));
416
      // Post the task that attaches MP 0, port 1 to channel 0. This does not
      // wait for the task to run.
417   BootstrapMessagePipeNoWait(0, mp0);
418
419
420   // Close MP 0, port 0 before channel 1 is even connected.
421   mp0->Close(0);
422
423   scoped_refptr<MessagePipe> mp1(new MessagePipe(
424       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
425       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
426
427   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
428   // it later, it might already be readable.)
429   waiter.Init();
430   EXPECT_EQ(MOJO_RESULT_OK,
431             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
432
      // Post the task that attaches MP 1, port 0 to channel 1 (again without
      // waiting).
433   BootstrapMessagePipeNoWait(1, mp1);
434
435   // Wait.
436   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
437   EXPECT_EQ(123u, context);
438   mp1->RemoveWaiter(1, &waiter);
439
440   // Read from MP 1, port 1.
441   EXPECT_EQ(MOJO_RESULT_OK,
442             mp1->ReadMessage(1,
443                              buffer, &buffer_size,
444                              NULL, NULL,
445                              MOJO_READ_MESSAGE_FLAG_NONE));
446   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
447   EXPECT_STREQ(kHello, buffer);
448
449   // Close MP 1, port 1. (MP 0, port 0 was already closed above.)
450   mp1->Close(1);
451 }
452
453 TEST_F(RemoteMessagePipeTest, HandlePassing) {
454   static const char kHello[] = "hello";
455   Waiter waiter;
456   uint32_t context = 0;
457
458   scoped_refptr<MessagePipe> mp0(new MessagePipe(
459       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
460       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
461   scoped_refptr<MessagePipe> mp1(new MessagePipe(
462       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
463       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
464   ConnectMessagePipes(mp0, mp1);
465
466   // We'll try to pass this dispatcher.
467   scoped_refptr<MessagePipeDispatcher> dispatcher(new MessagePipeDispatcher(
468       MessagePipeDispatcher::kDefaultCreateOptions));
469   scoped_refptr<MessagePipe> local_mp(new MessagePipe());
470   dispatcher->Init(local_mp, 0);
471
472   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
473   // it later, it might already be readable.)
474   waiter.Init();
475   EXPECT_EQ(MOJO_RESULT_OK,
476             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
477
478   // Write to MP 0, port 0.
479   {
480     DispatcherTransport
481         transport(test::DispatcherTryStartTransport(dispatcher.get()));
482     EXPECT_TRUE(transport.is_valid());
483
484     std::vector<DispatcherTransport> transports;
485     transports.push_back(transport);
486     EXPECT_EQ(MOJO_RESULT_OK,
487               mp0->WriteMessage(0, kHello, sizeof(kHello), &transports,
488                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
489     transport.End();
490
491     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
492     // |dispatcher| is destroyed.
493     EXPECT_TRUE(dispatcher->HasOneRef());
494     dispatcher = NULL;
495   }
496
497   // Wait.
498   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
499   EXPECT_EQ(123u, context);
500   mp1->RemoveWaiter(1, &waiter);
501
502   // Read from MP 1, port 1.
503   char read_buffer[100] = { 0 };
504   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
505   DispatcherVector read_dispatchers;
506   uint32_t read_num_dispatchers = 10;  // Maximum to get.
507   EXPECT_EQ(MOJO_RESULT_OK,
508             mp1->ReadMessage(1, read_buffer, &read_buffer_size,
509                              &read_dispatchers, &read_num_dispatchers,
510                              MOJO_READ_MESSAGE_FLAG_NONE));
511   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
512   EXPECT_STREQ(kHello, read_buffer);
513   EXPECT_EQ(1u, read_dispatchers.size());
514   EXPECT_EQ(1u, read_num_dispatchers);
515   ASSERT_TRUE(read_dispatchers[0]);
516   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
517
518   EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
519   dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
520
521   // Write to "local_mp", port 1.
522   EXPECT_EQ(MOJO_RESULT_OK,
523             local_mp->WriteMessage(1, kHello, sizeof(kHello), NULL,
524                                    MOJO_WRITE_MESSAGE_FLAG_NONE));
525
526   // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
527   // here. (We don't crash if I sleep and then close.)
528
529   // Wait for the dispatcher to become readable.
530   waiter.Init();
531   EXPECT_EQ(MOJO_RESULT_OK,
532             dispatcher->AddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 456));
533   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
534   EXPECT_EQ(456u, context);
535   dispatcher->RemoveWaiter(&waiter);
536
537   // Read from the dispatcher.
538   memset(read_buffer, 0, sizeof(read_buffer));
539   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
540   EXPECT_EQ(MOJO_RESULT_OK,
541             dispatcher->ReadMessage(read_buffer, &read_buffer_size, 0, NULL,
542                                     MOJO_READ_MESSAGE_FLAG_NONE));
543   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
544   EXPECT_STREQ(kHello, read_buffer);
545
546   // Prepare to wait on "local_mp", port 1.
547   waiter.Init();
548   EXPECT_EQ(MOJO_RESULT_OK,
549             local_mp->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789));
550
551   // Write to the dispatcher.
552   EXPECT_EQ(MOJO_RESULT_OK,
553             dispatcher->WriteMessage(kHello, sizeof(kHello), NULL,
554                                      MOJO_WRITE_MESSAGE_FLAG_NONE));
555
556   // Wait.
557   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
558   EXPECT_EQ(789u, context);
559   local_mp->RemoveWaiter(1, &waiter);
560
561   // Read from "local_mp", port 1.
562   memset(read_buffer, 0, sizeof(read_buffer));
563   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
564   EXPECT_EQ(MOJO_RESULT_OK,
565             local_mp->ReadMessage(1, read_buffer, &read_buffer_size, NULL, NULL,
566                                   MOJO_READ_MESSAGE_FLAG_NONE));
567   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
568   EXPECT_STREQ(kHello, read_buffer);
569
570   // TODO(vtl): Also test that messages queued up before the handle was sent are
571   // delivered properly.
572
573   // Close everything that belongs to us.
574   mp0->Close(0);
575   mp1->Close(1);
576   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
577   // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
578   local_mp->Close(1);
579 }
580
581 #if defined(OS_POSIX)
582 #define MAYBE_SharedBufferPassing SharedBufferPassing
583 #else
584 // Not yet implemented (on Windows).
585 #define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing
586 #endif
587 TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) {
588   static const char kHello[] = "hello";
589   Waiter waiter;
590   uint32_t context = 0;
591
592   scoped_refptr<MessagePipe> mp0(new MessagePipe(
593       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
594       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
595   scoped_refptr<MessagePipe> mp1(new MessagePipe(
596       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
597       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
598   ConnectMessagePipes(mp0, mp1);
599
600   // We'll try to pass this dispatcher.
601   scoped_refptr<SharedBufferDispatcher> dispatcher;
602   EXPECT_EQ(MOJO_RESULT_OK,
603             SharedBufferDispatcher::Create(
604                 SharedBufferDispatcher::kDefaultCreateOptions, 100,
605                 &dispatcher));
606   ASSERT_TRUE(dispatcher);
607
608   // Make a mapping.
609   scoped_ptr<RawSharedBufferMapping> mapping0;
610   EXPECT_EQ(MOJO_RESULT_OK,
611             dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE,
612                                   &mapping0));
613   ASSERT_TRUE(mapping0);
614   ASSERT_TRUE(mapping0->base());
615   ASSERT_EQ(100u, mapping0->length());
616   static_cast<char*>(mapping0->base())[0] = 'A';
617   static_cast<char*>(mapping0->base())[50] = 'B';
618   static_cast<char*>(mapping0->base())[99] = 'C';
619
620   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
621   // it later, it might already be readable.)
622   waiter.Init();
623   EXPECT_EQ(MOJO_RESULT_OK,
624             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
625
626   // Write to MP 0, port 0.
627   {
628     DispatcherTransport
629         transport(test::DispatcherTryStartTransport(dispatcher.get()));
630     EXPECT_TRUE(transport.is_valid());
631
632     std::vector<DispatcherTransport> transports;
633     transports.push_back(transport);
634     EXPECT_EQ(MOJO_RESULT_OK,
635               mp0->WriteMessage(0, kHello, sizeof(kHello), &transports,
636                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
637     transport.End();
638
639     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
640     // |dispatcher| is destroyed.
641     EXPECT_TRUE(dispatcher->HasOneRef());
642     dispatcher = NULL;
643   }
644
645   // Wait.
646   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
647   EXPECT_EQ(123u, context);
648   mp1->RemoveWaiter(1, &waiter);
649
650   // Read from MP 1, port 1.
651   char read_buffer[100] = { 0 };
652   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
653   DispatcherVector read_dispatchers;
654   uint32_t read_num_dispatchers = 10;  // Maximum to get.
655   EXPECT_EQ(MOJO_RESULT_OK,
656             mp1->ReadMessage(1, read_buffer, &read_buffer_size,
657                              &read_dispatchers, &read_num_dispatchers,
658                              MOJO_READ_MESSAGE_FLAG_NONE));
659   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
660   EXPECT_STREQ(kHello, read_buffer);
661   EXPECT_EQ(1u, read_dispatchers.size());
662   EXPECT_EQ(1u, read_num_dispatchers);
663   ASSERT_TRUE(read_dispatchers[0]);
664   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
665
666   EXPECT_EQ(Dispatcher::kTypeSharedBuffer, read_dispatchers[0]->GetType());
667   dispatcher =
668       static_cast<SharedBufferDispatcher*>(read_dispatchers[0].get());
669
670   // Make another mapping.
671   scoped_ptr<RawSharedBufferMapping> mapping1;
672   EXPECT_EQ(MOJO_RESULT_OK,
673             dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE,
674                                   &mapping1));
675   ASSERT_TRUE(mapping1);
676   ASSERT_TRUE(mapping1->base());
677   ASSERT_EQ(100u, mapping1->length());
678   EXPECT_NE(mapping1->base(), mapping0->base());
679   EXPECT_EQ('A', static_cast<char*>(mapping1->base())[0]);
680   EXPECT_EQ('B', static_cast<char*>(mapping1->base())[50]);
681   EXPECT_EQ('C', static_cast<char*>(mapping1->base())[99]);
682
683   // Write stuff either way.
684   static_cast<char*>(mapping1->base())[1] = 'x';
685   EXPECT_EQ('x', static_cast<char*>(mapping0->base())[1]);
686   static_cast<char*>(mapping0->base())[2] = 'y';
687   EXPECT_EQ('y', static_cast<char*>(mapping1->base())[2]);
688
689   // Kill the first mapping; the second should still be valid.
690   mapping0.reset();
691   EXPECT_EQ('A', static_cast<char*>(mapping1->base())[0]);
692
693   // Close everything that belongs to us.
694   mp0->Close(0);
695   mp1->Close(1);
696   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
697
698   // The second mapping should still be good.
699   EXPECT_EQ('x', static_cast<char*>(mapping1->base())[1]);
700 }
701
702 #if defined(OS_POSIX)
703 #define MAYBE_PlatformHandlePassing PlatformHandlePassing
704 #else
705 // Not yet implemented (on Windows).
706 #define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing
707 #endif
708 TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) {
709   static const char kHello[] = "hello";
710   static const char kWorld[] = "world";
711   Waiter waiter;
712   uint32_t context = 0;
713
714   scoped_refptr<MessagePipe> mp0(new MessagePipe(
715       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
716       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
717   scoped_refptr<MessagePipe> mp1(new MessagePipe(
718       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
719       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
720   ConnectMessagePipes(mp0, mp1);
721
722   base::FilePath unused;
723   base::ScopedFILE fp(CreateAndOpenTemporaryFile(&unused));
724   EXPECT_EQ(sizeof(kHello), fwrite(kHello, 1, sizeof(kHello), fp.get()));
725   // We'll try to pass this dispatcher, which will cause a |PlatformHandle| to
726   // be passed.
727   scoped_refptr<PlatformHandleDispatcher> dispatcher(
728       new PlatformHandleDispatcher(
729           mojo::test::PlatformHandleFromFILE(fp.Pass())));
730
731   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
732   // it later, it might already be readable.)
733   waiter.Init();
734   EXPECT_EQ(MOJO_RESULT_OK,
735             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123));
736
737   // Write to MP 0, port 0.
738   {
739     DispatcherTransport
740         transport(test::DispatcherTryStartTransport(dispatcher.get()));
741     EXPECT_TRUE(transport.is_valid());
742
743     std::vector<DispatcherTransport> transports;
744     transports.push_back(transport);
745     EXPECT_EQ(MOJO_RESULT_OK,
746               mp0->WriteMessage(0, kWorld, sizeof(kWorld), &transports,
747                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
748     transport.End();
749
750     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
751     // |dispatcher| is destroyed.
752     EXPECT_TRUE(dispatcher->HasOneRef());
753     dispatcher = NULL;
754   }
755
756   // Wait.
757   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
758   EXPECT_EQ(123u, context);
759   mp1->RemoveWaiter(1, &waiter);
760
761   // Read from MP 1, port 1.
762   char read_buffer[100] = { 0 };
763   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
764   DispatcherVector read_dispatchers;
765   uint32_t read_num_dispatchers = 10;  // Maximum to get.
766   EXPECT_EQ(MOJO_RESULT_OK,
767             mp1->ReadMessage(1, read_buffer, &read_buffer_size,
768                              &read_dispatchers, &read_num_dispatchers,
769                              MOJO_READ_MESSAGE_FLAG_NONE));
770   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
771   EXPECT_STREQ(kWorld, read_buffer);
772   EXPECT_EQ(1u, read_dispatchers.size());
773   EXPECT_EQ(1u, read_num_dispatchers);
774   ASSERT_TRUE(read_dispatchers[0]);
775   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
776
777   EXPECT_EQ(Dispatcher::kTypePlatformHandle, read_dispatchers[0]->GetType());
778   dispatcher =
779       static_cast<PlatformHandleDispatcher*>(read_dispatchers[0].get());
780
781   embedder::ScopedPlatformHandle h = dispatcher->PassPlatformHandle().Pass();
782   EXPECT_TRUE(h.is_valid());
783
784   fp = mojo::test::FILEFromPlatformHandle(h.Pass(), "rb").Pass();
785   EXPECT_FALSE(h.is_valid());
786   EXPECT_TRUE(fp);
787
788   rewind(fp.get());
789   memset(read_buffer, 0, sizeof(read_buffer));
790   EXPECT_EQ(sizeof(kHello),
791             fread(read_buffer, 1, sizeof(read_buffer), fp.get()));
792   EXPECT_STREQ(kHello, read_buffer);
793
794   // Close everything that belongs to us.
795   mp0->Close(0);
796   mp1->Close(1);
797   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
798 }
799
800 // Test racing closes (on each end).
801 // Note: A flaky failure would almost certainly indicate a problem in the code
802 // itself (not in the test). Also, any logged warnings/errors would also
803 // probably be indicative of bugs.
// Each iteration bootstraps a fresh pair of message pipes (without waiting for
// the connection to complete), closes both user-facing ports, and then resets
// the fixture. The four low bits of |i| select which of four optional 5 ms
// delays are inserted, so the 256 iterations run every one of the 16 delay
// combinations 16 times.
804 TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
805   base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5);
806
807   for (unsigned i = 0; i < 256; i++) {
808     DVLOG(2) << "---------------------------------------- " << i;
809     scoped_refptr<MessagePipe> mp0(new MessagePipe(
810         scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
811         scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
812     BootstrapMessagePipeNoWait(0, mp0);
813
814     scoped_refptr<MessagePipe> mp1(new MessagePipe(
815         scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
816         scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
817     BootstrapMessagePipeNoWait(1, mp1);
818
        // Bit 0: post a sleep to the I/O thread before MP 0, port 0 is closed.
819     if (i & 1u) {
820       io_thread()->task_runner()->PostTask(
821           FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
822     }
        // Bit 1: sleep on this (the test's) thread before the first close.
823     if (i & 2u)
824       base::PlatformThread::Sleep(delay);
825
826     mp0->Close(0);
827
        // Bit 2: post a sleep to the I/O thread between the two closes.
828     if (i & 4u) {
829       io_thread()->task_runner()->PostTask(
830           FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
831     }
        // Bit 3: sleep on this thread between the two closes.
832     if (i & 8u)
833       base::PlatformThread::Sleep(delay);
834
835     mp1->Close(1);
836
        // Shut down both channels and create a new pair of platform handles
        // for the next iteration; this waits for the I/O thread to finish.
837     RestoreInitialState();
838   }
839 }
840
841 }  // namespace
842 }  // namespace system
843 }  // namespace mojo