Upstream version 5.34.104.0
[platform/framework/web/crosswalk.git] / src / third_party / libjingle / source / talk / base / virtualsocket_unittest.cc
1 /*
2  * libjingle
3  * Copyright 2006, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27
28 #include <time.h>
29 #ifdef POSIX
30 #include <netinet/in.h>
31 #endif
32 #include <cmath>
33
34 #include "talk/base/logging.h"
35 #include "talk/base/gunit.h"
36 #include "talk/base/testclient.h"
37 #include "talk/base/testutils.h"
38 #include "talk/base/thread.h"
39 #include "talk/base/timeutils.h"
40 #include "talk/base/virtualsocketserver.h"
41
42 using namespace talk_base;
43
44 // Sends at a constant rate but with random packet sizes.
45 struct Sender : public MessageHandler {
46   Sender(Thread* th, AsyncSocket* s, uint32 rt)
47       : thread(th), socket(new AsyncUDPSocket(s)),
48         done(false), rate(rt), count(0) {
49     last_send = talk_base::Time();
50     thread->PostDelayed(NextDelay(), this, 1);
51   }
52
53   uint32 NextDelay() {
54     uint32 size = (rand() % 4096) + 1;
55     return 1000 * size / rate;
56   }
57
58   void OnMessage(Message* pmsg) {
59     ASSERT_EQ(1u, pmsg->message_id);
60
61     if (done)
62       return;
63
64     uint32 cur_time = talk_base::Time();
65     uint32 delay = cur_time - last_send;
66     uint32 size = rate * delay / 1000;
67     size = std::min<uint32>(size, 4096);
68     size = std::max<uint32>(size, sizeof(uint32));
69
70     count += size;
71     memcpy(dummy, &cur_time, sizeof(cur_time));
72     socket->Send(dummy, size, options);
73
74     last_send = cur_time;
75     thread->PostDelayed(NextDelay(), this, 1);
76   }
77
78   Thread* thread;
79   scoped_ptr<AsyncUDPSocket> socket;
80   talk_base::PacketOptions options;
81   bool done;
82   uint32 rate;  // bytes per second
83   uint32 count;
84   uint32 last_send;
85   char dummy[4096];
86 };
87
88 struct Receiver : public MessageHandler, public sigslot::has_slots<> {
89   Receiver(Thread* th, AsyncSocket* s, uint32 bw)
90       : thread(th), socket(new AsyncUDPSocket(s)), bandwidth(bw), done(false),
91         count(0), sec_count(0), sum(0), sum_sq(0), samples(0) {
92     socket->SignalReadPacket.connect(this, &Receiver::OnReadPacket);
93     thread->PostDelayed(1000, this, 1);
94   }
95
96   ~Receiver() {
97     thread->Clear(this);
98   }
99
100   void OnReadPacket(AsyncPacketSocket* s, const char* data, size_t size,
101                     const SocketAddress& remote_addr,
102                     const PacketTime& packet_time) {
103     ASSERT_EQ(socket.get(), s);
104     ASSERT_GE(size, 4U);
105
106     count += size;
107     sec_count += size;
108
109     uint32 send_time = *reinterpret_cast<const uint32*>(data);
110     uint32 recv_time = talk_base::Time();
111     uint32 delay = recv_time - send_time;
112     sum += delay;
113     sum_sq += delay * delay;
114     samples += 1;
115   }
116
117   void OnMessage(Message* pmsg) {
118     ASSERT_EQ(1u, pmsg->message_id);
119
120     if (done)
121       return;
122
123     // It is always possible for us to receive more than expected because
124     // packets can be further delayed in delivery.
125     if (bandwidth > 0)
126       ASSERT_TRUE(sec_count <= 5 * bandwidth / 4);
127     sec_count = 0;
128     thread->PostDelayed(1000, this, 1);
129   }
130
131   Thread* thread;
132   scoped_ptr<AsyncUDPSocket> socket;
133   uint32 bandwidth;
134   bool done;
135   size_t count;
136   size_t sec_count;
137   double sum;
138   double sum_sq;
139   uint32 samples;
140 };
141
142 class VirtualSocketServerTest : public testing::Test {
143  public:
144   VirtualSocketServerTest() : ss_(new VirtualSocketServer(NULL)),
145                               kIPv4AnyAddress(IPAddress(INADDR_ANY), 0),
146                               kIPv6AnyAddress(IPAddress(in6addr_any), 0) {
147   }
148
149   void CheckAddressIncrementalization(const SocketAddress& post,
150                                       const SocketAddress& pre) {
151     EXPECT_EQ(post.port(), pre.port() + 1);
152     IPAddress post_ip = post.ipaddr();
153     IPAddress pre_ip = pre.ipaddr();
154     EXPECT_EQ(pre_ip.family(), post_ip.family());
155     if (post_ip.family() == AF_INET) {
156       in_addr pre_ipv4 = pre_ip.ipv4_address();
157       in_addr post_ipv4 = post_ip.ipv4_address();
158       int difference = ntohl(post_ipv4.s_addr) - ntohl(pre_ipv4.s_addr);
159       EXPECT_EQ(1, difference);
160     } else if (post_ip.family() == AF_INET6) {
161       in6_addr post_ip6 = post_ip.ipv6_address();
162       in6_addr pre_ip6 = pre_ip.ipv6_address();
163       uint32* post_as_ints = reinterpret_cast<uint32*>(&post_ip6.s6_addr);
164       uint32* pre_as_ints = reinterpret_cast<uint32*>(&pre_ip6.s6_addr);
165       EXPECT_EQ(post_as_ints[3], pre_as_ints[3] + 1);
166     }
167   }
168
169   void BasicTest(const SocketAddress& initial_addr) {
170     AsyncSocket* socket = ss_->CreateAsyncSocket(initial_addr.family(),
171                                                  SOCK_DGRAM);
172     socket->Bind(initial_addr);
173     SocketAddress server_addr = socket->GetLocalAddress();
174     // Make sure VSS didn't switch families on us.
175     EXPECT_EQ(server_addr.family(), initial_addr.family());
176
177     TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
178     AsyncSocket* socket2 =
179         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
180     TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
181
182     SocketAddress client2_addr;
183     EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
184     EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
185
186     SocketAddress client1_addr;
187     EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
188     EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
189     EXPECT_EQ(client1_addr, server_addr);
190
191     SocketAddress empty = EmptySocketAddressWithFamily(initial_addr.family());
192     for (int i = 0; i < 10; i++) {
193       client2 = new TestClient(AsyncUDPSocket::Create(ss_, empty));
194
195       SocketAddress next_client2_addr;
196       EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
197       EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &next_client2_addr));
198       CheckAddressIncrementalization(next_client2_addr, client2_addr);
199       // EXPECT_EQ(next_client2_addr.port(), client2_addr.port() + 1);
200
201       SocketAddress server_addr2;
202       EXPECT_EQ(6, client1->SendTo("bizbaz", 6, next_client2_addr));
203       EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &server_addr2));
204       EXPECT_EQ(server_addr2, server_addr);
205
206       client2_addr = next_client2_addr;
207     }
208   }
209
210   // initial_addr should be made from either INADDR_ANY or in6addr_any.
211   void ConnectTest(const SocketAddress& initial_addr) {
212     testing::StreamSink sink;
213     SocketAddress accept_addr;
214     const SocketAddress kEmptyAddr =
215         EmptySocketAddressWithFamily(initial_addr.family());
216
217     // Create client
218     AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
219                                                  SOCK_STREAM);
220     sink.Monitor(client);
221     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
222     EXPECT_TRUE(client->GetLocalAddress().IsNil());
223
224     // Create server
225     AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
226                                                  SOCK_STREAM);
227     sink.Monitor(server);
228     EXPECT_NE(0, server->Listen(5));  // Bind required
229     EXPECT_EQ(0, server->Bind(initial_addr));
230     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
231     EXPECT_EQ(0, server->Listen(5));
232     EXPECT_EQ(server->GetState(), AsyncSocket::CS_CONNECTING);
233
234     // No pending server connections
235     EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
236     EXPECT_TRUE(NULL == server->Accept(&accept_addr));
237     EXPECT_EQ(AF_UNSPEC, accept_addr.family());
238
239     // Attempt connect to listening socket
240     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
241     EXPECT_NE(client->GetLocalAddress(), kEmptyAddr);  // Implicit Bind
242     EXPECT_NE(AF_UNSPEC, client->GetLocalAddress().family());  // Implicit Bind
243     EXPECT_NE(client->GetLocalAddress(), server->GetLocalAddress());
244
245     // Client is connecting
246     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
247     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
248     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
249
250     ss_->ProcessMessagesUntilIdle();
251
252     // Client still connecting
253     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
254     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
255     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
256
257     // Server has pending connection
258     EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
259     Socket* accepted = server->Accept(&accept_addr);
260     EXPECT_TRUE(NULL != accepted);
261     EXPECT_NE(accept_addr, kEmptyAddr);
262     EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr);
263
264     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
265     EXPECT_EQ(accepted->GetLocalAddress(), server->GetLocalAddress());
266     EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
267
268     ss_->ProcessMessagesUntilIdle();
269
270     // Client has connected
271     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTED);
272     EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
273     EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
274     EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
275     EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
276   }
277
278   void ConnectToNonListenerTest(const SocketAddress& initial_addr) {
279     testing::StreamSink sink;
280     SocketAddress accept_addr;
281     const SocketAddress nil_addr;
282     const SocketAddress empty_addr =
283         EmptySocketAddressWithFamily(initial_addr.family());
284
285     // Create client
286     AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
287                                                  SOCK_STREAM);
288     sink.Monitor(client);
289
290     // Create server
291     AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
292                                                  SOCK_STREAM);
293     sink.Monitor(server);
294     EXPECT_EQ(0, server->Bind(initial_addr));
295     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
296     // Attempt connect to non-listening socket
297     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
298
299     ss_->ProcessMessagesUntilIdle();
300
301     // No pending server connections
302     EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
303     EXPECT_TRUE(NULL == server->Accept(&accept_addr));
304     EXPECT_EQ(accept_addr, nil_addr);
305
306     // Connection failed
307     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
308     EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
309     EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR));
310     EXPECT_EQ(client->GetRemoteAddress(), nil_addr);
311   }
312
313   void CloseDuringConnectTest(const SocketAddress& initial_addr) {
314     testing::StreamSink sink;
315     SocketAddress accept_addr;
316     const SocketAddress empty_addr =
317         EmptySocketAddressWithFamily(initial_addr.family());
318
319     // Create client and server
320     scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(initial_addr.family(),
321                                                           SOCK_STREAM));
322     sink.Monitor(client.get());
323     scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(initial_addr.family(),
324                                                           SOCK_STREAM));
325     sink.Monitor(server.get());
326
327     // Initiate connect
328     EXPECT_EQ(0, server->Bind(initial_addr));
329     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
330
331     EXPECT_EQ(0, server->Listen(5));
332     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
333
334     // Server close before socket enters accept queue
335     EXPECT_FALSE(sink.Check(server.get(), testing::SSE_READ));
336     server->Close();
337
338     ss_->ProcessMessagesUntilIdle();
339
340     // Result: connection failed
341     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
342     EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
343
344     server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM));
345     sink.Monitor(server.get());
346
347     // Initiate connect
348     EXPECT_EQ(0, server->Bind(initial_addr));
349     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
350
351     EXPECT_EQ(0, server->Listen(5));
352     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
353
354     ss_->ProcessMessagesUntilIdle();
355
356     // Server close while socket is in accept queue
357     EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ));
358     server->Close();
359
360     ss_->ProcessMessagesUntilIdle();
361
362     // Result: connection failed
363     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
364     EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
365
366     // New server
367     server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM));
368     sink.Monitor(server.get());
369
370     // Initiate connect
371     EXPECT_EQ(0, server->Bind(initial_addr));
372     EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
373
374     EXPECT_EQ(0, server->Listen(5));
375     EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
376
377     ss_->ProcessMessagesUntilIdle();
378
379     // Server accepts connection
380     EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ));
381     scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
382     ASSERT_TRUE(NULL != accepted.get());
383     sink.Monitor(accepted.get());
384
385     // Client closes before connection complets
386     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
387
388     // Connected message has not been processed yet.
389     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
390     client->Close();
391
392     ss_->ProcessMessagesUntilIdle();
393
394     // Result: accepted socket closes
395     EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CLOSED);
396     EXPECT_TRUE(sink.Check(accepted.get(), testing::SSE_CLOSE));
397     EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
398   }
399
400   void CloseTest(const SocketAddress& initial_addr) {
401     testing::StreamSink sink;
402     const SocketAddress kEmptyAddr;
403
404     // Create clients
405     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
406     sink.Monitor(a);
407     a->Bind(initial_addr);
408     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
409
410
411     scoped_ptr<AsyncSocket> b(ss_->CreateAsyncSocket(initial_addr.family(),
412                                                      SOCK_STREAM));
413     sink.Monitor(b.get());
414     b->Bind(initial_addr);
415     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
416
417     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
418     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
419
420     ss_->ProcessMessagesUntilIdle();
421
422     EXPECT_TRUE(sink.Check(a, testing::SSE_OPEN));
423     EXPECT_EQ(a->GetState(), AsyncSocket::CS_CONNECTED);
424     EXPECT_EQ(a->GetRemoteAddress(), b->GetLocalAddress());
425
426     EXPECT_TRUE(sink.Check(b.get(), testing::SSE_OPEN));
427     EXPECT_EQ(b->GetState(), AsyncSocket::CS_CONNECTED);
428     EXPECT_EQ(b->GetRemoteAddress(), a->GetLocalAddress());
429
430     EXPECT_EQ(1, a->Send("a", 1));
431     b->Close();
432     EXPECT_EQ(1, a->Send("b", 1));
433
434     ss_->ProcessMessagesUntilIdle();
435
436     char buffer[10];
437     EXPECT_FALSE(sink.Check(b.get(), testing::SSE_READ));
438     EXPECT_EQ(-1, b->Recv(buffer, 10));
439
440     EXPECT_TRUE(sink.Check(a, testing::SSE_CLOSE));
441     EXPECT_EQ(a->GetState(), AsyncSocket::CS_CLOSED);
442     EXPECT_EQ(a->GetRemoteAddress(), kEmptyAddr);
443
444     // No signal for Closer
445     EXPECT_FALSE(sink.Check(b.get(), testing::SSE_CLOSE));
446     EXPECT_EQ(b->GetState(), AsyncSocket::CS_CLOSED);
447     EXPECT_EQ(b->GetRemoteAddress(), kEmptyAddr);
448   }
449
450   void TcpSendTest(const SocketAddress& initial_addr) {
451     testing::StreamSink sink;
452     const SocketAddress kEmptyAddr;
453
454     // Connect two sockets
455     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
456     sink.Monitor(a);
457     a->Bind(initial_addr);
458     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
459
460     AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
461     sink.Monitor(b);
462     b->Bind(initial_addr);
463     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
464
465     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
466     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
467
468     ss_->ProcessMessagesUntilIdle();
469
470     const size_t kBufferSize = 2000;
471     ss_->set_send_buffer_capacity(kBufferSize);
472     ss_->set_recv_buffer_capacity(kBufferSize);
473
474     const size_t kDataSize = 5000;
475     char send_buffer[kDataSize], recv_buffer[kDataSize];
476     for (size_t i = 0; i < kDataSize; ++i)
477       send_buffer[i] = static_cast<char>(i % 256);
478     memset(recv_buffer, 0, sizeof(recv_buffer));
479     size_t send_pos = 0, recv_pos = 0;
480
481     // Can't send more than send buffer in one write
482     int result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
483     EXPECT_EQ(static_cast<int>(kBufferSize), result);
484     send_pos += result;
485
486     ss_->ProcessMessagesUntilIdle();
487     EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
488     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
489
490     // Receive buffer is already filled, fill send buffer again
491     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
492     EXPECT_EQ(static_cast<int>(kBufferSize), result);
493     send_pos += result;
494
495     ss_->ProcessMessagesUntilIdle();
496     EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
497     EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
498
499     // No more room in send or receive buffer
500     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
501     EXPECT_EQ(-1, result);
502     EXPECT_TRUE(a->IsBlocking());
503
504     // Read a subset of the data
505     result = b->Recv(recv_buffer + recv_pos, 500);
506     EXPECT_EQ(500, result);
507     recv_pos += result;
508
509     ss_->ProcessMessagesUntilIdle();
510     EXPECT_TRUE(sink.Check(a, testing::SSE_WRITE));
511     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
512
513     // Room for more on the sending side
514     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
515     EXPECT_EQ(500, result);
516     send_pos += result;
517
518     // Empty the recv buffer
519     while (true) {
520       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
521       if (result < 0) {
522         EXPECT_EQ(-1, result);
523         EXPECT_TRUE(b->IsBlocking());
524         break;
525       }
526       recv_pos += result;
527     }
528
529     ss_->ProcessMessagesUntilIdle();
530     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
531
532     // Continue to empty the recv buffer
533     while (true) {
534       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
535       if (result < 0) {
536         EXPECT_EQ(-1, result);
537         EXPECT_TRUE(b->IsBlocking());
538         break;
539       }
540       recv_pos += result;
541     }
542
543     // Send last of the data
544     result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
545     EXPECT_EQ(500, result);
546     send_pos += result;
547
548     ss_->ProcessMessagesUntilIdle();
549     EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
550
551     // Receive the last of the data
552     while (true) {
553       result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
554       if (result < 0) {
555         EXPECT_EQ(-1, result);
556         EXPECT_TRUE(b->IsBlocking());
557         break;
558       }
559       recv_pos += result;
560     }
561
562     ss_->ProcessMessagesUntilIdle();
563     EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
564
565     // The received data matches the sent data
566     EXPECT_EQ(kDataSize, send_pos);
567     EXPECT_EQ(kDataSize, recv_pos);
568     EXPECT_EQ(0, memcmp(recv_buffer, send_buffer, kDataSize));
569   }
570
571   void TcpSendsPacketsInOrderTest(const SocketAddress& initial_addr) {
572     const SocketAddress kEmptyAddr;
573
574     // Connect two sockets
575     AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(),
576                                             SOCK_STREAM);
577     AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(),
578                                             SOCK_STREAM);
579     a->Bind(initial_addr);
580     EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
581
582     b->Bind(initial_addr);
583     EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
584
585     EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
586     EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
587     ss_->ProcessMessagesUntilIdle();
588
589     // First, deliver all packets in 0 ms.
590     char buffer[2] = { 0, 0 };
591     const char cNumPackets = 10;
592     for (char i = 0; i < cNumPackets; ++i) {
593       buffer[0] = '0' + i;
594       EXPECT_EQ(1, a->Send(buffer, 1));
595     }
596
597     ss_->ProcessMessagesUntilIdle();
598
599     for (char i = 0; i < cNumPackets; ++i) {
600       EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
601       EXPECT_EQ(static_cast<char>('0' + i), buffer[0]);
602     }
603
604     // Next, deliver packets at random intervals
605     const uint32 mean = 50;
606     const uint32 stddev = 50;
607
608     ss_->set_delay_mean(mean);
609     ss_->set_delay_stddev(stddev);
610     ss_->UpdateDelayDistribution();
611
612     for (char i = 0; i < cNumPackets; ++i) {
613       buffer[0] = 'A' + i;
614       EXPECT_EQ(1, a->Send(buffer, 1));
615     }
616
617     ss_->ProcessMessagesUntilIdle();
618
619     for (char i = 0; i < cNumPackets; ++i) {
620       EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
621       EXPECT_EQ(static_cast<char>('A' + i), buffer[0]);
622     }
623   }
624
625   void BandwidthTest(const SocketAddress& initial_addr) {
626     AsyncSocket* send_socket =
627         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
628     AsyncSocket* recv_socket =
629         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
630     ASSERT_EQ(0, send_socket->Bind(initial_addr));
631     ASSERT_EQ(0, recv_socket->Bind(initial_addr));
632     EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
633     EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
634     ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
635
636     uint32 bandwidth = 64 * 1024;
637     ss_->set_bandwidth(bandwidth);
638
639     Thread* pthMain = Thread::Current();
640     Sender sender(pthMain, send_socket, 80 * 1024);
641     Receiver receiver(pthMain, recv_socket, bandwidth);
642
643     pthMain->ProcessMessages(5000);
644     sender.done = true;
645     pthMain->ProcessMessages(5000);
646
647     ASSERT_TRUE(receiver.count >= 5 * 3 * bandwidth / 4);
648     ASSERT_TRUE(receiver.count <= 6 * bandwidth);  // queue could drain for 1s
649
650     ss_->set_bandwidth(0);
651   }
652
653   void DelayTest(const SocketAddress& initial_addr) {
654     time_t seed = ::time(NULL);
655     LOG(LS_VERBOSE) << "seed = " << seed;
656     srand(static_cast<unsigned int>(seed));
657
658     const uint32 mean = 2000;
659     const uint32 stddev = 500;
660
661     ss_->set_delay_mean(mean);
662     ss_->set_delay_stddev(stddev);
663     ss_->UpdateDelayDistribution();
664
665     AsyncSocket* send_socket =
666         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
667     AsyncSocket* recv_socket =
668         ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
669     ASSERT_EQ(0, send_socket->Bind(initial_addr));
670     ASSERT_EQ(0, recv_socket->Bind(initial_addr));
671     EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
672     EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
673     ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
674
675     Thread* pthMain = Thread::Current();
676     // Avg packet size is 2K, so at 200KB/s for 10s, we should see about
677     // 1000 packets, which is necessary to get a good distribution.
678     Sender sender(pthMain, send_socket, 100 * 2 * 1024);
679     Receiver receiver(pthMain, recv_socket, 0);
680
681     pthMain->ProcessMessages(10000);
682     sender.done = receiver.done = true;
683     ss_->ProcessMessagesUntilIdle();
684
685     const double sample_mean = receiver.sum / receiver.samples;
686     double num =
687         receiver.samples * receiver.sum_sq - receiver.sum * receiver.sum;
688     double den = receiver.samples * (receiver.samples - 1);
689     const double sample_stddev = std::sqrt(num / den);
690     LOG(LS_VERBOSE) << "mean=" << sample_mean << " stddev=" << sample_stddev;
691
692     EXPECT_LE(500u, receiver.samples);
693     // We initially used a 0.1 fudge factor, but on the build machine, we
694     // have seen the value differ by as much as 0.13.
695     EXPECT_NEAR(mean, sample_mean, 0.15 * mean);
696     EXPECT_NEAR(stddev, sample_stddev, 0.15 * stddev);
697
698     ss_->set_delay_mean(0);
699     ss_->set_delay_stddev(0);
700     ss_->UpdateDelayDistribution();
701   }
702
703   // Test cross-family communication between a client bound to client_addr and a
704   // server bound to server_addr. shouldSucceed indicates if communication is
705   // expected to work or not.
706   void CrossFamilyConnectionTest(const SocketAddress& client_addr,
707                                  const SocketAddress& server_addr,
708                                  bool shouldSucceed) {
709     testing::StreamSink sink;
710     SocketAddress accept_address;
711     const SocketAddress kEmptyAddr;
712
713     // Client gets a IPv4 address
714     AsyncSocket* client = ss_->CreateAsyncSocket(client_addr.family(),
715                                                  SOCK_STREAM);
716     sink.Monitor(client);
717     EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
718     EXPECT_EQ(client->GetLocalAddress(), kEmptyAddr);
719     client->Bind(client_addr);
720
721     // Server gets a non-mapped non-any IPv6 address.
722     // IPv4 sockets should not be able to connect to this.
723     AsyncSocket* server = ss_->CreateAsyncSocket(server_addr.family(),
724                                                  SOCK_STREAM);
725     sink.Monitor(server);
726     server->Bind(server_addr);
727     server->Listen(5);
728
729     if (shouldSucceed) {
730       EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
731       ss_->ProcessMessagesUntilIdle();
732       EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
733       Socket* accepted = server->Accept(&accept_address);
734       EXPECT_TRUE(NULL != accepted);
735       EXPECT_NE(kEmptyAddr, accept_address);
736       ss_->ProcessMessagesUntilIdle();
737       EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
738       EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
739     } else {
740       // Check that the connection failed.
741       EXPECT_EQ(-1, client->Connect(server->GetLocalAddress()));
742       ss_->ProcessMessagesUntilIdle();
743
744       EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
745       EXPECT_TRUE(NULL == server->Accept(&accept_address));
746       EXPECT_EQ(accept_address, kEmptyAddr);
747       EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
748       EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
749       EXPECT_EQ(client->GetRemoteAddress(), kEmptyAddr);
750     }
751   }
752
753   // Test cross-family datagram sending between a client bound to client_addr
754   // and a server bound to server_addr. shouldSucceed indicates if sending is
755   // expected to succed or not.
756   void CrossFamilyDatagramTest(const SocketAddress& client_addr,
757                                const SocketAddress& server_addr,
758                                bool shouldSucceed) {
759     AsyncSocket* socket = ss_->CreateAsyncSocket(SOCK_DGRAM);
760     socket->Bind(server_addr);
761     SocketAddress bound_server_addr = socket->GetLocalAddress();
762     TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
763
764     AsyncSocket* socket2 = ss_->CreateAsyncSocket(SOCK_DGRAM);
765     socket2->Bind(client_addr);
766     TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
767     SocketAddress client2_addr;
768
769     if (shouldSucceed) {
770       EXPECT_EQ(3, client2->SendTo("foo", 3, bound_server_addr));
771       EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
772       SocketAddress client1_addr;
773       EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
774       EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
775       EXPECT_EQ(client1_addr, bound_server_addr);
776     } else {
777       EXPECT_EQ(-1, client2->SendTo("foo", 3, bound_server_addr));
778       EXPECT_FALSE(client1->CheckNextPacket("foo", 3, 0));
779     }
780   }
781
782  protected:
783   virtual void SetUp() {
784     Thread::Current()->set_socketserver(ss_);
785   }
786   virtual void TearDown() {
787     Thread::Current()->set_socketserver(NULL);
788   }
789
790   VirtualSocketServer* ss_;
791   const SocketAddress kIPv4AnyAddress;
792   const SocketAddress kIPv6AnyAddress;
793 };
794
795 TEST_F(VirtualSocketServerTest, basic_v4) {
796   SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 5000);
797   BasicTest(ipv4_test_addr);
798 }
799
800 TEST_F(VirtualSocketServerTest, basic_v6) {
801   SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 5000);
802   BasicTest(ipv6_test_addr);
803 }
804
805 TEST_F(VirtualSocketServerTest, connect_v4) {
806   ConnectTest(kIPv4AnyAddress);
807 }
808
809 TEST_F(VirtualSocketServerTest, connect_v6) {
810   ConnectTest(kIPv6AnyAddress);
811 }
812
813 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v4) {
814   ConnectToNonListenerTest(kIPv4AnyAddress);
815 }
816
817 TEST_F(VirtualSocketServerTest, connect_to_non_listener_v6) {
818   ConnectToNonListenerTest(kIPv6AnyAddress);
819 }
820
821 TEST_F(VirtualSocketServerTest, close_during_connect_v4) {
822   CloseDuringConnectTest(kIPv4AnyAddress);
823 }
824
825 TEST_F(VirtualSocketServerTest, close_during_connect_v6) {
826   CloseDuringConnectTest(kIPv6AnyAddress);
827 }
828
829 TEST_F(VirtualSocketServerTest, close_v4) {
830   CloseTest(kIPv4AnyAddress);
831 }
832
833 TEST_F(VirtualSocketServerTest, close_v6) {
834   CloseTest(kIPv6AnyAddress);
835 }
836
837 TEST_F(VirtualSocketServerTest, tcp_send_v4) {
838   TcpSendTest(kIPv4AnyAddress);
839 }
840
841 TEST_F(VirtualSocketServerTest, tcp_send_v6) {
842   TcpSendTest(kIPv6AnyAddress);
843 }
844
845 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v4) {
846   TcpSendsPacketsInOrderTest(kIPv4AnyAddress);
847 }
848
849 TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v6) {
850   TcpSendsPacketsInOrderTest(kIPv6AnyAddress);
851 }
852
853 TEST_F(VirtualSocketServerTest, bandwidth_v4) {
854   SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
855   BandwidthTest(ipv4_test_addr);
856 }
857
858 TEST_F(VirtualSocketServerTest, bandwidth_v6) {
859   SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
860   BandwidthTest(ipv6_test_addr);
861 }
862
863 TEST_F(VirtualSocketServerTest, delay_v4) {
864   SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
865   DelayTest(ipv4_test_addr);
866 }
867
868 // See: https://code.google.com/p/webrtc/issues/detail?id=2409
869 TEST_F(VirtualSocketServerTest, DISABLED_delay_v6) {
870   SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
871   DelayTest(ipv6_test_addr);
872 }
873
874 // Works, receiving socket sees 127.0.0.2.
875 TEST_F(VirtualSocketServerTest, CanConnectFromMappedIPv6ToIPv4Any) {
876   CrossFamilyConnectionTest(SocketAddress("::ffff:127.0.0.2", 0),
877                             SocketAddress("0.0.0.0", 5000),
878                             true);
879 }
880
881 // Fails.
882 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToIPv4Any) {
883   CrossFamilyConnectionTest(SocketAddress("::2", 0),
884                             SocketAddress("0.0.0.0", 5000),
885                             false);
886 }
887
888 // Fails.
889 TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToMappedIPv6) {
890   CrossFamilyConnectionTest(SocketAddress("::2", 0),
891                             SocketAddress("::ffff:127.0.0.1", 5000),
892                             false);
893 }
894
895 // Works. receiving socket sees ::ffff:127.0.0.2.
896 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToIPv6Any) {
897   CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
898                             SocketAddress("::", 5000),
899                             true);
900 }
901
902 // Fails.
903 TEST_F(VirtualSocketServerTest, CantConnectFromIPv4ToUnMappedIPv6) {
904   CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
905                             SocketAddress("::1", 5000),
906                             false);
907 }
908
909 // Works. Receiving socket sees ::ffff:127.0.0.1.
910 TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToMappedIPv6) {
911   CrossFamilyConnectionTest(SocketAddress("127.0.0.1", 0),
912                             SocketAddress("::ffff:127.0.0.2", 5000),
913                             true);
914 }
915
916 // Works, receiving socket sees a result from GetNextIP.
917 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv6ToIPv4Any) {
918   CrossFamilyConnectionTest(SocketAddress("::", 0),
919                             SocketAddress("0.0.0.0", 5000),
920                             true);
921 }
922
923 // Works, receiving socket sees whatever GetNextIP gave the client.
924 TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv4ToIPv6Any) {
925   CrossFamilyConnectionTest(SocketAddress("0.0.0.0", 0),
926                             SocketAddress("::", 5000),
927                             true);
928 }
929
930 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv4ToIPv6Any) {
931   CrossFamilyDatagramTest(SocketAddress("0.0.0.0", 0),
932                           SocketAddress("::", 5000),
933                           true);
934 }
935
936 TEST_F(VirtualSocketServerTest, CanSendDatagramFromMappedIPv6ToIPv4Any) {
937   CrossFamilyDatagramTest(SocketAddress("::ffff:127.0.0.1", 0),
938                           SocketAddress("0.0.0.0", 5000),
939                           true);
940 }
941
942 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToIPv4Any) {
943   CrossFamilyDatagramTest(SocketAddress("::2", 0),
944                           SocketAddress("0.0.0.0", 5000),
945                           false);
946 }
947
948 TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToMappedIPv6) {
949   CrossFamilyDatagramTest(SocketAddress("::2", 0),
950                           SocketAddress("::ffff:127.0.0.1", 5000),
951                           false);
952 }
953
954 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToIPv6Any) {
955   CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
956                           SocketAddress("::", 5000),
957                           true);
958 }
959
960 TEST_F(VirtualSocketServerTest, CantSendDatagramFromIPv4ToUnMappedIPv6) {
961   CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
962                           SocketAddress("::1", 5000),
963                           false);
964 }
965
966 TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToMappedIPv6) {
967   CrossFamilyDatagramTest(SocketAddress("127.0.0.1", 0),
968                           SocketAddress("::ffff:127.0.0.2", 5000),
969                           true);
970 }
971
972 TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv6ToIPv4Any) {
973   CrossFamilyDatagramTest(SocketAddress("::", 0),
974                           SocketAddress("0.0.0.0", 5000),
975                           true);
976 }
977
978 TEST_F(VirtualSocketServerTest, CreatesStandardDistribution) {
979   const uint32 kTestMean[] = { 10, 100, 333, 1000 };
980   const double kTestDev[] = { 0.25, 0.1, 0.01 };
981   // TODO: The current code only works for 1000 data points or more.
982   const uint32 kTestSamples[] = { /*10, 100,*/ 1000 };
983   for (size_t midx = 0; midx < ARRAY_SIZE(kTestMean); ++midx) {
984     for (size_t didx = 0; didx < ARRAY_SIZE(kTestDev); ++didx) {
985       for (size_t sidx = 0; sidx < ARRAY_SIZE(kTestSamples); ++sidx) {
986         ASSERT_LT(0u, kTestSamples[sidx]);
987         const uint32 kStdDev =
988             static_cast<uint32>(kTestDev[didx] * kTestMean[midx]);
989         VirtualSocketServer::Function* f =
990             VirtualSocketServer::CreateDistribution(kTestMean[midx],
991                                                     kStdDev,
992                                                     kTestSamples[sidx]);
993         ASSERT_TRUE(NULL != f);
994         ASSERT_EQ(kTestSamples[sidx], f->size());
995         double sum = 0;
996         for (uint32 i = 0; i < f->size(); ++i) {
997           sum += (*f)[i].second;
998         }
999         const double mean = sum / f->size();
1000         double sum_sq_dev = 0;
1001         for (uint32 i = 0; i < f->size(); ++i) {
1002           double dev = (*f)[i].second - mean;
1003           sum_sq_dev += dev * dev;
1004         }
1005         const double stddev = std::sqrt(sum_sq_dev / f->size());
1006         EXPECT_NEAR(kTestMean[midx], mean, 0.1 * kTestMean[midx])
1007           << "M=" << kTestMean[midx]
1008           << " SD=" << kStdDev
1009           << " N=" << kTestSamples[sidx];
1010         EXPECT_NEAR(kStdDev, stddev, 0.1 * kStdDev)
1011           << "M=" << kTestMean[midx]
1012           << " SD=" << kStdDev
1013           << " N=" << kTestSamples[sidx];
1014         delete f;
1015       }
1016     }
1017   }
1018 }