Upstream version 9.38.198.0
[platform/framework/web/crosswalk.git] / src / mojo / system / remote_message_pipe_unittest.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <stdint.h>
6 #include <stdio.h>
7 #include <string.h>
8
9 #include <vector>
10
11 #include "base/bind.h"
12 #include "base/file_util.h"
13 #include "base/files/file_path.h"
14 #include "base/files/scoped_file.h"
15 #include "base/files/scoped_temp_dir.h"
16 #include "base/location.h"
17 #include "base/logging.h"
18 #include "base/macros.h"
19 #include "base/message_loop/message_loop.h"
20 #include "base/threading/platform_thread.h"  // For |Sleep()|.
21 #include "build/build_config.h"              // TODO(vtl): Remove this.
22 #include "mojo/common/test/test_utils.h"
23 #include "mojo/embedder/platform_channel_pair.h"
24 #include "mojo/embedder/platform_shared_buffer.h"
25 #include "mojo/embedder/scoped_platform_handle.h"
26 #include "mojo/embedder/simple_platform_support.h"
27 #include "mojo/system/channel.h"
28 #include "mojo/system/local_message_pipe_endpoint.h"
29 #include "mojo/system/message_pipe.h"
30 #include "mojo/system/message_pipe_dispatcher.h"
31 #include "mojo/system/platform_handle_dispatcher.h"
32 #include "mojo/system/proxy_message_pipe_endpoint.h"
33 #include "mojo/system/raw_channel.h"
34 #include "mojo/system/shared_buffer_dispatcher.h"
35 #include "mojo/system/test_utils.h"
36 #include "mojo/system/waiter.h"
37 #include "testing/gtest/include/gtest/gtest.h"
38
39 namespace mojo {
40 namespace system {
41 namespace {
42
43 class RemoteMessagePipeTest : public testing::Test {
44  public:
45   RemoteMessagePipeTest() : io_thread_(test::TestIOThread::kAutoStart) {}
46   virtual ~RemoteMessagePipeTest() {}
47
48   virtual void SetUp() OVERRIDE {
49     io_thread_.PostTaskAndWait(
50         FROM_HERE,
51         base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
52                    base::Unretained(this)));
53   }
54
55   virtual void TearDown() OVERRIDE {
56     io_thread_.PostTaskAndWait(
57         FROM_HERE,
58         base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
59                    base::Unretained(this)));
60   }
61
62  protected:
63   // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1,
64   // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP
65   // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s.
66   void ConnectMessagePipes(scoped_refptr<MessagePipe> mp0,
67                            scoped_refptr<MessagePipe> mp1) {
68     io_thread_.PostTaskAndWait(
69         FROM_HERE,
70         base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
71                    base::Unretained(this),
72                    mp0,
73                    mp1));
74   }
75
76   // This connects |mp|'s port |channel_index ^ 1| to channel |channel_index|.
77   // It assumes/requires that this is the bootstrap case, i.e., that the
78   // endpoint IDs are both/will both be |Channel::kBootstrapEndpointId|. This
79   // returns *without* waiting for it to finish connecting.
80   void BootstrapMessagePipeNoWait(unsigned channel_index,
81                                   scoped_refptr<MessagePipe> mp) {
82     io_thread_.PostTask(
83         FROM_HERE,
84         base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread,
85                    base::Unretained(this),
86                    channel_index,
87                    mp));
88   }
89
90   void RestoreInitialState() {
91     io_thread_.PostTaskAndWait(
92         FROM_HERE,
93         base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread,
94                    base::Unretained(this)));
95   }
96
97   test::TestIOThread* io_thread() { return &io_thread_; }
98
99  private:
100   void SetUpOnIOThread() {
101     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
102
103     embedder::PlatformChannelPair channel_pair;
104     platform_handles_[0] = channel_pair.PassServerHandle();
105     platform_handles_[1] = channel_pair.PassClientHandle();
106   }
107
108   void TearDownOnIOThread() {
109     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
110
111     if (channels_[0]) {
112       channels_[0]->Shutdown();
113       channels_[0] = NULL;
114     }
115     if (channels_[1]) {
116       channels_[1]->Shutdown();
117       channels_[1] = NULL;
118     }
119   }
120
121   void CreateAndInitChannel(unsigned channel_index) {
122     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
123     CHECK(channel_index == 0 || channel_index == 1);
124     CHECK(!channels_[channel_index]);
125
126     channels_[channel_index] = new Channel();
127     CHECK(channels_[channel_index]->Init(
128         RawChannel::Create(platform_handles_[channel_index].Pass())));
129   }
130
131   void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp0,
132                                      scoped_refptr<MessagePipe> mp1) {
133     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
134
135     if (!channels_[0])
136       CreateAndInitChannel(0);
137     if (!channels_[1])
138       CreateAndInitChannel(1);
139
140     MessageInTransit::EndpointId local_id0 =
141         channels_[0]->AttachMessagePipeEndpoint(mp0, 1);
142     MessageInTransit::EndpointId local_id1 =
143         channels_[1]->AttachMessagePipeEndpoint(mp1, 0);
144
145     CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1));
146     CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0));
147   }
148
149   void BootstrapMessagePipeOnIOThread(unsigned channel_index,
150                                       scoped_refptr<MessagePipe> mp) {
151     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
152     CHECK(channel_index == 0 || channel_index == 1);
153
154     unsigned port = channel_index ^ 1u;
155
156     CreateAndInitChannel(channel_index);
157     MessageInTransit::EndpointId endpoint_id =
158         channels_[channel_index]->AttachMessagePipeEndpoint(mp, port);
159     if (endpoint_id == MessageInTransit::kInvalidEndpointId)
160       return;
161
162     CHECK_EQ(endpoint_id, Channel::kBootstrapEndpointId);
163     CHECK(channels_[channel_index]->RunMessagePipeEndpoint(
164         Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId));
165   }
166
167   void RestoreInitialStateOnIOThread() {
168     CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
169
170     TearDownOnIOThread();
171     SetUpOnIOThread();
172   }
173
174   test::TestIOThread io_thread_;
175   embedder::ScopedPlatformHandle platform_handles_[2];
176   scoped_refptr<Channel> channels_[2];
177
178   DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
179 };
180
// Sends one message in each direction between MP 0, port 0 and MP 1, port 1
// (which are joined through the fixture's two channels), then closes MP 0,
// port 0 and checks that MP 1, port 1 ends up with no satisfied and no
// satisfiable signals.
181 TEST_F(RemoteMessagePipeTest, Basic) {
182   static const char kHello[] = "hello";
183   static const char kWorld[] = "world!!!1!!!1!";
      // |buffer| starts zero-filled and messages are written with |sizeof()|
      // (i.e., including the terminating NUL), so what is read back can be
      // compared using |EXPECT_STREQ()|.
184   char buffer[100] = {0};
185   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
186   Waiter waiter;
187   HandleSignalsState hss;
188   uint32_t context = 0;
189
190   // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
191   // connected to MP 1, port 0, which will be attached to channel 1. This leaves
192   // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
193
194   scoped_refptr<MessagePipe> mp0(new MessagePipe(
195       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
196       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
197   scoped_refptr<MessagePipe> mp1(new MessagePipe(
198       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
199       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
200   ConnectMessagePipes(mp0, mp1);
201
202   // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
203
204   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
205   // it later, it might already be readable.)
206   waiter.Init();
207   ASSERT_EQ(MOJO_RESULT_OK,
208             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
209
210   // Write to MP 0, port 0.
211   EXPECT_EQ(MOJO_RESULT_OK,
212             mp0->WriteMessage(0,
213                               UserPointer<const void>(kHello),
214                               sizeof(kHello),
215                               NULL,
216                               MOJO_WRITE_MESSAGE_FLAG_NONE));
217
218   // Wait.
219   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
      // |context| should be the value that was given to |AddWaiter()|.
220   EXPECT_EQ(123u, context);
      // Start from a fresh state so that what is checked below is whatever
      // |RemoveWaiter()| reports.
221   hss = HandleSignalsState();
222   mp1->RemoveWaiter(1, &waiter, &hss);
223   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
224             hss.satisfied_signals);
225   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
226             hss.satisfiable_signals);
227
228   // Read from MP 1, port 1.
229   EXPECT_EQ(MOJO_RESULT_OK,
230             mp1->ReadMessage(1,
231                              UserPointer<void>(buffer),
232                              MakeUserPointer(&buffer_size),
233                              NULL,
234                              NULL,
235                              MOJO_READ_MESSAGE_FLAG_NONE));
236   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
237   EXPECT_STREQ(kHello, buffer);
238
239   // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
240
      // As before, the waiter (now on MP 0, port 0) is added before writing.
241   waiter.Init();
242   ASSERT_EQ(MOJO_RESULT_OK,
243             mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, NULL));
244
245   EXPECT_EQ(MOJO_RESULT_OK,
246             mp1->WriteMessage(1,
247                               UserPointer<const void>(kWorld),
248                               sizeof(kWorld),
249                               NULL,
250                               MOJO_WRITE_MESSAGE_FLAG_NONE));
251
252   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
253   EXPECT_EQ(456u, context);
254   hss = HandleSignalsState();
255   mp0->RemoveWaiter(0, &waiter, &hss);
256   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
257             hss.satisfied_signals);
258   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
259             hss.satisfiable_signals);
260
      // The previous read stored the message size in |buffer_size|, so set it
      // back to the full capacity of |buffer| before reading again.
261   buffer_size = static_cast<uint32_t>(sizeof(buffer));
262   EXPECT_EQ(MOJO_RESULT_OK,
263             mp0->ReadMessage(0,
264                              UserPointer<void>(buffer),
265                              MakeUserPointer(&buffer_size),
266                              NULL,
267                              NULL,
268                              MOJO_READ_MESSAGE_FLAG_NONE));
269   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
270   EXPECT_STREQ(kWorld, buffer);
271
272   // Close MP 0, port 0.
273   mp0->Close(0);
274
275   // Try to wait for MP 1, port 1 to become readable. This will eventually fail
276   // when it realizes that MP 0, port 0 has been closed. (It may also fail
277   // immediately.)
278   waiter.Init();
279   hss = HandleSignalsState();
      // |hss| is passed to |AddWaiter()| here (unlike above), presumably so
      // that the state is still available for the checks below when adding the
      // waiter fails right away and |RemoveWaiter()| is never called.
280   MojoResult result =
281       mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, &hss);
282   if (result == MOJO_RESULT_OK) {
283     EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
284               waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
285     EXPECT_EQ(789u, context);
286     hss = HandleSignalsState();
287     mp1->RemoveWaiter(1, &waiter, &hss);
288   }
      // Either way, no signal is expected to be satisfied or satisfiable.
