Upstream version 5.34.92.0
[platform/framework/web/crosswalk.git] / src / third_party / libjingle / source / talk / base / virtualsocket_unittest.cc
1 /*
2  * libjingle
3  * Copyright 2006, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27
28 #include <time.h>
29 #ifdef POSIX
30 #include <netinet/in.h>
31 #endif
32 #include <cmath>
33
34 #include "talk/base/logging.h"
35 #include "talk/base/gunit.h"
36 #include "talk/base/testclient.h"
37 #include "talk/base/testutils.h"
38 #include "talk/base/thread.h"
39 #include "talk/base/timeutils.h"
40 #include "talk/base/virtualsocketserver.h"
41
42 using namespace talk_base;
43
44 // Sends at a constant rate but with random packet sizes.
45 struct Sender : public MessageHandler {
46   Sender(Thread* th, AsyncSocket* s, uint32 rt)
47       : thread(th), socket(new AsyncUDPSocket(s)),
48         done(false), rate(rt), count(0) {
49     last_send = talk_base::Time();
50     thread->PostDelayed(NextDelay(), this, 1);
51   }
52
53   uint32 NextDelay() {
54     uint32 size = (rand() % 4096) + 1;
55     return 1000 * size / rate;
56   }
57
58   void OnMessage(Message* pmsg) {
59     ASSERT_EQ(1u, pmsg->message_id);
60
61     if (done)
62       return;
63
64     uint32 cur_time = talk_base::Time();
65     uint32 delay = cur_time - last_send;
66     uint32 size = rate * delay / 1000;
67     size = std::min<uint32>(size, 4096);
68     size = std::max<uint32>(size, sizeof(uint32));
69
70     count += size;
71     memcpy(dummy, &cur_time, sizeof(cur_time));
72     socket->Send(dummy, size, DSCP_NO_CHANGE);
73
74     last_send = cur_time;
75     thread->PostDelayed(NextDelay(), this, 1);
76   }
77
78   Thread* thread;
79   scoped_ptr<AsyncUDPSocket> socket;
80   bool done;
81   uint32 rate;  // bytes per second
82   uint32 count;
83   uint32 last_send;
84   char dummy[4096];
85 };
86
87 struct Receiver : public MessageHandler, public sigslot::has_slots<> {
88   Receiver(Thread* th, AsyncSocket* s, uint32 bw)
89       : thread(th), socket(new AsyncUDPSocket(s)), bandwidth(bw), done(false),
90         count(0), sec_count(0), sum(0), sum_sq(0), samples(0) {
91     socket->SignalReadPacket.connect(this, &Receiver::OnReadPacket);
92     thread->PostDelayed(1000, this, 1);
93   }
94
95   ~Receiver() {
96     thread->Clear(this);
97   }
98
99   void OnReadPacket(AsyncPacketSocket* s, const char* data, size_t size,
100                     const SocketAddress& remote_addr,
101                     const PacketTime& packet_time) {
102     ASSERT_EQ(socket.get(), s);
103     ASSERT_GE(size, 4U);
104
105     count += size;
106     sec_count += size;
107
108     uint32 send_time = *reinterpret_cast<const uint32*>(data);
109     uint32 recv_time = talk_base::Time();
110     uint32 delay = recv_time - send_time;
111     sum += delay;
112     sum_sq += delay * delay;
113     samples += 1;
114   }
115
116   void OnMessage(Message* pmsg) {
117     ASSERT_EQ(1u, pmsg->message_id);
118
119     if (done)
120       return;
121
122     // It is always possible for us to receive more than expected because
123     // packets can be further delayed in delivery.
124     if (bandwidth > 0)
125       ASSERT_TRUE(sec_count <= 5 * bandwidth / 4);
126     sec_count = 0;
127     thread->PostDelayed(1000, this, 1);
128   }
129
130   Thread* thread;
131   scoped_ptr<AsyncUDPSocket> socket;
132   uint32 bandwidth;
133   bool done;
134   size_t count;
135   size_t sec_count;
136   double sum;
137   double sum_sq;
138   uint32 samples;
139 };
140
141 class VirtualSocketServerTest : public testing::Test {
142  public:
143   VirtualSocketServerTest() : ss_(new VirtualSocketServer(NULL)),
144                               kIPv4AnyAddress(IPAddress(INADDR_ANY), 0),
145                               kIPv6AnyAddress(IPAddress(in6addr_any), 0) {
146   }
147
148   void CheckAddressIncrementalization(const SocketAddress& post,
149                                       const SocketAddress& pre) {
150     EXPECT_EQ(post.port(), pre.port() + 1);
151     IPAddress post_ip = post.ipaddr();
152     IPAddress pre_ip = pre.ipaddr();
153     EXPECT_EQ(pre_ip.family(), post_ip.family());
154     if (post_ip.family() == AF_INET) {
155       in_addr pre_ipv4 = pre_ip.ipv4_address();
156       in_addr post_ipv4 = post_ip.ipv4_address();
157       int difference = ntohl(post_ipv4.s_addr) - ntohl(pre_ipv4.s_addr);
158       EXPECT_EQ(1, difference);
159     } else if (post_ip.family() == AF_INET6) {
160       in6_addr post_ip6 = post_ip.ipv6_address();
161       in6_addr pre_ip6 = pre_ip.ipv6_address();
162       uint32* post_as_ints = reinterpret_cast<uint32*>(&post_ip6.s6_addr);
163       uint32* pre_as_ints = reinterpret_cast<uint32*>(&pre_ip6.s6_addr);
164       EXPECT_EQ(post_as_ints[3], pre_as_ints[3] + 1);
165     }
166   }
167
168   void BasicTest(const SocketAddress& initial_addr) {
169     AsyncSocket* socket = ss_->CreateAsyncSocket(initial_addr.family(),
170                                                  SOCK_DGRAM);
171     socket->Bind(initial_addr);
172     SocketAddress server_addr = socket->GetLocalAddress();
173     // Make sure VSS didn't switch families on us.
174     EXPECT_EQ(server_addr.family(), initial_addr.family());
175
176     TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
177     AsyncSocket* socket2 =
178         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
179     TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
180
181     SocketAddress client2_addr;
182     EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
183     EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
184
185     SocketAddress client1_addr;
186     EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
187     EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
188     EXPECT_EQ(client1_addr, server_addr);
189
190     SocketAddress empty = EmptySocketAddressWithFamily(initial_addr.family());
191     for (int i = 0; i < 10; i++) {
192       client2 = new TestClient(AsyncUDPSocket::Create(ss_, empty));
193
194       SocketAddress next_client2_addr;
195       EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
196       EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &next_client2_addr));
197       CheckAddressIncrementalization(next_client2_addr, client2_addr);
198       // EXPECT_EQ(next_client2_addr.port(), client2_addr.port() + 1);
199
200       SocketAddress server_addr2;
201       EXPECT_EQ(6, client1->SendTo("bizbaz", 6, next_client2_addr));
202       EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &server_addr2));
203       EXPECT_EQ(server_addr2, server_addr);
204
205       client2_addr = next_client2_addr;
206     }
207   }
208
209   // initial_addr should be made from either INADDR_ANY or in6addr_any.
210   void ConnectTest(const SocketAddress& initial_addr) {
211     testing::StreamSink sink;
212     SocketAddress accept_addr;
213     const SocketAddress kEmptyAddr =
214         EmptySocketAddressWithFamily(initial_addr.family());
215
216     // Create client
217     AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
218                                                  SOCK_STREAM);
219     sink.Monitor(client);
220     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
221     EXPECT_TRUE(client->GetLocalAddress().IsNil());
222
223     // Create server
224     AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
225                                                  SOCK_STREAM);
226     sink.Monitor(server);
227     EXPECT_NE(0, server->Listen(5));  // Bind required
228     EXPECT_EQ(0, server->Bind(initial_addr));
229     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
230     EXPECT_EQ(0, server->Listen(5));
231     EXPECT_EQ(server->GetState(), AsyncSocket::CS_CONNECTING);
232
233     // No pending server connections
234     EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
235     EXPECT_TRUE(NULL == server->Accept(&accept_addr));
236     EXPECT_EQ(AF_UNSPEC, accept_addr.family());
237
238     // Attempt connect to listening socket
239     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
240     EXPECT_NE(client->GetLocalAddress(), kEmptyAddr);  // Implicit Bind
241     EXPECT_NE(AF_UNSPEC, client->GetLocalAddress().family());  // Implicit Bind
242     EXPECT_NE(client->GetLocalAddress(), server->GetLocalAddress());
243
244     // Client is connecting
245     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
246     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
247     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
248
249     ss_->ProcessMessagesUntilIdle();
250
251     // Client still connecting
252     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
253     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
254     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
255
256     // Server has pending connection
257     EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
258     Socket* accepted = server->Accept(&accept_addr);
259     EXPECT_TRUE(NULL != accepted);
260     EXPECT_NE(accept_addr, kEmptyAddr);
261     EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr);
262
263     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
264     EXPECT_EQ(accepted->GetLocalAddress(), server->GetLocalAddress());
265     EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
266
267     ss_->ProcessMessagesUntilIdle();
268
269     // Client has connected
270     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTED);
271     EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
272     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
273     EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
274     EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
275   }
276
277   void ConnectToNonListenerTest(const SocketAddress& initial_addr) {
278     testing::StreamSink sink;
279     SocketAddress accept_addr;
280     const SocketAddress nil_addr;
281     const SocketAddress empty_addr =
282         EmptySocketAddressWithFamily(initial_addr.family());
283
284     // Create client
285     AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
286                                                  SOCK_STREAM);
287     sink.Monitor(client);
288
289     // Create server
290     AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
291                                                  SOCK_STREAM);
292     sink.Monitor(server);
293     EXPECT_EQ(0, server->Bind(initial_addr));
294     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
295     // Attempt connect to non-listening socket
296     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
297
298     ss_->ProcessMessagesUntilIdle();
299
300     // No pending server connections
301     EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
302     EXPECT_TRUE(NULL == server->Accept(&accept_addr));
303     EXPECT_EQ(accept_addr, nil_addr);
304
305     // Connection failed
306     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
307     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
308     EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR));
309     EXPECT_EQ(client->GetRemoteAddress(), nil_addr);
310   }
311
312   void CloseDuringConnectTest(const SocketAddress& initial_addr) {
313     testing::StreamSink sink;
314     SocketAddress accept_addr;
315     const SocketAddress empty_addr =
316         EmptySocketAddressWithFamily(initial_addr.family());
317
318     // Create client and server
319     scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(initial_addr.family(),
320                                                           SOCK_STREAM));
321     sink.Monitor(client.get());
322     scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(initial_addr.family(),
323                                                           SOCK_STREAM));
324     sink.Monitor(server.get());
325
326     // Initiate connect
327     EXPECT_EQ(0, server->Bind(initial_addr));
328     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
329
330     EXPECT_EQ(0, server->Listen(5));
331     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
332
333     // Server close before socket enters accept queue
334     EXPECT_FALSE(sink.Check(server.get(), testing::SSE_READ));
335     server->Close();
336
337     ss_->ProcessMessagesUntilIdle();
338
339     // Result: connection failed
340     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
341     EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
342
343     server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM));
344     sink.Monitor(server.get());
345
346     // Initiate connect
347     EXPECT_EQ(0, server->Bind(initial_addr));
348     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
349
350     EXPECT_EQ(0, server->Listen(5));
351     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
352
353     ss_->ProcessMessagesUntilIdle();
354
355     // Server close while socket is in accept queue
356     EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ));
357     server->Close();
358
359     ss_->ProcessMessagesUntilIdle();
360
361     // Result: connection failed
362     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
363     EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
364
365     // New server
366     server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM));
367     sink.Monitor(server.get());
368
369     // Initiate connect
370     EXPECT_EQ(0, server->Bind(initial_addr));
371     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
372
373     EXPECT_EQ(0, server->Listen(5));
374     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
375
376     ss_->ProcessMessagesUntilIdle();
377
378     // Server accepts connection
379     EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ));
380     scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
381     ASSERT_TRUE(NULL != accepted.get());
382     sink.Monitor(accepted.get());
383
384     // Client closes before connection complets
385     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
386
387     // Connected message has not been processed yet.
388     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
389     client->Close();
390
391     ss_->ProcessMessagesUntilIdle();
392
393     // Result: accepted socket closes
394     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CLOSED);
395     EXPECT_TRUE(sink.Check(accepted.get(), testing::SSE_CLOSE));
396     EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
397   }
398
399   void CloseTest(const SocketAddress& initial_addr) {
400     testing::StreamSink sink;
401     const SocketAddress kEmptyAddr;
402
403     // Create clients
404     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
405     sink.Monitor(a);
406     a->Bind(initial_addr);
407     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
408
409
410     scoped_ptr<AsyncSocket> b(ss_->CreateAsyncSocket(initial_addr.family(),
411                                                      SOCK_STREAM));
412     sink.Monitor(b.get());
413     b->Bind(initial_addr);
414     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
415
416     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
417     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
418
419     ss_->ProcessMessagesUntilIdle();
420
421     EXPECT_TRUE(sink.Check(a, testing::SSE_OPEN));
422     EXPECT_EQ(a->GetState(), AsyncSocket::CS_CONNECTED);
423     EXPECT_EQ(a->GetRemoteAddress(), b->GetLocalAddress());
424
425     EXPECT_TRUE(sink.Check(b.get(), testing::SSE_OPEN));
426     EXPECT_EQ(b->GetState(), AsyncSocket::CS_CONNECTED);
427     EXPECT_EQ(b->GetRemoteAddress(), a->GetLocalAddress());
428
429     EXPECT_EQ(1, a->Send("a", 1));
430     b->Close();
431     EXPECT_EQ(1, a->Send("b", 1));
432
433     ss_->ProcessMessagesUntilIdle();
434
435     char buffer[10];
436     EXPECT_FALSE(sink.Check(b.get(), testing::SSE_READ));
437     EXPECT_EQ(-1, b->Recv(buffer, 10));
438
439     EXPECT_TRUE(sink.Check(a, testing::SSE_CLOSE));
440     EXPECT_EQ(a->GetState(), AsyncSocket::CS_CLOSED);
441     EXPECT_EQ(a->GetRemoteAddress(), kEmptyAddr);
442
443     // No signal for Closer
444     EXPECT_FALSE(sink.Check(b.get(), testing::SSE_CLOSE));
445     EXPECT_EQ(b->GetState(), AsyncSocket::CS_CLOSED);
446     EXPECT_EQ(b->GetRemoteAddress(), kEmptyAddr);
447   }
448
449   void TcpSendTest(const SocketAddress& initial_addr) {
450     testing::StreamSink sink;
451     const SocketAddress kEmptyAddr;
452
453     // Connect two sockets
454     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
455     sink.Monitor(a);
456     a->Bind(initial_addr);
457     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
458
459     AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
460     sink.Monitor(b);
461     b->Bind(initial_addr);
462     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
463
464     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
465     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
466
467     ss_->ProcessMessagesUntilIdle();
468
469     const size_t kBufferSize = 2000;
470     ss_->set_send_buffer_capacity(kBufferSize);
471     ss_->set_recv_buffer_capacity(kBufferSize);
472
473     const size_t kDataSize = 5000;
474     char send_buffer[kDataSize], recv_buffer[kDataSize];
475     for (size_t i = 0; i < kDataSize; ++i)
476       send_buffer[i] = static_cast<char>(i % 256);
477     memset(recv_buffer, 0, sizeof(recv_buffer));
478     size_t send_pos = 0, recv_pos = 0;
479
480     // Can't send more than send buffer in one write
481     int result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
482     EXPECT_EQ(static_cast<int>(kBufferSize), result);
483     send_pos += result;
484
485     ss_->ProcessMessagesUntilIdle();
486     EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
487     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
488
489     // Receive buffer is already filled, fill send buffer again
490     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
491     EXPECT_EQ(static_cast<int>(kBufferSize), result);
492     send_pos += result;
493
494     ss_->ProcessMessagesUntilIdle();
495     EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
496     EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
497
498     // No more room in send or receive buffer
499     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
500     EXPECT_EQ(-1, result);
501     EXPECT_TRUE(a->IsBlocking());
502
503     // Read a subset of the data
504     result = b->Recv(recv_buffer + recv_pos, 500);
505     EXPECT_EQ(500, result);
506     recv_pos += result;
507
508     ss_->ProcessMessagesUntilIdle();
509     EXPECT_TRUE(sink.Check(a, testing::SSE_WRITE));
510     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
511
512     // Room for more on the sending side
513     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
514     EXPECT_EQ(500, result);
515     send_pos += result;
516
517     // Empty the recv buffer
518     while (true) {
519       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
520       if (result < 0) {
521         EXPECT_EQ(-1, result);
522         EXPECT_TRUE(b->IsBlocking());
523         break;
524       }
525       recv_pos += result;
526     }
527
528     ss_->ProcessMessagesUntilIdle();
529     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
530
531     // Continue to empty the recv buffer
532     while (true) {
533       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
534       if (result < 0) {
535         EXPECT_EQ(-1, result);
536         EXPECT_TRUE(b->IsBlocking());
537         break;
538       }
539       recv_pos += result;
540     }
541
542     // Send last of the data
543     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
544     EXPECT_EQ(500, result);
545     send_pos += result;
546
547     ss_->ProcessMessagesUntilIdle();
548     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
549
550     // Receive the last of the data
551     while (true) {
552       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
553       if (result < 0) {
554         EXPECT_EQ(-1, result);
555         EXPECT_TRUE(b->IsBlocking());
556         break;
557       }
558       recv_pos += result;
559     }
560
561     ss_->ProcessMessagesUntilIdle();
562     EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
563
564     // The received data matches the sent data
565     EXPECT_EQ(kDataSize, send_pos);
566     EXPECT_EQ(kDataSize, recv_pos);
567     EXPECT_EQ(0, memcmp(recv_buffer, send_buffer, kDataSize));
568   }
569
570   void TcpSendsPacketsInOrderTest(const SocketAddress& initial_addr) {
571     const SocketAddress kEmptyAddr;
572
573     // Connect two sockets
574     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(),
575                                             SOCK_STREAM);
576     AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(),
577                                             SOCK_STREAM);
578     a->Bind(initial_addr);
579     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
580
581     b->Bind(initial_addr);
582     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
583
584     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
585     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
586     ss_->ProcessMessagesUntilIdle();
587
588     // First, deliver all packets in 0 ms.
589     char buffer[2] = { 0, 0 };
590     const char cNumPackets = 10;
591     for (char i = 0; i < cNumPackets; ++i) {
592       buffer[0] = '0' + i;
593       EXPECT_EQ(1, a->Send(buffer, 1));
594     }
595
596     ss_->ProcessMessagesUntilIdle();
597
598     for (char i = 0; i < cNumPackets; ++i) {
599       EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
600       EXPECT_EQ(static_cast<char>('0' + i), buffer[0]);
601     }
602
603     // Next, deliver packets at random intervals
604     const uint32 mean = 50;
605     const uint32 stddev = 50;
606
607     ss_->set_delay_mean(mean);
608     ss_->set_delay_stddev(stddev);
609     ss_->UpdateDelayDistribution();
610
611     for (char i = 0; i < cNumPackets; ++i) {
612       buffer[0] = 'A' + i;
613       EXPECT_EQ(1, a->Send(buffer, 1));
614     }
615
616     ss_->ProcessMessagesUntilIdle();
617
618     for (char i = 0; i < cNumPackets; ++i) {
619       EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
620       EXPECT_EQ(static_cast<char>('A' + i), buffer[0]);
621     }
622   }
623
624   void BandwidthTest(const SocketAddress& initial_addr) {
625     AsyncSocket* send_socket =
626         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
627     AsyncSocket* recv_socket =
628         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
629     ASSERT_EQ(0, send_socket->Bind(initial_addr));
630     ASSERT_EQ(0, recv_socket->Bind(initial_addr));
631     EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
632     EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
633     ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
634
635     uint32 bandwidth = 64 * 1024;
636     ss_->set_bandwidth(bandwidth);
637
638     Thread* pthMain = Thread::Current();
639     Sender sender(pthMain, send_socket, 80 * 1024);
640     Receiver receiver(pthMain, recv_socket, bandwidth);
641
642     pthMain->ProcessMessages(5000);
643     sender.done = true;
644     pthMain->ProcessMessages(5000);
645
646     ASSERT_TRUE(receiver.count >= 5 * 3 * bandwidth / 4);
647     ASSERT_TRUE(receiver.count <= 6 * bandwidth);  // queue could drain for 1s
648
649     ss_->set_bandwidth(0);
650   }
651
652   void DelayTest(const SocketAddress& initial_addr) {
653     time_t seed = ::time(NULL);
654     LOG(LS_VERBOSE) << "seed = " << seed;
655     srand(static_cast<unsigned int>(seed));
656
657     const uint32 mean = 2000;
658     const uint32 stddev = 500;
659
660     ss_->set_delay_mean(mean);
661     ss_->set_delay_stddev(stddev);
662     ss_->UpdateDelayDistribution();
663
664     AsyncSocket* send_socket =
665         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
666     AsyncSocket* recv_socket =
667         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
668     ASSERT_EQ(0, send_socket->Bind(initial_addr));
669     ASSERT_EQ(0, recv_socket->Bind(initial_addr));
670     EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
671     EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
672     ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
673
674     Thread* pthMain = Thread::Current();
675     // Avg packet size is 2K, so at 200KB/s for 10s, we should see about
676     // 1000 packets, which is necessary to get a good distribution.
677     Sender sender(pthMain, send_socket, 100 * 2 * 1024);
678     Receiver receiver(pthMain, recv_socket, 0);
679
680     pthMain->ProcessMessages(10000);
681     sender.done = receiver.done = true;
682     ss_->ProcessMessagesUntilIdle();
683
684     const double sample_mean = receiver.sum / receiver.samples;
685     double num =
686         receiver.samples * receiver.sum_sq - receiver.sum * receiver.sum;
687     double den = receiver.samples * (receiver.samples - 1);
688     const double sample_stddev = std::sqrt(num / den);
689     LOG(LS_VERBOSE) << "mean=" << sample_mean << " stddev=" << sample_stddev;
690
691     EXPECT_LE(500u, receiver.samples);
692     // We initially used a 0.1 fudge factor, but on the build machine, we
693     // have seen the value differ by as much as 0.13.
694     EXPECT_NEAR(mean, sample_mean, 0.15 * mean);
695     EXPECT_NEAR(stddev, sample_stddev, 0.15 * stddev);
696
697     ss_->set_delay_mean(0);
698     ss_->set_delay_stddev(0);
699     ss_->UpdateDelayDistribution();
700   }
701
702   // Test cross-family communication between a client bound to client_addr and a
703   // server bound to server_addr. shouldSucceed indicates if communication is
704   // expected to work or not.
705   void CrossFamilyConnectionTest(const SocketAddress& client_addr,
706                                  const SocketAddress& server_addr,
707                                  bool shouldSucceed) {
708     testing::StreamSink sink;
709     SocketAddress accept_address;
710     const SocketAddress kEmptyAddr;
711
712     // Client gets a IPv4 address
713     AsyncSocket* client = ss_->CreateAsyncSocket(client_addr.family(),
714                                                  SOCK_STREAM);
715     sink.Monitor(client);
716     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
717     EXPECT_EQ(client->GetLocalAddress(), kEmptyAddr);
718     client->Bind(client_addr);
719
720     // Server gets a non-mapped non-any IPv6 address.
721     // IPv4 sockets should not be able to connect to this.
722     AsyncSocket* server = ss_->CreateAsyncSocket(server_addr.family(),
723                                                  SOCK_STREAM);
724     sink.Monitor(server);
725     server->Bind(server_addr);
726     server->Listen(5);
727
728     if (shouldSucceed) {
729       EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
730       ss_->ProcessMessagesUntilIdle();
731       EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
732       Socket* accepted = server->Accept(&accept_address);
733       EXPECT_TRUE(NULL != accepted);
734       EXPECT_NE(kEmptyAddr, accept_address);
735       ss_->ProcessMessagesUntilIdle();
736       EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
737       EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
738     } else {
739       // Check that the connection failed.
740       EXPECT_EQ(-1, client->Connect(server->GetLocalAddress()));
741       ss_->ProcessMessagesUntilIdle();
742
743       EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
744       EXPECT_TRUE(NULL == server->Accept(&accept_address));
745       EXPECT_EQ(accept_address, kEmptyAddr);
746       EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
747       EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
748       EXPECT_EQ(client->GetRemoteAddress(), kEmptyAddr);
749     }
750   }
751
752   // Test cross-family datagram sending between a client bound to client_addr
753   // and a server bound to server_addr. shouldSucceed indicates if sending is
754   // expected to succed or not.
755   void CrossFamilyDatagramTest(const SocketAddress& client_addr,
756                                const SocketAddress& server_addr,
757                                bool shouldSucceed) {
758     AsyncSocket* socket = ss_->CreateAsyncSocket(SOCK_DGRAM);
759     socket->Bind(server_addr);
760     SocketAddress bound_server_addr = socket->GetLocalAddress();
761     TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
762
763     AsyncSocket* socket2 = ss_->CreateAsyncSocket(SOCK_DGRAM);
764     socket2->Bind(client_addr);
765     TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
766     SocketAddress client2_addr;
767
768     if (shouldSucceed) {
769       EXPECT_EQ(3, client2->SendTo("foo", 3, bound_server_addr));
770       EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
771       SocketAddress client1_addr;
772       EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
773       EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
774       EXPECT_EQ(client1_addr, bound_server_addr);
775     } else {
776       EXPECT_EQ(-1, client2->SendTo("foo", 3, bound_server_addr));
777       EXPECT_FALSE(client1->CheckNextPacket("foo", 3, 0));
778     }
779   }
780
781  protected:
782   virtual void SetUp() {
783     Thread::Current()->set_socketserver(ss_);
784   }
785   virtual void TearDown() {
786     Thread::Current()->set_socketserver(NULL);
787   }
788
  // Virtual socket server under test; allocated in the constructor and
  // installed on the current thread by SetUp().
  // NOTE(review): no matching delete is visible in this fixture - confirm
  // whether ownership is handled elsewhere.
  VirtualSocketServer* ss_;
  // Wildcard IPv4 address (INADDR_ANY) with port 0.
  const SocketAddress kIPv4AnyAddress;
  // Wildcard IPv6 address (in6addr_any) with port 0.
  const SocketAddress kIPv6AnyAddress;
792 };
793
794 TEST_F(VirtualSocketServerTest, basic_v4) {
795   SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 5000);
796   BasicTest(ipv4_test_addr);
797 }
798
799 TEST_F(VirtualSocketServerTest, basic_v6) {
800   SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 5000);
801   BasicTest(ipv6_test_addr);
802 }
803
804 TEST_F(VirtualSocketServerTest, connect_v4) {
805   ConnectTest(kIPv4AnyAddress);
806 }
807
808 TEST_F(VirtualSocketServerTest, connect_v6) {
809   ConnectTest(kIPv6AnyAddress);
810 }
811
812 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v4) {
813   ConnectToNonListenerTest(kIPv4AnyAddress);
814 }
815
816 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v6) {
817   ConnectToNonListenerTest(kIPv6AnyAddress);
818 }
819
820 TEST_F(VirtualSocketServerTest, close_during_connect_v4) {
821   CloseDuringConnectTest(kIPv4AnyAddress);
822 }
823
824 TEST_F(VirtualSocketServerTest, close_during_connect_v6) {
825   CloseDuringConnectTest(kIPv6AnyAddress);
826 }
827
828 TEST_F(VirtualSocketServerTest, close_v4) {
829   CloseTest(kIPv4AnyAddress);
830 }
831
832 TEST_F(VirtualSocketServerTest, close_v6) {
833   CloseTest(kIPv6AnyAddress);
834 }
835
836 TEST_F(VirtualSocketServerTest, tcp_send_v4) {
837   TcpSendTest(kIPv4AnyAddress);
838 }
839
840 TEST_F(VirtualSocketServerTest, tcp_send_v6) {
841   TcpSendTest(kIPv6AnyAddress);
842 }
843
844 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v4) {
845   TcpSendsPacketsInOrderTest(kIPv4AnyAddress);
846 }
847
848 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v6) {
849   TcpSendsPacketsInOrderTest(kIPv6AnyAddress);
850 }
851
852 TEST_F(VirtualSocketServerTest, bandwidth_v4) {
853   SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
854   BandwidthTest(ipv4_test_addr);
855 }
856
857 TEST_F(VirtualSocketServerTest, bandwidth_v6) {
858   SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
859   BandwidthTest(ipv6_test_addr);
860 }
861
862 TEST_F(VirtualSocketServerTest, delay_v4) {
863   SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
864   DelayTest(ipv4_test_addr);
865 }
866
867 // See: https://code.google.com/p/webrtc/issues/detail?id=2409
868 TEST_F(VirtualSocketServerTest, DISABLED_delay_v6) {
869   SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
870   DelayTest(ipv6_test_addr);
871 }
872
873 // Works, receiving socket sees 127.0.0.2.
874 TEST_F(VirtualSocketServerTest, CanConnectFromMappedIPv6ToIPv4Any) {
875   CrossFamilyConnectionTest(SocketAddress("::ffff:127.0.0.2", 0),
876                             SocketAddress("0.0.0.0", 5000),
877                             true);
878 }
879
880 // Fails.
881 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToIPv4Any) {
882   CrossFamilyConnectionTest(SocketAddress("::2", 0),
883                             SocketAddress("0.0.0.0", 5000),
884                             false);
885 }
886
887 // Fails.
888 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToMappedIPv6) {
889   CrossFamilyConnectionTest(SocketAddress("::2", 0),
890                             SocketAddress("::ffff:127.0.0.1", 5000),
891                             false);
892 }
893
894 // Works. Receiving socket sees ::ffff:127.0.0.2.
895 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToIPv6Any) {
896   CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
897                             SocketAddress("::", 5000),
898                             true);
899 }
900
901 // Fails.
902 TEST_F(VirtualSocketServerTest, CantConnectFromIPv4ToUnMappedIPv6) {
903   CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
904                             SocketAddress("::1", 5000),
905                             false);
906 }
907
908 // Works. Receiving socket sees ::ffff:127.0.0.1.
909 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToMappedIPv6) {
910   CrossFamilyConnectionTest(SocketAddress("127.0.0.1", 0),
911                             SocketAddress("::ffff:127.0.0.2", 5000),
912                             true);
913 }
914
915 // Works, receiving socket sees a result from GetNextIP.
916 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv6ToIPv4Any) {
917   CrossFamilyConnectionTest(SocketAddress("::", 0),
918                             SocketAddress("0.0.0.0", 5000),
919                             true);
920 }
921
922 // Works, receiving socket sees whatever GetNextIP gave the client.
923 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv4ToIPv6Any) {
924   CrossFamilyConnectionTest(SocketAddress("0.0.0.0", 0),
925                             SocketAddress("::", 5000),
926                             true);
927 }
928
929 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv4ToIPv6Any) {
930   CrossFamilyDatagramTest(SocketAddress("0.0.0.0", 0),
931                           SocketAddress("::", 5000),
932                           true);
933 }
934
935 TEST_F(VirtualSocketServerTest, CanSendDatagramFromMappedIPv6ToIPv4Any) {
936   CrossFamilyDatagramTest(SocketAddress("::ffff:127.0.0.1", 0),
937                           SocketAddress("0.0.0.0", 5000),
938                           true);
939 }
940
941 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToIPv4Any) {
942   CrossFamilyDatagramTest(SocketAddress("::2", 0),
943                           SocketAddress("0.0.0.0", 5000),
944                           false);
945 }
946
947 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToMappedIPv6) {
948   CrossFamilyDatagramTest(SocketAddress("::2", 0),
949                           SocketAddress("::ffff:127.0.0.1", 5000),
950                           false);
951 }
952
953 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToIPv6Any) {
954   CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
955                           SocketAddress("::", 5000),
956                           true);
957 }
958
959 TEST_F(VirtualSocketServerTest, CantSendDatagramFromIPv4ToUnMappedIPv6) {
960   CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
961                           SocketAddress("::1", 5000),
962                           false);
963 }
964
965 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToMappedIPv6) {
966   CrossFamilyDatagramTest(SocketAddress("127.0.0.1", 0),
967                           SocketAddress("::ffff:127.0.0.2", 5000),
968                           true);
969 }
970
971 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv6ToIPv4Any) {
972   CrossFamilyDatagramTest(SocketAddress("::", 0),
973                           SocketAddress("0.0.0.0", 5000),
974                           true);
975 }
976
977 TEST_F(VirtualSocketServerTest, CreatesStandardDistribution) {
978   const uint32 kTestMean[] = { 10, 100, 333, 1000 };
979   const double kTestDev[] = { 0.25, 0.1, 0.01 };
980   // TODO: The current code only works for 1000 data points or more.
981   const uint32 kTestSamples[] = { /*10, 100,*/ 1000 };
982   for (size_t midx = 0; midx < ARRAY_SIZE(kTestMean); ++midx) {
983     for (size_t didx = 0; didx < ARRAY_SIZE(kTestDev); ++didx) {
984       for (size_t sidx = 0; sidx < ARRAY_SIZE(kTestSamples); ++sidx) {
985         ASSERT_LT(0u, kTestSamples[sidx]);
986         const uint32 kStdDev =
987             static_cast<uint32>(kTestDev[didx] * kTestMean[midx]);
988         VirtualSocketServer::Function* f =
989             VirtualSocketServer::CreateDistribution(kTestMean[midx],
990                                                     kStdDev,
991                                                     kTestSamples[sidx]);
992         ASSERT_TRUE(NULL != f);
993         ASSERT_EQ(kTestSamples[sidx], f->size());
994         double sum = 0;
995         for (uint32 i = 0; i < f->size(); ++i) {
996           sum += (*f)[i].second;
997         }
998         const double mean = sum / f->size();
999         double sum_sq_dev = 0;
1000         for (uint32 i = 0; i < f->size(); ++i) {
1001           double dev = (*f)[i].second - mean;
1002           sum_sq_dev += dev * dev;
1003         }
1004         const double stddev = std::sqrt(sum_sq_dev / f->size());
1005         EXPECT_NEAR(kTestMean[midx], mean, 0.1 * kTestMean[midx])
1006           << "M=" << kTestMean[midx]
1007           << " SD=" << kStdDev
1008           << " N=" << kTestSamples[sidx];
1009         EXPECT_NEAR(kStdDev, stddev, 0.1 * kStdDev)
1010           << "M=" << kTestMean[midx]
1011           << " SD=" << kStdDev
1012           << " N=" << kTestSamples[sidx];
1013         delete f;
1014       }
1015     }
1016   }
1017 }