- add third_party src.
[platform/framework/web/crosswalk.git] / src / third_party / libjingle / source / talk / base / virtualsocket_unittest.cc
1 /*
2  * libjingle
3  * Copyright 2006, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27
28 #include <time.h>
29 #ifdef POSIX
30 #include <netinet/in.h>
31 #endif
32 #include <cmath>
33
34 #include "talk/base/logging.h"
35 #include "talk/base/gunit.h"
36 #include "talk/base/testclient.h"
37 #include "talk/base/testutils.h"
38 #include "talk/base/thread.h"
39 #include "talk/base/timeutils.h"
40 #include "talk/base/virtualsocketserver.h"
41
42 using namespace talk_base;
43
44 // Sends at a constant rate but with random packet sizes.
45 struct Sender : public MessageHandler {
46   Sender(Thread* th, AsyncSocket* s, uint32 rt)
47       : thread(th), socket(new AsyncUDPSocket(s)),
48         done(false), rate(rt), count(0) {
49     last_send = talk_base::Time();
50     thread->PostDelayed(NextDelay(), this, 1);
51   }
52
53   uint32 NextDelay() {
54     uint32 size = (rand() % 4096) + 1;
55     return 1000 * size / rate;
56   }
57
58   void OnMessage(Message* pmsg) {
59     ASSERT_EQ(1u, pmsg->message_id);
60
61     if (done)
62       return;
63
64     uint32 cur_time = talk_base::Time();
65     uint32 delay = cur_time - last_send;
66     uint32 size = rate * delay / 1000;
67     size = std::min<uint32>(size, 4096);
68     size = std::max<uint32>(size, sizeof(uint32));
69
70     count += size;
71     memcpy(dummy, &cur_time, sizeof(cur_time));
72     socket->Send(dummy, size, DSCP_NO_CHANGE);
73
74     last_send = cur_time;
75     thread->PostDelayed(NextDelay(), this, 1);
76   }
77
78   Thread* thread;
79   scoped_ptr<AsyncUDPSocket> socket;
80   bool done;
81   uint32 rate;  // bytes per second
82   uint32 count;
83   uint32 last_send;
84   char dummy[4096];
85 };
86
87 struct Receiver : public MessageHandler, public sigslot::has_slots<> {
88   Receiver(Thread* th, AsyncSocket* s, uint32 bw)
89       : thread(th), socket(new AsyncUDPSocket(s)), bandwidth(bw), done(false),
90         count(0), sec_count(0), sum(0), sum_sq(0), samples(0) {
91     socket->SignalReadPacket.connect(this, &Receiver::OnReadPacket);
92     thread->PostDelayed(1000, this, 1);
93   }
94
95   ~Receiver() {
96     thread->Clear(this);
97   }
98
99   void OnReadPacket(AsyncPacketSocket* s, const char* data, size_t size,
100                     const SocketAddress& remote_addr) {
101     ASSERT_EQ(socket.get(), s);
102     ASSERT_GE(size, 4U);
103
104     count += size;
105     sec_count += size;
106
107     uint32 send_time = *reinterpret_cast<const uint32*>(data);
108     uint32 recv_time = talk_base::Time();
109     uint32 delay = recv_time - send_time;
110     sum += delay;
111     sum_sq += delay * delay;
112     samples += 1;
113   }
114
115   void OnMessage(Message* pmsg) {
116     ASSERT_EQ(1u, pmsg->message_id);
117
118     if (done)
119       return;
120
121     // It is always possible for us to receive more than expected because
122     // packets can be further delayed in delivery.
123     if (bandwidth > 0)
124       ASSERT_TRUE(sec_count <= 5 * bandwidth / 4);
125     sec_count = 0;
126     thread->PostDelayed(1000, this, 1);
127   }
128
129   Thread* thread;
130   scoped_ptr<AsyncUDPSocket> socket;
131   uint32 bandwidth;
132   bool done;
133   size_t count;
134   size_t sec_count;
135   double sum;
136   double sum_sq;
137   uint32 samples;
138 };
139
140 class VirtualSocketServerTest : public testing::Test {
141  public:
142   VirtualSocketServerTest() : ss_(new VirtualSocketServer(NULL)),
143                               kIPv4AnyAddress(IPAddress(INADDR_ANY), 0),
144                               kIPv6AnyAddress(IPAddress(in6addr_any), 0) {
145   }
146
147   void CheckAddressIncrementalization(const SocketAddress& post,
148                                       const SocketAddress& pre) {
149     EXPECT_EQ(post.port(), pre.port() + 1);
150     IPAddress post_ip = post.ipaddr();
151     IPAddress pre_ip = pre.ipaddr();
152     EXPECT_EQ(pre_ip.family(), post_ip.family());
153     if (post_ip.family() == AF_INET) {
154       in_addr pre_ipv4 = pre_ip.ipv4_address();
155       in_addr post_ipv4 = post_ip.ipv4_address();
156       int difference = ntohl(post_ipv4.s_addr) - ntohl(pre_ipv4.s_addr);
157       EXPECT_EQ(1, difference);
158     } else if (post_ip.family() == AF_INET6) {
159       in6_addr post_ip6 = post_ip.ipv6_address();
160       in6_addr pre_ip6 = pre_ip.ipv6_address();
161       uint32* post_as_ints = reinterpret_cast<uint32*>(&post_ip6.s6_addr);
162       uint32* pre_as_ints = reinterpret_cast<uint32*>(&pre_ip6.s6_addr);
163       EXPECT_EQ(post_as_ints[3], pre_as_ints[3] + 1);
164     }
165   }
166
167   void BasicTest(const SocketAddress& initial_addr) {
168     AsyncSocket* socket = ss_->CreateAsyncSocket(initial_addr.family(),
169                                                  SOCK_DGRAM);
170     socket->Bind(initial_addr);
171     SocketAddress server_addr = socket->GetLocalAddress();
172     // Make sure VSS didn't switch families on us.
173     EXPECT_EQ(server_addr.family(), initial_addr.family());
174
175     TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
176     AsyncSocket* socket2 =
177         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
178     TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
179
180     SocketAddress client2_addr;
181     EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
182     EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
183
184     SocketAddress client1_addr;
185     EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
186     EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
187     EXPECT_EQ(client1_addr, server_addr);
188
189     SocketAddress empty = EmptySocketAddressWithFamily(initial_addr.family());
190     for (int i = 0; i < 10; i++) {
191       client2 = new TestClient(AsyncUDPSocket::Create(ss_, empty));
192
193       SocketAddress next_client2_addr;
194       EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
195       EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &next_client2_addr));
196       CheckAddressIncrementalization(next_client2_addr, client2_addr);
197       // EXPECT_EQ(next_client2_addr.port(), client2_addr.port() + 1);
198
199       SocketAddress server_addr2;
200       EXPECT_EQ(6, client1->SendTo("bizbaz", 6, next_client2_addr));
201       EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &server_addr2));
202       EXPECT_EQ(server_addr2, server_addr);
203
204       client2_addr = next_client2_addr;
205     }
206   }
207
208   // initial_addr should be made from either INADDR_ANY or in6addr_any.
209   void ConnectTest(const SocketAddress& initial_addr) {
210     testing::StreamSink sink;
211     SocketAddress accept_addr;
212     const SocketAddress kEmptyAddr =
213         EmptySocketAddressWithFamily(initial_addr.family());
214
215     // Create client
216     AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
217                                                  SOCK_STREAM);
218     sink.Monitor(client);
219     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
220     EXPECT_TRUE(client->GetLocalAddress().IsNil());
221
222     // Create server
223     AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
224                                                  SOCK_STREAM);
225     sink.Monitor(server);
226     EXPECT_NE(0, server->Listen(5));  // Bind required
227     EXPECT_EQ(0, server->Bind(initial_addr));
228     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
229     EXPECT_EQ(0, server->Listen(5));
230     EXPECT_EQ(server->GetState(), AsyncSocket::CS_CONNECTING);
231
232     // No pending server connections
233     EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
234     EXPECT_TRUE(NULL == server->Accept(&accept_addr));
235     EXPECT_EQ(AF_UNSPEC, accept_addr.family());
236
237     // Attempt connect to listening socket
238     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
239     EXPECT_NE(client->GetLocalAddress(), kEmptyAddr);  // Implicit Bind
240     EXPECT_NE(AF_UNSPEC, client->GetLocalAddress().family());  // Implicit Bind
241     EXPECT_NE(client->GetLocalAddress(), server->GetLocalAddress());
242
243     // Client is connecting
244     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
245     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
246     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
247
248     ss_->ProcessMessagesUntilIdle();
249
250     // Client still connecting
251     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
252     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
253     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
254
255     // Server has pending connection
256     EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
257     Socket* accepted = server->Accept(&accept_addr);
258     EXPECT_TRUE(NULL != accepted);
259     EXPECT_NE(accept_addr, kEmptyAddr);
260     EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr);
261
262     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
263     EXPECT_EQ(accepted->GetLocalAddress(), server->GetLocalAddress());
264     EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
265
266     ss_->ProcessMessagesUntilIdle();
267
268     // Client has connected
269     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTED);
270     EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
271     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
272     EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
273     EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
274   }
275
276   void ConnectToNonListenerTest(const SocketAddress& initial_addr) {
277     testing::StreamSink sink;
278     SocketAddress accept_addr;
279     const SocketAddress nil_addr;
280     const SocketAddress empty_addr =
281         EmptySocketAddressWithFamily(initial_addr.family());
282
283     // Create client
284     AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
285                                                  SOCK_STREAM);
286     sink.Monitor(client);
287
288     // Create server
289     AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
290                                                  SOCK_STREAM);
291     sink.Monitor(server);
292     EXPECT_EQ(0, server->Bind(initial_addr));
293     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
294     // Attempt connect to non-listening socket
295     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
296
297     ss_->ProcessMessagesUntilIdle();
298
299     // No pending server connections
300     EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
301     EXPECT_TRUE(NULL == server->Accept(&accept_addr));
302     EXPECT_EQ(accept_addr, nil_addr);
303
304     // Connection failed
305     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
306     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
307     EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR));
308     EXPECT_EQ(client->GetRemoteAddress(), nil_addr);
309   }
310
311   void CloseDuringConnectTest(const SocketAddress& initial_addr) {
312     testing::StreamSink sink;
313     SocketAddress accept_addr;
314     const SocketAddress empty_addr =
315         EmptySocketAddressWithFamily(initial_addr.family());
316
317     // Create client and server
318     scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(initial_addr.family(),
319                                                           SOCK_STREAM));
320     sink.Monitor(client.get());
321     scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(initial_addr.family(),
322                                                           SOCK_STREAM));
323     sink.Monitor(server.get());
324
325     // Initiate connect
326     EXPECT_EQ(0, server->Bind(initial_addr));
327     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
328
329     EXPECT_EQ(0, server->Listen(5));
330     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
331
332     // Server close before socket enters accept queue
333     EXPECT_FALSE(sink.Check(server.get(), testing::SSE_READ));
334     server->Close();
335
336     ss_->ProcessMessagesUntilIdle();
337
338     // Result: connection failed
339     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
340     EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
341
342     server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM));
343     sink.Monitor(server.get());
344
345     // Initiate connect
346     EXPECT_EQ(0, server->Bind(initial_addr));
347     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
348
349     EXPECT_EQ(0, server->Listen(5));
350     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
351
352     ss_->ProcessMessagesUntilIdle();
353
354     // Server close while socket is in accept queue
355     EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ));
356     server->Close();
357
358     ss_->ProcessMessagesUntilIdle();
359
360     // Result: connection failed
361     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
362     EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
363
364     // New server
365     server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM));
366     sink.Monitor(server.get());
367
368     // Initiate connect
369     EXPECT_EQ(0, server->Bind(initial_addr));
370     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
371
372     EXPECT_EQ(0, server->Listen(5));
373     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
374
375     ss_->ProcessMessagesUntilIdle();
376
377     // Server accepts connection
378     EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ));
379     scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
380     ASSERT_TRUE(NULL != accepted.get());
381     sink.Monitor(accepted.get());
382
383     // Client closes before connection complets
384     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
385
386     // Connected message has not been processed yet.
387     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
388     client->Close();
389
390     ss_->ProcessMessagesUntilIdle();
391
392     // Result: accepted socket closes
393     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CLOSED);
394     EXPECT_TRUE(sink.Check(accepted.get(), testing::SSE_CLOSE));
395     EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
396   }
397
398   void CloseTest(const SocketAddress& initial_addr) {
399     testing::StreamSink sink;
400     const SocketAddress kEmptyAddr;
401
402     // Create clients
403     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
404     sink.Monitor(a);
405     a->Bind(initial_addr);
406     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
407
408
409     scoped_ptr<AsyncSocket> b(ss_->CreateAsyncSocket(initial_addr.family(),
410                                                      SOCK_STREAM));
411     sink.Monitor(b.get());
412     b->Bind(initial_addr);
413     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
414
415     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
416     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
417
418     ss_->ProcessMessagesUntilIdle();
419
420     EXPECT_TRUE(sink.Check(a, testing::SSE_OPEN));
421     EXPECT_EQ(a->GetState(), AsyncSocket::CS_CONNECTED);
422     EXPECT_EQ(a->GetRemoteAddress(), b->GetLocalAddress());
423
424     EXPECT_TRUE(sink.Check(b.get(), testing::SSE_OPEN));
425     EXPECT_EQ(b->GetState(), AsyncSocket::CS_CONNECTED);
426     EXPECT_EQ(b->GetRemoteAddress(), a->GetLocalAddress());
427
428     EXPECT_EQ(1, a->Send("a", 1));
429     b->Close();
430     EXPECT_EQ(1, a->Send("b", 1));
431
432     ss_->ProcessMessagesUntilIdle();
433
434     char buffer[10];
435     EXPECT_FALSE(sink.Check(b.get(), testing::SSE_READ));
436     EXPECT_EQ(-1, b->Recv(buffer, 10));
437
438     EXPECT_TRUE(sink.Check(a, testing::SSE_CLOSE));
439     EXPECT_EQ(a->GetState(), AsyncSocket::CS_CLOSED);
440     EXPECT_EQ(a->GetRemoteAddress(), kEmptyAddr);
441
442     // No signal for Closer
443     EXPECT_FALSE(sink.Check(b.get(), testing::SSE_CLOSE));
444     EXPECT_EQ(b->GetState(), AsyncSocket::CS_CLOSED);
445     EXPECT_EQ(b->GetRemoteAddress(), kEmptyAddr);
446   }
447
448   void TcpSendTest(const SocketAddress& initial_addr) {
449     testing::StreamSink sink;
450     const SocketAddress kEmptyAddr;
451
452     // Connect two sockets
453     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
454     sink.Monitor(a);
455     a->Bind(initial_addr);
456     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
457
458     AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
459     sink.Monitor(b);
460     b->Bind(initial_addr);
461     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
462
463     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
464     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
465
466     ss_->ProcessMessagesUntilIdle();
467
468     const size_t kBufferSize = 2000;
469     ss_->set_send_buffer_capacity(kBufferSize);
470     ss_->set_recv_buffer_capacity(kBufferSize);
471
472     const size_t kDataSize = 5000;
473     char send_buffer[kDataSize], recv_buffer[kDataSize];
474     for (size_t i = 0; i < kDataSize; ++i)
475       send_buffer[i] = static_cast<char>(i % 256);
476     memset(recv_buffer, 0, sizeof(recv_buffer));
477     size_t send_pos = 0, recv_pos = 0;
478
479     // Can't send more than send buffer in one write
480     int result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
481     EXPECT_EQ(static_cast<int>(kBufferSize), result);
482     send_pos += result;
483
484     ss_->ProcessMessagesUntilIdle();
485     EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
486     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
487
488     // Receive buffer is already filled, fill send buffer again
489     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
490     EXPECT_EQ(static_cast<int>(kBufferSize), result);
491     send_pos += result;
492
493     ss_->ProcessMessagesUntilIdle();
494     EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
495     EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
496
497     // No more room in send or receive buffer
498     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
499     EXPECT_EQ(-1, result);
500     EXPECT_TRUE(a->IsBlocking());
501
502     // Read a subset of the data
503     result = b->Recv(recv_buffer + recv_pos, 500);
504     EXPECT_EQ(500, result);
505     recv_pos += result;
506
507     ss_->ProcessMessagesUntilIdle();
508     EXPECT_TRUE(sink.Check(a, testing::SSE_WRITE));
509     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
510
511     // Room for more on the sending side
512     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
513     EXPECT_EQ(500, result);
514     send_pos += result;
515
516     // Empty the recv buffer
517     while (true) {
518       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
519       if (result < 0) {
520         EXPECT_EQ(-1, result);
521         EXPECT_TRUE(b->IsBlocking());
522         break;
523       }
524       recv_pos += result;
525     }
526
527     ss_->ProcessMessagesUntilIdle();
528     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
529
530     // Continue to empty the recv buffer
531     while (true) {
532       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
533       if (result < 0) {
534         EXPECT_EQ(-1, result);
535         EXPECT_TRUE(b->IsBlocking());
536         break;
537       }
538       recv_pos += result;
539     }
540
541     // Send last of the data
542     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
543     EXPECT_EQ(500, result);
544     send_pos += result;
545
546     ss_->ProcessMessagesUntilIdle();
547     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
548
549     // Receive the last of the data
550     while (true) {
551       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
552       if (result < 0) {
553         EXPECT_EQ(-1, result);
554         EXPECT_TRUE(b->IsBlocking());
555         break;
556       }
557       recv_pos += result;
558     }
559
560     ss_->ProcessMessagesUntilIdle();
561     EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
562
563     // The received data matches the sent data
564     EXPECT_EQ(kDataSize, send_pos);
565     EXPECT_EQ(kDataSize, recv_pos);
566     EXPECT_EQ(0, memcmp(recv_buffer, send_buffer, kDataSize));
567   }
568
569   void TcpSendsPacketsInOrderTest(const SocketAddress& initial_addr) {
570     const SocketAddress kEmptyAddr;
571
572     // Connect two sockets
573     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(),
574                                             SOCK_STREAM);
575     AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(),
576                                             SOCK_STREAM);
577     a->Bind(initial_addr);
578     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
579
580     b->Bind(initial_addr);
581     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
582
583     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
584     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
585     ss_->ProcessMessagesUntilIdle();
586
587     // First, deliver all packets in 0 ms.
588     char buffer[2] = { 0, 0 };
589     const char cNumPackets = 10;
590     for (char i = 0; i < cNumPackets; ++i) {
591       buffer[0] = '0' + i;
592       EXPECT_EQ(1, a->Send(buffer, 1));
593     }
594
595     ss_->ProcessMessagesUntilIdle();
596
597     for (char i = 0; i < cNumPackets; ++i) {
598       EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
599       EXPECT_EQ(static_cast<char>('0' + i), buffer[0]);
600     }
601
602     // Next, deliver packets at random intervals
603     const uint32 mean = 50;
604     const uint32 stddev = 50;
605
606     ss_->set_delay_mean(mean);
607     ss_->set_delay_stddev(stddev);
608     ss_->UpdateDelayDistribution();
609
610     for (char i = 0; i < cNumPackets; ++i) {
611       buffer[0] = 'A' + i;
612       EXPECT_EQ(1, a->Send(buffer, 1));
613     }
614
615     ss_->ProcessMessagesUntilIdle();
616
617     for (char i = 0; i < cNumPackets; ++i) {
618       EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
619       EXPECT_EQ(static_cast<char>('A' + i), buffer[0]);
620     }
621   }
622
623   void BandwidthTest(const SocketAddress& initial_addr) {
624     AsyncSocket* send_socket =
625         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
626     AsyncSocket* recv_socket =
627         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
628     ASSERT_EQ(0, send_socket->Bind(initial_addr));
629     ASSERT_EQ(0, recv_socket->Bind(initial_addr));
630     EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
631     EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
632     ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
633
634     uint32 bandwidth = 64 * 1024;
635     ss_->set_bandwidth(bandwidth);
636
637     Thread* pthMain = Thread::Current();
638     Sender sender(pthMain, send_socket, 80 * 1024);
639     Receiver receiver(pthMain, recv_socket, bandwidth);
640
641     pthMain->ProcessMessages(5000);
642     sender.done = true;
643     pthMain->ProcessMessages(5000);
644
645     ASSERT_TRUE(receiver.count >= 5 * 3 * bandwidth / 4);
646     ASSERT_TRUE(receiver.count <= 6 * bandwidth);  // queue could drain for 1s
647
648     ss_->set_bandwidth(0);
649   }
650
651   void DelayTest(const SocketAddress& initial_addr) {
652     time_t seed = ::time(NULL);
653     LOG(LS_VERBOSE) << "seed = " << seed;
654     srand(static_cast<unsigned int>(seed));
655
656     const uint32 mean = 2000;
657     const uint32 stddev = 500;
658
659     ss_->set_delay_mean(mean);
660     ss_->set_delay_stddev(stddev);
661     ss_->UpdateDelayDistribution();
662
663     AsyncSocket* send_socket =
664         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
665     AsyncSocket* recv_socket =
666         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
667     ASSERT_EQ(0, send_socket->Bind(initial_addr));
668     ASSERT_EQ(0, recv_socket->Bind(initial_addr));
669     EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
670     EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
671     ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
672
673     Thread* pthMain = Thread::Current();
674     // Avg packet size is 2K, so at 200KB/s for 10s, we should see about
675     // 1000 packets, which is necessary to get a good distribution.
676     Sender sender(pthMain, send_socket, 100 * 2 * 1024);
677     Receiver receiver(pthMain, recv_socket, 0);
678
679     pthMain->ProcessMessages(10000);
680     sender.done = receiver.done = true;
681     ss_->ProcessMessagesUntilIdle();
682
683     const double sample_mean = receiver.sum / receiver.samples;
684     double num =
685         receiver.samples * receiver.sum_sq - receiver.sum * receiver.sum;
686     double den = receiver.samples * (receiver.samples - 1);
687     const double sample_stddev = std::sqrt(num / den);
688     LOG(LS_VERBOSE) << "mean=" << sample_mean << " stddev=" << sample_stddev;
689
690     EXPECT_LE(500u, receiver.samples);
691     // We initially used a 0.1 fudge factor, but on the build machine, we
692     // have seen the value differ by as much as 0.13.
693     EXPECT_NEAR(mean, sample_mean, 0.15 * mean);
694     EXPECT_NEAR(stddev, sample_stddev, 0.15 * stddev);
695
696     ss_->set_delay_mean(0);
697     ss_->set_delay_stddev(0);
698     ss_->UpdateDelayDistribution();
699   }
700
701   // Test cross-family communication between a client bound to client_addr and a
702   // server bound to server_addr. shouldSucceed indicates if communication is
703   // expected to work or not.
704   void CrossFamilyConnectionTest(const SocketAddress& client_addr,
705                                  const SocketAddress& server_addr,
706                                  bool shouldSucceed) {
707     testing::StreamSink sink;
708     SocketAddress accept_address;
709     const SocketAddress kEmptyAddr;
710
711     // Client gets a IPv4 address
712     AsyncSocket* client = ss_->CreateAsyncSocket(client_addr.family(),
713                                                  SOCK_STREAM);
714     sink.Monitor(client);
715     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
716     EXPECT_EQ(client->GetLocalAddress(), kEmptyAddr);
717     client->Bind(client_addr);
718
719     // Server gets a non-mapped non-any IPv6 address.
720     // IPv4 sockets should not be able to connect to this.
721     AsyncSocket* server = ss_->CreateAsyncSocket(server_addr.family(),
722                                                  SOCK_STREAM);
723     sink.Monitor(server);
724     server->Bind(server_addr);
725     server->Listen(5);
726
727     if (shouldSucceed) {
728       EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
729       ss_->ProcessMessagesUntilIdle();
730       EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
731       Socket* accepted = server->Accept(&accept_address);
732       EXPECT_TRUE(NULL != accepted);
733       EXPECT_NE(kEmptyAddr, accept_address);
734       ss_->ProcessMessagesUntilIdle();
735       EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
736       EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
737     } else {
738       // Check that the connection failed.
739       EXPECT_EQ(-1, client->Connect(server->GetLocalAddress()));
740       ss_->ProcessMessagesUntilIdle();
741
742       EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
743       EXPECT_TRUE(NULL == server->Accept(&accept_address));
744       EXPECT_EQ(accept_address, kEmptyAddr);
745       EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
746       EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
747       EXPECT_EQ(client->GetRemoteAddress(), kEmptyAddr);
748     }
749   }
750
751   // Test cross-family datagram sending between a client bound to client_addr
752   // and a server bound to server_addr. shouldSucceed indicates if sending is
753   // expected to succed or not.
754   void CrossFamilyDatagramTest(const SocketAddress& client_addr,
755                                const SocketAddress& server_addr,
756                                bool shouldSucceed) {
757     AsyncSocket* socket = ss_->CreateAsyncSocket(SOCK_DGRAM);
758     socket->Bind(server_addr);
759     SocketAddress bound_server_addr = socket->GetLocalAddress();
760     TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
761
762     AsyncSocket* socket2 = ss_->CreateAsyncSocket(SOCK_DGRAM);
763     socket2->Bind(client_addr);
764     TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
765     SocketAddress client2_addr;
766
767     if (shouldSucceed) {
768       EXPECT_EQ(3, client2->SendTo("foo", 3, bound_server_addr));
769       EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
770       SocketAddress client1_addr;
771       EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
772       EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
773       EXPECT_EQ(client1_addr, bound_server_addr);
774     } else {
775       EXPECT_EQ(-1, client2->SendTo("foo", 3, bound_server_addr));
776       EXPECT_FALSE(client1->CheckNextPacket("foo", 3, 0));
777     }
778   }
779
780  protected:
781   virtual void SetUp() {
782     Thread::Current()->set_socketserver(ss_);
783   }
784   virtual void TearDown() {
785     Thread::Current()->set_socketserver(NULL);
786   }
787
788   VirtualSocketServer* ss_;
789   const SocketAddress kIPv4AnyAddress;
790   const SocketAddress kIPv6AnyAddress;
791 };
792
793 TEST_F(VirtualSocketServerTest, basic_v4) {
794   SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 5000);
795   BasicTest(ipv4_test_addr);
796 }
797
798 TEST_F(VirtualSocketServerTest, basic_v6) {
799   SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 5000);
800   BasicTest(ipv6_test_addr);
801 }
802
803 TEST_F(VirtualSocketServerTest, connect_v4) {
804   ConnectTest(kIPv4AnyAddress);
805 }
806
807 TEST_F(VirtualSocketServerTest, connect_v6) {
808   ConnectTest(kIPv6AnyAddress);
809 }
810
811 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v4) {
812   ConnectToNonListenerTest(kIPv4AnyAddress);
813 }
814
815 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v6) {
816   ConnectToNonListenerTest(kIPv6AnyAddress);
817 }
818
819 TEST_F(VirtualSocketServerTest, close_during_connect_v4) {
820   CloseDuringConnectTest(kIPv4AnyAddress);
821 }
822
823 TEST_F(VirtualSocketServerTest, close_during_connect_v6) {
824   CloseDuringConnectTest(kIPv6AnyAddress);
825 }
826
827 TEST_F(VirtualSocketServerTest, close_v4) {
828   CloseTest(kIPv4AnyAddress);
829 }
830
831 TEST_F(VirtualSocketServerTest, close_v6) {
832   CloseTest(kIPv6AnyAddress);
833 }
834
835 TEST_F(VirtualSocketServerTest, tcp_send_v4) {
836   TcpSendTest(kIPv4AnyAddress);
837 }
838
839 TEST_F(VirtualSocketServerTest, tcp_send_v6) {
840   TcpSendTest(kIPv6AnyAddress);
841 }
842
843 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v4) {
844   TcpSendsPacketsInOrderTest(kIPv4AnyAddress);
845 }
846
847 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v6) {
848   TcpSendsPacketsInOrderTest(kIPv6AnyAddress);
849 }
850
851 TEST_F(VirtualSocketServerTest, bandwidth_v4) {
852   SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
853   BandwidthTest(ipv4_test_addr);
854 }
855
856 TEST_F(VirtualSocketServerTest, bandwidth_v6) {
857   SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
858   BandwidthTest(ipv6_test_addr);
859 }
860
861 TEST_F(VirtualSocketServerTest, delay_v4) {
862   SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
863   DelayTest(ipv4_test_addr);
864 }
865
866 // See: https://code.google.com/p/webrtc/issues/detail?id=2409
867 TEST_F(VirtualSocketServerTest, DISABLED_delay_v6) {
868   SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
869   DelayTest(ipv6_test_addr);
870 }
871
872 // Works, receiving socket sees 127.0.0.2.
873 TEST_F(VirtualSocketServerTest, CanConnectFromMappedIPv6ToIPv4Any) {
874   CrossFamilyConnectionTest(SocketAddress("::ffff:127.0.0.2", 0),
875                             SocketAddress("0.0.0.0", 5000),
876                             true);
877 }
878
879 // Fails.
880 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToIPv4Any) {
881   CrossFamilyConnectionTest(SocketAddress("::2", 0),
882                             SocketAddress("0.0.0.0", 5000),
883                             false);
884 }
885
886 // Fails.
887 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToMappedIPv6) {
888   CrossFamilyConnectionTest(SocketAddress("::2", 0),
889                             SocketAddress("::ffff:127.0.0.1", 5000),
890                             false);
891 }
892
893 // Works. receiving socket sees ::ffff:127.0.0.2.
894 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToIPv6Any) {
895   CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
896                             SocketAddress("::", 5000),
897                             true);
898 }
899
900 // Fails.
901 TEST_F(VirtualSocketServerTest, CantConnectFromIPv4ToUnMappedIPv6) {
902   CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
903                             SocketAddress("::1", 5000),
904                             false);
905 }
906
907 // Works. Receiving socket sees ::ffff:127.0.0.1.
908 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToMappedIPv6) {
909   CrossFamilyConnectionTest(SocketAddress("127.0.0.1", 0),
910                             SocketAddress("::ffff:127.0.0.2", 5000),
911                             true);
912 }
913
914 // Works, receiving socket sees a result from GetNextIP.
915 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv6ToIPv4Any) {
916   CrossFamilyConnectionTest(SocketAddress("::", 0),
917                             SocketAddress("0.0.0.0", 5000),
918                             true);
919 }
920
921 // Works, receiving socket sees whatever GetNextIP gave the client.
922 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv4ToIPv6Any) {
923   CrossFamilyConnectionTest(SocketAddress("0.0.0.0", 0),
924                             SocketAddress("::", 5000),
925                             true);
926 }
927
928 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv4ToIPv6Any) {
929   CrossFamilyDatagramTest(SocketAddress("0.0.0.0", 0),
930                           SocketAddress("::", 5000),
931                           true);
932 }
933
934 TEST_F(VirtualSocketServerTest, CanSendDatagramFromMappedIPv6ToIPv4Any) {
935   CrossFamilyDatagramTest(SocketAddress("::ffff:127.0.0.1", 0),
936                           SocketAddress("0.0.0.0", 5000),
937                           true);
938 }
939
940 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToIPv4Any) {
941   CrossFamilyDatagramTest(SocketAddress("::2", 0),
942                           SocketAddress("0.0.0.0", 5000),
943                           false);
944 }
945
946 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToMappedIPv6) {
947   CrossFamilyDatagramTest(SocketAddress("::2", 0),
948                           SocketAddress("::ffff:127.0.0.1", 5000),
949                           false);
950 }
951
952 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToIPv6Any) {
953   CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
954                           SocketAddress("::", 5000),
955                           true);
956 }
957
958 TEST_F(VirtualSocketServerTest, CantSendDatagramFromIPv4ToUnMappedIPv6) {
959   CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
960                           SocketAddress("::1", 5000),
961                           false);
962 }
963
964 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToMappedIPv6) {
965   CrossFamilyDatagramTest(SocketAddress("127.0.0.1", 0),
966                           SocketAddress("::ffff:127.0.0.2", 5000),
967                           true);
968 }
969
970 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv6ToIPv4Any) {
971   CrossFamilyDatagramTest(SocketAddress("::", 0),
972                           SocketAddress("0.0.0.0", 5000),
973                           true);
974 }
975
976 TEST_F(VirtualSocketServerTest, CreatesStandardDistribution) {
977   const uint32 kTestMean[] = { 10, 100, 333, 1000 };
978   const double kTestDev[] = { 0.25, 0.1, 0.01 };
979   // TODO: The current code only works for 1000 data points or more.
980   const uint32 kTestSamples[] = { /*10, 100,*/ 1000 };
981   for (size_t midx = 0; midx < ARRAY_SIZE(kTestMean); ++midx) {
982     for (size_t didx = 0; didx < ARRAY_SIZE(kTestDev); ++didx) {
983       for (size_t sidx = 0; sidx < ARRAY_SIZE(kTestSamples); ++sidx) {
984         ASSERT_LT(0u, kTestSamples[sidx]);
985         const uint32 kStdDev =
986             static_cast<uint32>(kTestDev[didx] * kTestMean[midx]);
987         VirtualSocketServer::Function* f =
988             VirtualSocketServer::CreateDistribution(kTestMean[midx],
989                                                     kStdDev,
990                                                     kTestSamples[sidx]);
991         ASSERT_TRUE(NULL != f);
992         ASSERT_EQ(kTestSamples[sidx], f->size());
993         double sum = 0;
994         for (uint32 i = 0; i < f->size(); ++i) {
995           sum += (*f)[i].second;
996         }
997         const double mean = sum / f->size();
998         double sum_sq_dev = 0;
999         for (uint32 i = 0; i < f->size(); ++i) {
1000           double dev = (*f)[i].second - mean;
1001           sum_sq_dev += dev * dev;
1002         }
1003         const double stddev = std::sqrt(sum_sq_dev / f->size());
1004         EXPECT_NEAR(kTestMean[midx], mean, 0.1 * kTestMean[midx])
1005           << "M=" << kTestMean[midx]
1006           << " SD=" << kStdDev
1007           << " N=" << kTestSamples[sidx];
1008         EXPECT_NEAR(kStdDev, stddev, 0.1 * kStdDev)
1009           << "M=" << kTestMean[midx]
1010           << " SD=" << kStdDev
1011           << " N=" << kTestSamples[sidx];
1012         delete f;
1013       }
1014     }
1015   }
1016 }