289   EXPECT_EQ(0u, hss.satisfied_signals);
290   EXPECT_EQ(0u, hss.satisfiable_signals);
291
292   // And MP 1, port 1.
293   mp1->Close(1);
294 }
295
// Connects two message pipes (MP 0/MP 1 and MP 2/MP 3) through the fixture and
// checks that a message written to one of them shows up only at that pipe's
// other user-facing endpoint, while reading from every other user-facing
// endpoint yields |MOJO_RESULT_SHOULD_WAIT|.
296 TEST_F(RemoteMessagePipeTest, Multiplex) {
297   static const char kHello[] = "hello";
298   static const char kWorld[] = "world!!!1!!!1!";
299   char buffer[100] = {0};
300   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
301   Waiter waiter;
302   HandleSignalsState hss;
303   uint32_t context = 0;
304
305   // Connect message pipes as in the |Basic| test.
306
307   scoped_refptr<MessagePipe> mp0(new MessagePipe(
308       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
309       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
310   scoped_refptr<MessagePipe> mp1(new MessagePipe(
311       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
312       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
313   ConnectMessagePipes(mp0, mp1);
314
315   // Now put another message pipe on the channel.
316
      // MP 2 and MP 3 are set up exactly like MP 0 and MP 1, respectively, so
      // MP 2, port 0 and MP 3, port 1 are the user-facing endpoints.
317   scoped_refptr<MessagePipe> mp2(new MessagePipe(
318       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
319       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
320   scoped_refptr<MessagePipe> mp3(new MessagePipe(
321       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
322       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
323   ConnectMessagePipes(mp2, mp3);
324
325   // Write: MP 2, port 0 -> MP 3, port 1.
326
      // Add the waiter before writing, so that the wait below cannot miss the
      // message.
327   waiter.Init();
328   ASSERT_EQ(MOJO_RESULT_OK,
329             mp3->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, NULL));
330
331   EXPECT_EQ(MOJO_RESULT_OK,
332             mp2->WriteMessage(0,
333                               UserPointer<const void>(kHello),
334                               sizeof(kHello),
335                               NULL,
336                               MOJO_WRITE_MESSAGE_FLAG_NONE));
337
338   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
339   EXPECT_EQ(789u, context);
340   hss = HandleSignalsState();
341   mp3->RemoveWaiter(1, &waiter, &hss);
342   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
343             hss.satisfied_signals);
344   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
345             hss.satisfiable_signals);
346
347   // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
      // (|buffer_size| is reset to the capacity of |buffer| before every read
      // attempt.)
348   buffer_size = static_cast<uint32_t>(sizeof(buffer));
349   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
350             mp0->ReadMessage(0,
351                              UserPointer<void>(buffer),
352                              MakeUserPointer(&buffer_size),
353                              NULL,
354                              NULL,
355                              MOJO_READ_MESSAGE_FLAG_NONE));
356   buffer_size = static_cast<uint32_t>(sizeof(buffer));
357   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
358             mp1->ReadMessage(1,
359                              UserPointer<void>(buffer),
360                              MakeUserPointer(&buffer_size),
361                              NULL,
362                              NULL,
363                              MOJO_READ_MESSAGE_FLAG_NONE));
364   buffer_size = static_cast<uint32_t>(sizeof(buffer));
365   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
366             mp2->ReadMessage(0,
367                              UserPointer<void>(buffer),
368                              MakeUserPointer(&buffer_size),
369                              NULL,
370                              NULL,
371                              MOJO_READ_MESSAGE_FLAG_NONE));
372
373   // Read from MP 3, port 1.
374   buffer_size = static_cast<uint32_t>(sizeof(buffer));
375   EXPECT_EQ(MOJO_RESULT_OK,
376             mp3->ReadMessage(1,
377                              UserPointer<void>(buffer),
378                              MakeUserPointer(&buffer_size),
379                              NULL,
380                              NULL,
381                              MOJO_READ_MESSAGE_FLAG_NONE));
382   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
383   EXPECT_STREQ(kHello, buffer);
384
385   // Write: MP 0, port 0 -> MP 1, port 1 again.
386
387   waiter.Init();
388   ASSERT_EQ(MOJO_RESULT_OK,
389             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
390
391   EXPECT_EQ(MOJO_RESULT_OK,
392             mp0->WriteMessage(0,
393                               UserPointer<const void>(kWorld),
394                               sizeof(kWorld),
395                               NULL,
396                               MOJO_WRITE_MESSAGE_FLAG_NONE));
397
398   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
399   EXPECT_EQ(123u, context);
400   hss = HandleSignalsState();
401   mp1->RemoveWaiter(1, &waiter, &hss);
402   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
403             hss.satisfied_signals);
404   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
405             hss.satisfiable_signals);
406
407   // Make sure there's nothing on the other ports.
      // (That is, on MP 0, port 0, on MP 2, port 0 and on MP 3, port 1.)
408   buffer_size = static_cast<uint32_t>(sizeof(buffer));
409   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
410             mp0->ReadMessage(0,
411                              UserPointer<void>(buffer),
412                              MakeUserPointer(&buffer_size),
413                              NULL,
414                              NULL,
415                              MOJO_READ_MESSAGE_FLAG_NONE));
416   buffer_size = static_cast<uint32_t>(sizeof(buffer));
417   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
418             mp2->ReadMessage(0,
419                              UserPointer<void>(buffer),
420                              MakeUserPointer(&buffer_size),
421                              NULL,
422                              NULL,
423                              MOJO_READ_MESSAGE_FLAG_NONE));
424   buffer_size = static_cast<uint32_t>(sizeof(buffer));
425   EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
426             mp3->ReadMessage(1,
427                              UserPointer<void>(buffer),
428                              MakeUserPointer(&buffer_size),
429                              NULL,
430                              NULL,
431                              MOJO_READ_MESSAGE_FLAG_NONE));
432
      // Finally, read the message from MP 1, port 1, where it is expected.
433   buffer_size = static_cast<uint32_t>(sizeof(buffer));
434   EXPECT_EQ(MOJO_RESULT_OK,
435             mp1->ReadMessage(1,
436                              UserPointer<void>(buffer),
437                              MakeUserPointer(&buffer_size),
438                              NULL,
439                              NULL,
440                              MOJO_READ_MESSAGE_FLAG_NONE));
441   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
442   EXPECT_STREQ(kWorld, buffer);
443
      // Close all four user-facing endpoints.
444   mp0->Close(0);
445   mp1->Close(1);
446   mp2->Close(0);
447   mp3->Close(1);
448 }
449
// Checks that a message written to MP 0, port 0 can still be read from MP 1,
// port 1 even though MP 0, port 0 is closed before MP 1 has been created and
// bootstrapped onto channel 1.
450 TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
451   static const char kHello[] = "hello";
452   char buffer[100] = {0};
453   uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
454   Waiter waiter;
455   HandleSignalsState hss;
456   uint32_t context = 0;
457
458   // Create only MP 0 for now. Its port 1 will be attached to channel 0 below;
459   // MP 1, whose port 0 will be attached to channel 1, is created later. This
460   // leaves MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
461
462   scoped_refptr<MessagePipe> mp0(new MessagePipe(
463       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
464       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
465
466   // Write to MP 0, port 0.
      // (Note that at this point MP 0 has not been attached to a channel yet.)
467   EXPECT_EQ(MOJO_RESULT_OK,
468             mp0->WriteMessage(0,
469                               UserPointer<const void>(kHello),
470                               sizeof(kHello),
471                               NULL,
472                               MOJO_WRITE_MESSAGE_FLAG_NONE));
473
      // This only posts the bootstrap task for channel 0; it does not wait for
      // it to run.
474   BootstrapMessagePipeNoWait(0, mp0);
475
476   // Close MP 0, port 0 before channel 1 is even connected.
477   mp0->Close(0);
478
479   scoped_refptr<MessagePipe> mp1(new MessagePipe(
480       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
481       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
482
483   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
484   // it later, it might already be readable.)
485   waiter.Init();
486   ASSERT_EQ(MOJO_RESULT_OK,
487             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
488
489   BootstrapMessagePipeNoWait(1, mp1);
490
491   // Wait.
492   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
493   EXPECT_EQ(123u, context);
494   hss = HandleSignalsState();
495   // Note: MP 1, port 1 should definitely be readable, but it may or may not
496   // appear as writable (there's a race, and it may not have noticed that the
497   // other side was closed yet -- e.g., inserting a sleep here would make it
498   // much more likely to notice that it's no longer writable).
499   mp1->RemoveWaiter(1, &waiter, &hss);
      // Hence only the readable bit is checked, rather than the exact signals.
500   EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
501   EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
502
503   // Read from MP 1, port 1.
504   EXPECT_EQ(MOJO_RESULT_OK,
505             mp1->ReadMessage(1,
506                              UserPointer<void>(buffer),
507                              MakeUserPointer(&buffer_size),
508                              NULL,
509                              NULL,
510                              MOJO_READ_MESSAGE_FLAG_NONE));
511   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
512   EXPECT_STREQ(kHello, buffer);
513
514   // Close MP 1, port 1 as well (MP 0, port 0 was already closed above).
515   mp1->Close(1);
516 }
517
518 TEST_F(RemoteMessagePipeTest, HandlePassing) {
519   static const char kHello[] = "hello";
520   Waiter waiter;
521   HandleSignalsState hss;
522   uint32_t context = 0;
523
524   scoped_refptr<MessagePipe> mp0(new MessagePipe(
525       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
526       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
527   scoped_refptr<MessagePipe> mp1(new MessagePipe(
528       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
529       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
530   ConnectMessagePipes(mp0, mp1);
531
532   // We'll try to pass this dispatcher.
533   scoped_refptr<MessagePipeDispatcher> dispatcher(
534       new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
535   scoped_refptr<MessagePipe> local_mp(new MessagePipe());
536   dispatcher->Init(local_mp, 0);
537
538   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
539   // it later, it might already be readable.)
540   waiter.Init();
541   ASSERT_EQ(MOJO_RESULT_OK,
542             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
543
544   // Write to MP 0, port 0.
545   {
546     DispatcherTransport transport(
547         test::DispatcherTryStartTransport(dispatcher.get()));
548     EXPECT_TRUE(transport.is_valid());
549
550     std::vector<DispatcherTransport> transports;
551     transports.push_back(transport);
552     EXPECT_EQ(MOJO_RESULT_OK,
553               mp0->WriteMessage(0,
554                                 UserPointer<const void>(kHello),
555                                 sizeof(kHello),
556                                 &transports,
557                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
558     transport.End();
559
560     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
561     // |dispatcher| is destroyed.
562     EXPECT_TRUE(dispatcher->HasOneRef());
563     dispatcher = NULL;
564   }
565
566   // Wait.
567   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
568   EXPECT_EQ(123u, context);
569   hss = HandleSignalsState();
570   mp1->RemoveWaiter(1, &waiter, &hss);
571   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
572             hss.satisfied_signals);
573   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
574             hss.satisfiable_signals);
575
576   // Read from MP 1, port 1.
577   char read_buffer[100] = {0};
578   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
579   DispatcherVector read_dispatchers;
580   uint32_t read_num_dispatchers = 10;  // Maximum to get.
581   EXPECT_EQ(MOJO_RESULT_OK,
582             mp1->ReadMessage(1,
583                              UserPointer<void>(read_buffer),
584                              MakeUserPointer(&read_buffer_size),
585                              &read_dispatchers,
586                              &read_num_dispatchers,
587                              MOJO_READ_MESSAGE_FLAG_NONE));
588   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
589   EXPECT_STREQ(kHello, read_buffer);
590   EXPECT_EQ(1u, read_dispatchers.size());
591   EXPECT_EQ(1u, read_num_dispatchers);
592   ASSERT_TRUE(read_dispatchers[0]);
593   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
594
595   EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
596   dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
597
598   // Add the waiter now, before it becomes readable to avoid a race.
599   waiter.Init();
600   ASSERT_EQ(
601       MOJO_RESULT_OK,
602       dispatcher->AddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, NULL));
603
604   // Write to "local_mp", port 1.
605   EXPECT_EQ(MOJO_RESULT_OK,
606             local_mp->WriteMessage(1,
607                                    UserPointer<const void>(kHello),
608                                    sizeof(kHello),
609                                    NULL,
610                                    MOJO_WRITE_MESSAGE_FLAG_NONE));
611
612   // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
613   // here. (We don't crash if I sleep and then close.)
614
615   // Wait for the dispatcher to become readable.
616   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
617   EXPECT_EQ(456u, context);
618   hss = HandleSignalsState();
619   dispatcher->RemoveWaiter(&waiter, &hss);
620   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
621             hss.satisfied_signals);
622   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
623             hss.satisfiable_signals);
624
625   // Read from the dispatcher.
626   memset(read_buffer, 0, sizeof(read_buffer));
627   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
628   EXPECT_EQ(MOJO_RESULT_OK,
629             dispatcher->ReadMessage(UserPointer<void>(read_buffer),
630                                     MakeUserPointer(&read_buffer_size),
631                                     0,
632                                     NULL,
633                                     MOJO_READ_MESSAGE_FLAG_NONE));
634   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
635   EXPECT_STREQ(kHello, read_buffer);
636
637   // Prepare to wait on "local_mp", port 1.
638   waiter.Init();
639   ASSERT_EQ(
640       MOJO_RESULT_OK,
641       local_mp->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, NULL));
642
643   // Write to the dispatcher.
644   EXPECT_EQ(MOJO_RESULT_OK,
645             dispatcher->WriteMessage(UserPointer<const void>(kHello),
646                                      sizeof(kHello),
647                                      NULL,
648                                      MOJO_WRITE_MESSAGE_FLAG_NONE));
649
650   // Wait.
651   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
652   EXPECT_EQ(789u, context);
653   hss = HandleSignalsState();
654   local_mp->RemoveWaiter(1, &waiter, &hss);
655   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
656             hss.satisfied_signals);
657   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
658             hss.satisfiable_signals);
659
660   // Read from "local_mp", port 1.
661   memset(read_buffer, 0, sizeof(read_buffer));
662   read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
663   EXPECT_EQ(MOJO_RESULT_OK,
664             local_mp->ReadMessage(1,
665                                   UserPointer<void>(read_buffer),
666                                   MakeUserPointer(&read_buffer_size),
667                                   NULL,
668                                   NULL,
669                                   MOJO_READ_MESSAGE_FLAG_NONE));
670   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
671   EXPECT_STREQ(kHello, read_buffer);
672
673   // TODO(vtl): Also test that messages queued up before the handle was sent are
674   // delivered properly.
675
676   // Close everything that belongs to us.
677   mp0->Close(0);
678   mp1->Close(1);
679   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
680   // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
681   local_mp->Close(1);
682 }
683
684 #if defined(OS_POSIX)
685 #define MAYBE_SharedBufferPassing SharedBufferPassing
686 #else
687 // Not yet implemented (on Windows).
688 #define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing
689 #endif
690 TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) {
691   static const char kHello[] = "hello";
692   Waiter waiter;
693   HandleSignalsState hss;
694   uint32_t context = 0;
695
696   scoped_refptr<MessagePipe> mp0(new MessagePipe(
697       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
698       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
699   scoped_refptr<MessagePipe> mp1(new MessagePipe(
700       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
701       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
702   ConnectMessagePipes(mp0, mp1);
703
704   // We'll try to pass this dispatcher.
705   embedder::SimplePlatformSupport platform_support;
706   scoped_refptr<SharedBufferDispatcher> dispatcher;
707   EXPECT_EQ(MOJO_RESULT_OK,
708             SharedBufferDispatcher::Create(
709                 &platform_support,
710                 SharedBufferDispatcher::kDefaultCreateOptions,
711                 100,
712                 &dispatcher));
713   ASSERT_TRUE(dispatcher);
714
715   // Make a mapping.
716   scoped_ptr<embedder::PlatformSharedBufferMapping> mapping0;
717   EXPECT_EQ(
718       MOJO_RESULT_OK,
719       dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping0));
720   ASSERT_TRUE(mapping0);
721   ASSERT_TRUE(mapping0->GetBase());
722   ASSERT_EQ(100u, mapping0->GetLength());
723   static_cast<char*>(mapping0->GetBase())[0] = 'A';
724   static_cast<char*>(mapping0->GetBase())[50] = 'B';
725   static_cast<char*>(mapping0->GetBase())[99] = 'C';
726
727   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
728   // it later, it might already be readable.)
729   waiter.Init();
730   ASSERT_EQ(MOJO_RESULT_OK,
731             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
732
733   // Write to MP 0, port 0.
734   {
735     DispatcherTransport transport(
736         test::DispatcherTryStartTransport(dispatcher.get()));
737     EXPECT_TRUE(transport.is_valid());
738
739     std::vector<DispatcherTransport> transports;
740     transports.push_back(transport);
741     EXPECT_EQ(MOJO_RESULT_OK,
742               mp0->WriteMessage(0,
743                                 UserPointer<const void>(kHello),
744                                 sizeof(kHello),
745                                 &transports,
746                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
747     transport.End();
748
749     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
750     // |dispatcher| is destroyed.
751     EXPECT_TRUE(dispatcher->HasOneRef());
752     dispatcher = NULL;
753   }
754
755   // Wait.
756   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
757   EXPECT_EQ(123u, context);
758   hss = HandleSignalsState();
759   mp1->RemoveWaiter(1, &waiter, &hss);
760   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
761             hss.satisfied_signals);
762   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
763             hss.satisfiable_signals);
764
765   // Read from MP 1, port 1.
766   char read_buffer[100] = {0};
767   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
768   DispatcherVector read_dispatchers;
769   uint32_t read_num_dispatchers = 10;  // Maximum to get.
770   EXPECT_EQ(MOJO_RESULT_OK,
771             mp1->ReadMessage(1,
772                              UserPointer<void>(read_buffer),
773                              MakeUserPointer(&read_buffer_size),
774                              &read_dispatchers,
775                              &read_num_dispatchers,
776                              MOJO_READ_MESSAGE_FLAG_NONE));
777   EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
778   EXPECT_STREQ(kHello, read_buffer);
779   EXPECT_EQ(1u, read_dispatchers.size());
780   EXPECT_EQ(1u, read_num_dispatchers);
781   ASSERT_TRUE(read_dispatchers[0]);
782   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
783
784   EXPECT_EQ(Dispatcher::kTypeSharedBuffer, read_dispatchers[0]->GetType());
785   dispatcher = static_cast<SharedBufferDispatcher*>(read_dispatchers[0].get());
786
787   // Make another mapping.
788   scoped_ptr<embedder::PlatformSharedBufferMapping> mapping1;
789   EXPECT_EQ(
790       MOJO_RESULT_OK,
791       dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping1));
792   ASSERT_TRUE(mapping1);
793   ASSERT_TRUE(mapping1->GetBase());
794   ASSERT_EQ(100u, mapping1->GetLength());
795   EXPECT_NE(mapping1->GetBase(), mapping0->GetBase());
796   EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
797   EXPECT_EQ('B', static_cast<char*>(mapping1->GetBase())[50]);
798   EXPECT_EQ('C', static_cast<char*>(mapping1->GetBase())[99]);
799
800   // Write stuff either way.
801   static_cast<char*>(mapping1->GetBase())[1] = 'x';
802   EXPECT_EQ('x', static_cast<char*>(mapping0->GetBase())[1]);
803   static_cast<char*>(mapping0->GetBase())[2] = 'y';
804   EXPECT_EQ('y', static_cast<char*>(mapping1->GetBase())[2]);
805
806   // Kill the first mapping; the second should still be valid.
807   mapping0.reset();
808   EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
809
810   // Close everything that belongs to us.
811   mp0->Close(0);
812   mp1->Close(1);
813   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
814
815   // The second mapping should still be good.
816   EXPECT_EQ('x', static_cast<char*>(mapping1->GetBase())[1]);
817 }
818
819 #if defined(OS_POSIX)
820 #define MAYBE_PlatformHandlePassing PlatformHandlePassing
821 #else
822 // Not yet implemented (on Windows).
823 #define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing
824 #endif
825 TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) {
826   base::ScopedTempDir temp_dir;
827   ASSERT_TRUE(temp_dir.CreateUniqueTempDir());
828
829   static const char kHello[] = "hello";
830   static const char kWorld[] = "world";
831   Waiter waiter;
832   uint32_t context = 0;
833   HandleSignalsState hss;
834
835   scoped_refptr<MessagePipe> mp0(new MessagePipe(
836       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
837       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
838   scoped_refptr<MessagePipe> mp1(new MessagePipe(
839       scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
840       scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
841   ConnectMessagePipes(mp0, mp1);
842
843   base::FilePath unused;
844   base::ScopedFILE fp(
845       CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused));
846   EXPECT_EQ(sizeof(kHello), fwrite(kHello, 1, sizeof(kHello), fp.get()));
847   // We'll try to pass this dispatcher, which will cause a |PlatformHandle| to
848   // be passed.
849   scoped_refptr<PlatformHandleDispatcher> dispatcher(
850       new PlatformHandleDispatcher(
851           mojo::test::PlatformHandleFromFILE(fp.Pass())));
852
853   // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
854   // it later, it might already be readable.)
855   waiter.Init();
856   ASSERT_EQ(MOJO_RESULT_OK,
857             mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
858
859   // Write to MP 0, port 0.
860   {
861     DispatcherTransport transport(
862         test::DispatcherTryStartTransport(dispatcher.get()));
863     EXPECT_TRUE(transport.is_valid());
864
865     std::vector<DispatcherTransport> transports;
866     transports.push_back(transport);
867     EXPECT_EQ(MOJO_RESULT_OK,
868               mp0->WriteMessage(0,
869                                 UserPointer<const void>(kWorld),
870                                 sizeof(kWorld),
871                                 &transports,
872                                 MOJO_WRITE_MESSAGE_FLAG_NONE));
873     transport.End();
874
875     // |dispatcher| should have been closed. This is |DCHECK()|ed when the
876     // |dispatcher| is destroyed.
877     EXPECT_TRUE(dispatcher->HasOneRef());
878     dispatcher = NULL;
879   }
880
881   // Wait.
882   EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
883   EXPECT_EQ(123u, context);
884   hss = HandleSignalsState();
885   mp1->RemoveWaiter(1, &waiter, &hss);
886   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
887             hss.satisfied_signals);
888   EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
889             hss.satisfiable_signals);
890
891   // Read from MP 1, port 1.
892   char read_buffer[100] = {0};
893   uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
894   DispatcherVector read_dispatchers;
895   uint32_t read_num_dispatchers = 10;  // Maximum to get.
896   EXPECT_EQ(MOJO_RESULT_OK,
897             mp1->ReadMessage(1,
898                              UserPointer<void>(read_buffer),
899                              MakeUserPointer(&read_buffer_size),
900                              &read_dispatchers,
901                              &read_num_dispatchers,
902                              MOJO_READ_MESSAGE_FLAG_NONE));
903   EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
904   EXPECT_STREQ(kWorld, read_buffer);
905   EXPECT_EQ(1u, read_dispatchers.size());
906   EXPECT_EQ(1u, read_num_dispatchers);
907   ASSERT_TRUE(read_dispatchers[0]);
908   EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
909
910   EXPECT_EQ(Dispatcher::kTypePlatformHandle, read_dispatchers[0]->GetType());
911   dispatcher =
912       static_cast<PlatformHandleDispatcher*>(read_dispatchers[0].get());
913
914   embedder::ScopedPlatformHandle h = dispatcher->PassPlatformHandle().Pass();
915   EXPECT_TRUE(h.is_valid());
916
917   fp = mojo::test::FILEFromPlatformHandle(h.Pass(), "rb").Pass();
918   EXPECT_FALSE(h.is_valid());
919   EXPECT_TRUE(fp);
920
921   rewind(fp.get());
922   memset(read_buffer, 0, sizeof(read_buffer));
923   EXPECT_EQ(sizeof(kHello),
924             fread(read_buffer, 1, sizeof(read_buffer), fp.get()));
925   EXPECT_STREQ(kHello, read_buffer);
926
927   // Close everything that belongs to us.
928   mp0->Close(0);
929   mp1->Close(1);
930   EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
931 }
932
933 // Test racing closes (on each end).
934 // Note: A flaky failure would almost certainly indicate a problem in the code
935 // itself (not in the test). Also, any logged warnings/errors would also
936 // probably be indicative of bugs.
937 TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
938   base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5);
939
940   for (unsigned i = 0; i < 256; i++) {
941     DVLOG(2) << "---------------------------------------- " << i;
942     scoped_refptr<MessagePipe> mp0(new MessagePipe(
943         scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
944         scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
945     BootstrapMessagePipeNoWait(0, mp0);
946
947     scoped_refptr<MessagePipe> mp1(new MessagePipe(
948         scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
949         scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
950     BootstrapMessagePipeNoWait(1, mp1);
951
952     if (i & 1u) {
953       io_thread()->task_runner()->PostTask(
954           FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
955     }
956     if (i & 2u)
957       base::PlatformThread::Sleep(delay);
958
959     mp0->Close(0);
960
961     if (i & 4u) {
962       io_thread()->task_runner()->PostTask(
963           FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
964     }
965     if (i & 8u)
966       base::PlatformThread::Sleep(delay);
967
968     mp1->Close(1);
969
970     RestoreInitialState();
971   }
972 }
973
974 }  // namespace
975 }  // namespace system
976 }  // namespace mojo