2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
11 #include "webrtc/base/virtualsocketserver.h"
20 #include "webrtc/base/common.h"
21 #include "webrtc/base/logging.h"
22 #include "webrtc/base/physicalsocketserver.h"
23 #include "webrtc/base/socketaddresspair.h"
24 #include "webrtc/base/thread.h"
25 #include "webrtc/base/timeutils.h"
28 #if defined(WEBRTC_WIN)
29 const in_addr kInitialNextIPv4 = { {0x01, 0, 0, 0} };
31 // This value is entirely arbitrary, hence the lack of concern about endianness.
32 const in_addr kInitialNextIPv4 = { 0x01000000 };
34 // Starts at ::2 so as to not cause confusion with ::1.
35 const in6_addr kInitialNextIPv6 = { { {
36 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2
39 const uint16 kFirstEphemeralPort = 49152;
40 const uint16 kLastEphemeralPort = 65535;
41 const uint16 kEphemeralPortCount = kLastEphemeralPort - kFirstEphemeralPort + 1;
42 const uint32 kDefaultNetworkCapacity = 64 * 1024;
43 const uint32 kDefaultTcpBufferSize = 32 * 1024;
45 const uint32 UDP_HEADER_SIZE = 28; // IP + UDP headers
46 const uint32 TCP_HEADER_SIZE = 40; // IP + TCP headers
47 const uint32 TCP_MSS = 1400; // Maximum segment size
49 // Note: The current algorithm doesn't work for sample sizes smaller than this.
50 const int NUM_SAMPLES = 1000;
58 // Packets are passed between sockets as messages. We copy the data just like
60 class Packet : public MessageData {
62 Packet(const char* data, size_t size, const SocketAddress& from)
63 : size_(size), consumed_(0), from_(from) {
65 data_ = new char[size_];
66 memcpy(data_, data, size_);
73 const char* data() const { return data_ + consumed_; }
74 size_t size() const { return size_ - consumed_; }
75 const SocketAddress& from() const { return from_; }
77 // Remove the first size bytes from the data.
78 void Consume(size_t size) {
79 ASSERT(size + consumed_ < size_);
85 size_t size_, consumed_;
89 struct MessageAddress : public MessageData {
90 explicit MessageAddress(const SocketAddress& a) : addr(a) { }
94 // Implements the socket interface using the virtual network. Packets are
95 // passed as messages using the message queue of the socket server.
96 class VirtualSocket : public AsyncSocket, public MessageHandler {
98 VirtualSocket(VirtualSocketServer* server, int family, int type, bool async)
99 : server_(server), family_(family), type_(type), async_(async),
100 state_(CS_CLOSED), error_(0), listen_queue_(NULL),
101 write_enabled_(false),
102 network_size_(0), recv_buffer_size_(0), bound_(false), was_any_(false) {
103 ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM));
104 ASSERT(async_ || (type_ != SOCK_STREAM)); // We only support async streams
107 virtual ~VirtualSocket() {
110 for (RecvBuffer::iterator it = recv_buffer_.begin();
111 it != recv_buffer_.end(); ++it) {
116 virtual SocketAddress GetLocalAddress() const {
120 virtual SocketAddress GetRemoteAddress() const {
124 // Used by server sockets to set the local address without binding.
125 void SetLocalAddress(const SocketAddress& addr) {
129 virtual int Bind(const SocketAddress& addr) {
130 if (!local_addr_.IsNil()) {
135 int result = server_->Bind(this, &local_addr_);
141 was_any_ = addr.IsAnyIP();
146 virtual int Connect(const SocketAddress& addr) {
147 return InitiateConnect(addr, true);
150 virtual int Close() {
151 if (!local_addr_.IsNil() && bound_) {
152 // Remove from the binding table.
153 server_->Unbind(local_addr_, this);
157 if (SOCK_STREAM == type_) {
158 // Cancel pending sockets
160 while (!listen_queue_->empty()) {
161 SocketAddress addr = listen_queue_->front();
163 // Disconnect listening socket.
164 server_->Disconnect(server_->LookupBinding(addr));
165 listen_queue_->pop_front();
167 delete listen_queue_;
168 listen_queue_ = NULL;
170 // Disconnect stream sockets
171 if (CS_CONNECTED == state_) {
172 // Disconnect remote socket, check if it is a child of a server socket.
173 VirtualSocket* socket =
174 server_->LookupConnection(local_addr_, remote_addr_);
176 // Not a server socket child, then see if it is bound.
177 // TODO: If this is indeed a server socket that has no
178 // children this will cause the server socket to be
179 // closed. This might lead to unexpected results, how to fix this?
180 socket = server_->LookupBinding(remote_addr_);
182 server_->Disconnect(socket);
184 // Remove mapping for both directions.
185 server_->RemoveConnection(remote_addr_, local_addr_);
186 server_->RemoveConnection(local_addr_, remote_addr_);
188 // Cancel potential connects
190 if (server_->msg_queue_) {
191 server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs);
193 for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) {
194 ASSERT(NULL != it->pdata);
195 MessageAddress* data = static_cast<MessageAddress*>(it->pdata);
197 // Lookup remote side.
198 VirtualSocket* socket = server_->LookupConnection(local_addr_,
201 // Server socket, remote side is a socket retreived by
202 // accept. Accepted sockets are not bound so we will not
203 // find it by looking in the bindings table.
204 server_->Disconnect(socket);
205 server_->RemoveConnection(local_addr_, data->addr);
207 server_->Disconnect(server_->LookupBinding(data->addr));
211 // Clear incoming packets and disconnect messages
212 if (server_->msg_queue_) {
213 server_->msg_queue_->Clear(this);
219 remote_addr_.Clear();
223 virtual int Send(const void *pv, size_t cb) {
224 if (CS_CONNECTED != state_) {
228 if (SOCK_DGRAM == type_) {
229 return SendUdp(pv, cb, remote_addr_);
231 return SendTcp(pv, cb);
235 virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr) {
236 if (SOCK_DGRAM == type_) {
237 return SendUdp(pv, cb, addr);
239 if (CS_CONNECTED != state_) {
243 return SendTcp(pv, cb);
247 virtual int Recv(void *pv, size_t cb) {
249 return RecvFrom(pv, cb, &addr);
252 virtual int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) {
253 // If we don't have a packet, then either error or wait for one to arrive.
254 if (recv_buffer_.empty()) {
259 while (recv_buffer_.empty()) {
261 server_->msg_queue_->Get(&msg);
262 server_->msg_queue_->Dispatch(&msg);
266 // Return the packet at the front of the queue.
267 Packet* packet = recv_buffer_.front();
268 size_t data_read = _min(cb, packet->size());
269 memcpy(pv, packet->data(), data_read);
270 *paddr = packet->from();
272 if (data_read < packet->size()) {
273 packet->Consume(data_read);
275 recv_buffer_.pop_front();
279 if (SOCK_STREAM == type_) {
280 bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_);
281 recv_buffer_size_ -= data_read;
283 VirtualSocket* sender = server_->LookupBinding(remote_addr_);
284 ASSERT(NULL != sender);
285 server_->SendTcp(sender);
289 return static_cast<int>(data_read);
292 virtual int Listen(int backlog) {
293 ASSERT(SOCK_STREAM == type_);
294 ASSERT(CS_CLOSED == state_);
295 if (local_addr_.IsNil()) {
299 ASSERT(NULL == listen_queue_);
300 listen_queue_ = new ListenQueue;
301 state_ = CS_CONNECTING;
305 virtual VirtualSocket* Accept(SocketAddress *paddr) {
306 if (NULL == listen_queue_) {
310 while (!listen_queue_->empty()) {
311 VirtualSocket* socket = new VirtualSocket(server_, AF_INET, type_,
314 // Set the new local address to the same as this server socket.
315 socket->SetLocalAddress(local_addr_);
316 // Sockets made from a socket that 'was Any' need to inherit that.
317 socket->set_was_any(was_any_);
318 SocketAddress remote_addr(listen_queue_->front());
319 int result = socket->InitiateConnect(remote_addr, false);
320 listen_queue_->pop_front();
325 socket->CompleteConnect(remote_addr, false);
327 *paddr = remote_addr;
331 error_ = EWOULDBLOCK;
335 virtual int GetError() const {
339 virtual void SetError(int error) {
343 virtual ConnState GetState() const {
347 virtual int GetOption(Option opt, int* value) {
348 OptionsMap::const_iterator it = options_map_.find(opt);
349 if (it == options_map_.end()) {
353 return 0; // 0 is success to emulate getsockopt()
356 virtual int SetOption(Option opt, int value) {
357 options_map_[opt] = value;
358 return 0; // 0 is success to emulate setsockopt()
361 virtual int EstimateMTU(uint16* mtu) {
362 if (CS_CONNECTED != state_)
368 void OnMessage(Message *pmsg) {
369 if (pmsg->message_id == MSG_ID_PACKET) {
370 //ASSERT(!local_addr_.IsAny());
371 ASSERT(NULL != pmsg->pdata);
372 Packet* packet = static_cast<Packet*>(pmsg->pdata);
374 recv_buffer_.push_back(packet);
377 SignalReadEvent(this);
379 } else if (pmsg->message_id == MSG_ID_CONNECT) {
380 ASSERT(NULL != pmsg->pdata);
381 MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata);
382 if (listen_queue_ != NULL) {
383 listen_queue_->push_back(data->addr);
385 SignalReadEvent(this);
387 } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) {
388 CompleteConnect(data->addr, true);
390 LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening";
391 server_->Disconnect(server_->LookupBinding(data->addr));
394 } else if (pmsg->message_id == MSG_ID_DISCONNECT) {
395 ASSERT(SOCK_STREAM == type_);
396 if (CS_CLOSED != state_) {
397 int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0;
399 remote_addr_.Clear();
401 SignalCloseEvent(this, error);
409 bool was_any() { return was_any_; }
410 void set_was_any(bool was_any) { was_any_ = was_any; }
413 struct NetworkEntry {
418 typedef std::deque<SocketAddress> ListenQueue;
419 typedef std::deque<NetworkEntry> NetworkQueue;
420 typedef std::vector<char> SendBuffer;
421 typedef std::list<Packet*> RecvBuffer;
422 typedef std::map<Option, int> OptionsMap;
424 int InitiateConnect(const SocketAddress& addr, bool use_delay) {
425 if (!remote_addr_.IsNil()) {
426 error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS;
429 if (local_addr_.IsNil()) {
430 // If there's no local address set, grab a random one in the correct AF.
432 if (addr.ipaddr().family() == AF_INET) {
433 result = Bind(SocketAddress("0.0.0.0", 0));
434 } else if (addr.ipaddr().family() == AF_INET6) {
435 result = Bind(SocketAddress("::", 0));
441 if (type_ == SOCK_DGRAM) {
443 state_ = CS_CONNECTED;
445 int result = server_->Connect(this, addr, use_delay);
447 error_ = EHOSTUNREACH;
450 state_ = CS_CONNECTING;
455 void CompleteConnect(const SocketAddress& addr, bool notify) {
456 ASSERT(CS_CONNECTING == state_);
458 state_ = CS_CONNECTED;
459 server_->AddConnection(remote_addr_, local_addr_, this);
460 if (async_ && notify) {
461 SignalConnectEvent(this);
465 int SendUdp(const void* pv, size_t cb, const SocketAddress& addr) {
466 // If we have not been assigned a local port, then get one.
467 if (local_addr_.IsNil()) {
468 local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family());
469 int result = server_->Bind(this, &local_addr_);
477 // Send the data in a message to the appropriate socket.
478 return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr);
481 int SendTcp(const void* pv, size_t cb) {
482 size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size();
484 write_enabled_ = true;
485 error_ = EWOULDBLOCK;
488 size_t consumed = _min(cb, capacity);
489 const char* cpv = static_cast<const char*>(pv);
490 send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed);
491 server_->SendTcp(this);
492 return static_cast<int>(consumed);
495 VirtualSocketServer* server_;
501 SocketAddress local_addr_;
502 SocketAddress remote_addr_;
504 // Pending sockets which can be Accepted
505 ListenQueue* listen_queue_;
507 // Data which tcp has buffered for sending
508 SendBuffer send_buffer_;
511 // Critical section to protect the recv_buffer and queue_
512 CriticalSection crit_;
514 // Network model that enforces bandwidth and capacity constraints
515 NetworkQueue network_;
516 size_t network_size_;
518 // Data which has been received from the network
519 RecvBuffer recv_buffer_;
520 // The amount of data which is in flight or in recv_buffer_
521 size_t recv_buffer_size_;
523 // Is this socket bound?
526 // When we bind a socket to Any, VSS's Bind gives it another address. For
527 // dual-stack sockets, we want to distinguish between sockets that were
528 // explicitly given a particular address and sockets that had one picked
532 // Store the options that are set
533 OptionsMap options_map_;
535 friend class VirtualSocketServer;
538 VirtualSocketServer::VirtualSocketServer(SocketServer* ss)
539 : server_(ss), server_owned_(false), msg_queue_(NULL), stop_on_idle_(false),
540 network_delay_(Time()), next_ipv4_(kInitialNextIPv4),
541 next_ipv6_(kInitialNextIPv6), next_port_(kFirstEphemeralPort),
542 bindings_(new AddressMap()), connections_(new ConnectionMap()),
543 bandwidth_(0), network_capacity_(kDefaultNetworkCapacity),
544 send_buffer_capacity_(kDefaultTcpBufferSize),
545 recv_buffer_capacity_(kDefaultTcpBufferSize),
546 delay_mean_(0), delay_stddev_(0), delay_samples_(NUM_SAMPLES),
547 delay_dist_(NULL), drop_prob_(0.0) {
549 server_ = new PhysicalSocketServer();
550 server_owned_ = true;
552 UpdateDelayDistribution();
555 VirtualSocketServer::~VirtualSocketServer() {
564 IPAddress VirtualSocketServer::GetNextIP(int family) {
565 if (family == AF_INET) {
566 IPAddress next_ip(next_ipv4_);
568 HostToNetwork32(NetworkToHost32(next_ipv4_.s_addr) + 1);
570 } else if (family == AF_INET6) {
571 IPAddress next_ip(next_ipv6_);
572 uint32* as_ints = reinterpret_cast<uint32*>(&next_ipv6_.s6_addr);
579 uint16 VirtualSocketServer::GetNextPort() {
580 uint16 port = next_port_;
581 if (next_port_ < kLastEphemeralPort) {
584 next_port_ = kFirstEphemeralPort;
589 Socket* VirtualSocketServer::CreateSocket(int type) {
590 return CreateSocket(AF_INET, type);
593 Socket* VirtualSocketServer::CreateSocket(int family, int type) {
594 return CreateSocketInternal(family, type);
597 AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) {
598 return CreateAsyncSocket(AF_INET, type);
601 AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int family, int type) {
602 return CreateSocketInternal(family, type);
605 VirtualSocket* VirtualSocketServer::CreateSocketInternal(int family, int type) {
606 return new VirtualSocket(this, family, type, true);
609 void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) {
610 msg_queue_ = msg_queue;
612 msg_queue_->SignalQueueDestroyed.connect(this,
613 &VirtualSocketServer::OnMessageQueueDestroyed);
617 bool VirtualSocketServer::Wait(int cmsWait, bool process_io) {
618 ASSERT(msg_queue_ == Thread::Current());
619 if (stop_on_idle_ && Thread::Current()->empty()) {
622 return socketserver()->Wait(cmsWait, process_io);
625 void VirtualSocketServer::WakeUp() {
626 socketserver()->WakeUp();
629 bool VirtualSocketServer::ProcessMessagesUntilIdle() {
630 ASSERT(msg_queue_ == Thread::Current());
631 stop_on_idle_ = true;
632 while (!msg_queue_->empty()) {
634 if (msg_queue_->Get(&msg, kForever)) {
635 msg_queue_->Dispatch(&msg);
638 stop_on_idle_ = false;
639 return !msg_queue_->IsQuitting();
642 void VirtualSocketServer::SetNextPortForTesting(uint16 port) {
646 int VirtualSocketServer::Bind(VirtualSocket* socket,
647 const SocketAddress& addr) {
648 ASSERT(NULL != socket);
649 // Address must be completely specified at this point
650 ASSERT(!IPIsUnspec(addr.ipaddr()));
651 ASSERT(addr.port() != 0);
653 // Normalize the address (turns v6-mapped addresses into v4-addresses).
654 SocketAddress normalized(addr.ipaddr().Normalized(), addr.port());
656 AddressMap::value_type entry(normalized, socket);
657 return bindings_->insert(entry).second ? 0 : -1;
660 int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) {
661 ASSERT(NULL != socket);
663 if (IPIsAny(addr->ipaddr())) {
664 addr->SetIP(GetNextIP(addr->ipaddr().family()));
665 } else if (!IPIsUnspec(addr->ipaddr())) {
666 addr->SetIP(addr->ipaddr().Normalized());
671 if (addr->port() == 0) {
672 for (int i = 0; i < kEphemeralPortCount; ++i) {
673 addr->SetPort(GetNextPort());
674 if (bindings_->find(*addr) == bindings_->end()) {
680 return Bind(socket, *addr);
683 VirtualSocket* VirtualSocketServer::LookupBinding(const SocketAddress& addr) {
684 SocketAddress normalized(addr.ipaddr().Normalized(),
686 AddressMap::iterator it = bindings_->find(normalized);
687 return (bindings_->end() != it) ? it->second : NULL;
690 int VirtualSocketServer::Unbind(const SocketAddress& addr,
691 VirtualSocket* socket) {
692 SocketAddress normalized(addr.ipaddr().Normalized(),
694 ASSERT((*bindings_)[normalized] == socket);
695 bindings_->erase(bindings_->find(normalized));
699 void VirtualSocketServer::AddConnection(const SocketAddress& local,
700 const SocketAddress& remote,
701 VirtualSocket* remote_socket) {
702 // Add this socket pair to our routing table. This will allow
703 // multiple clients to connect to the same server address.
704 SocketAddress local_normalized(local.ipaddr().Normalized(),
706 SocketAddress remote_normalized(remote.ipaddr().Normalized(),
708 SocketAddressPair address_pair(local_normalized, remote_normalized);
709 connections_->insert(std::pair<SocketAddressPair,
710 VirtualSocket*>(address_pair, remote_socket));
713 VirtualSocket* VirtualSocketServer::LookupConnection(
714 const SocketAddress& local,
715 const SocketAddress& remote) {
716 SocketAddress local_normalized(local.ipaddr().Normalized(),
718 SocketAddress remote_normalized(remote.ipaddr().Normalized(),
720 SocketAddressPair address_pair(local_normalized, remote_normalized);
721 ConnectionMap::iterator it = connections_->find(address_pair);
722 return (connections_->end() != it) ? it->second : NULL;
725 void VirtualSocketServer::RemoveConnection(const SocketAddress& local,
726 const SocketAddress& remote) {
727 SocketAddress local_normalized(local.ipaddr().Normalized(),
729 SocketAddress remote_normalized(remote.ipaddr().Normalized(),
731 SocketAddressPair address_pair(local_normalized, remote_normalized);
732 connections_->erase(address_pair);
735 static double Random() {
736 return static_cast<double>(rand()) / RAND_MAX;
739 int VirtualSocketServer::Connect(VirtualSocket* socket,
740 const SocketAddress& remote_addr,
742 uint32 delay = use_delay ? GetRandomTransitDelay() : 0;
743 VirtualSocket* remote = LookupBinding(remote_addr);
744 if (!CanInteractWith(socket, remote)) {
745 LOG(LS_INFO) << "Address family mismatch between "
746 << socket->GetLocalAddress() << " and " << remote_addr;
749 if (remote != NULL) {
750 SocketAddress addr = socket->GetLocalAddress();
751 msg_queue_->PostDelayed(delay, remote, MSG_ID_CONNECT,
752 new MessageAddress(addr));
754 LOG(LS_INFO) << "No one listening at " << remote_addr;
755 msg_queue_->PostDelayed(delay, socket, MSG_ID_DISCONNECT);
760 bool VirtualSocketServer::Disconnect(VirtualSocket* socket) {
762 // Remove the mapping.
763 msg_queue_->Post(socket, MSG_ID_DISCONNECT);
769 int VirtualSocketServer::SendUdp(VirtualSocket* socket,
770 const char* data, size_t data_size,
771 const SocketAddress& remote_addr) {
772 // See if we want to drop this packet.
773 if (Random() < drop_prob_) {
774 LOG(LS_VERBOSE) << "Dropping packet: bad luck";
775 return static_cast<int>(data_size);
778 VirtualSocket* recipient = LookupBinding(remote_addr);
780 // Make a fake recipient for address family checking.
781 scoped_ptr<VirtualSocket> dummy_socket(
782 CreateSocketInternal(AF_INET, SOCK_DGRAM));
783 dummy_socket->SetLocalAddress(remote_addr);
784 if (!CanInteractWith(socket, dummy_socket.get())) {
785 LOG(LS_VERBOSE) << "Incompatible address families: "
786 << socket->GetLocalAddress() << " and " << remote_addr;
789 LOG(LS_VERBOSE) << "No one listening at " << remote_addr;
790 return static_cast<int>(data_size);
793 if (!CanInteractWith(socket, recipient)) {
794 LOG(LS_VERBOSE) << "Incompatible address families: "
795 << socket->GetLocalAddress() << " and " << remote_addr;
799 CritScope cs(&socket->crit_);
801 uint32 cur_time = Time();
802 PurgeNetworkPackets(socket, cur_time);
804 // Determine whether we have enough bandwidth to accept this packet. To do
805 // this, we need to update the send queue. Once we know it's current size,
806 // we know whether we can fit this packet.
808 // NOTE: There are better algorithms for maintaining such a queue (such as
809 // "Derivative Random Drop"); however, this algorithm is a more accurate
810 // simulation of what a normal network would do.
812 size_t packet_size = data_size + UDP_HEADER_SIZE;
813 if (socket->network_size_ + packet_size > network_capacity_) {
814 LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded";
815 return static_cast<int>(data_size);
818 AddPacketToNetwork(socket, recipient, cur_time, data, data_size,
819 UDP_HEADER_SIZE, false);
821 return static_cast<int>(data_size);
824 void VirtualSocketServer::SendTcp(VirtualSocket* socket) {
825 // TCP can't send more data than will fill up the receiver's buffer.
826 // We track the data that is in the buffer plus data in flight using the
827 // recipient's recv_buffer_size_. Anything beyond that must be stored in the
828 // sender's buffer. We will trigger the buffered data to be sent when data
829 // is read from the recv_buffer.
831 // Lookup the local/remote pair in the connections table.
832 VirtualSocket* recipient = LookupConnection(socket->local_addr_,
833 socket->remote_addr_);
835 LOG(LS_VERBOSE) << "Sending data to no one.";
839 CritScope cs(&socket->crit_);
841 uint32 cur_time = Time();
842 PurgeNetworkPackets(socket, cur_time);
845 size_t available = recv_buffer_capacity_ - recipient->recv_buffer_size_;
846 size_t max_data_size = _min<size_t>(available, TCP_MSS - TCP_HEADER_SIZE);
847 size_t data_size = _min(socket->send_buffer_.size(), max_data_size);
851 AddPacketToNetwork(socket, recipient, cur_time, &socket->send_buffer_[0],
852 data_size, TCP_HEADER_SIZE, true);
853 recipient->recv_buffer_size_ += data_size;
855 size_t new_buffer_size = socket->send_buffer_.size() - data_size;
856 // Avoid undefined access beyond the last element of the vector.
857 // This only happens when new_buffer_size is 0.
858 if (data_size < socket->send_buffer_.size()) {
859 // memmove is required for potentially overlapping source/destination.
860 memmove(&socket->send_buffer_[0], &socket->send_buffer_[data_size],
863 socket->send_buffer_.resize(new_buffer_size);
866 if (socket->write_enabled_
867 && (socket->send_buffer_.size() < send_buffer_capacity_)) {
868 socket->write_enabled_ = false;
869 socket->SignalWriteEvent(socket);
873 void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender,
874 VirtualSocket* recipient,
880 VirtualSocket::NetworkEntry entry;
881 entry.size = data_size + header_size;
883 sender->network_size_ += entry.size;
884 uint32 send_delay = SendDelay(static_cast<uint32>(sender->network_size_));
885 entry.done_time = cur_time + send_delay;
886 sender->network_.push_back(entry);
888 // Find the delay for crossing the many virtual hops of the network.
889 uint32 transit_delay = GetRandomTransitDelay();
891 // Post the packet as a message to be delivered (on our own thread)
892 Packet* p = new Packet(data, data_size, sender->local_addr_);
893 uint32 ts = TimeAfter(send_delay + transit_delay);
895 // Ensure that new packets arrive after previous ones
896 // TODO: consider ordering on a per-socket basis, since this
897 // introduces artifical delay.
898 ts = TimeMax(ts, network_delay_);
900 msg_queue_->PostAt(ts, recipient, MSG_ID_PACKET, p);
901 network_delay_ = TimeMax(ts, network_delay_);
904 void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket,
906 while (!socket->network_.empty() &&
907 (socket->network_.front().done_time <= cur_time)) {
908 ASSERT(socket->network_size_ >= socket->network_.front().size);
909 socket->network_size_ -= socket->network_.front().size;
910 socket->network_.pop_front();
914 uint32 VirtualSocketServer::SendDelay(uint32 size) {
918 return 1000 * size / bandwidth_;
922 void PrintFunction(std::vector<std::pair<double, double> >* f) {
925 for (uint32 i = 0; i < f->size(); ++i) {
926 std::cout << (*f)[i].first << '\t' << (*f)[i].second << std::endl;
927 sum += (*f)[i].second;
930 const double mean = sum / f->size();
931 double sum_sq_dev = 0;
932 for (uint32 i = 0; i < f->size(); ++i) {
933 double dev = (*f)[i].second - mean;
934 sum_sq_dev += dev * dev;
936 std::cout << "Mean = " << mean << " StdDev = "
937 << sqrt(sum_sq_dev / f->size()) << std::endl;
942 void VirtualSocketServer::UpdateDelayDistribution() {
943 Function* dist = CreateDistribution(delay_mean_, delay_stddev_,
945 // We take a lock just to make sure we don't leak memory.
947 CritScope cs(&delay_crit_);
953 static double PI = 4 * atan(1.0);
955 static double Normal(double x, double mean, double stddev) {
956 double a = (x - mean) * (x - mean) / (2 * stddev * stddev);
957 return exp(-a) / (stddev * sqrt(2 * PI));
960 #if 0 // static unused gives a warning
961 static double Pareto(double x, double min, double k) {
965 return k * std::pow(min, k) / std::pow(x, k+1);
969 VirtualSocketServer::Function* VirtualSocketServer::CreateDistribution(
970 uint32 mean, uint32 stddev, uint32 samples) {
971 Function* f = new Function();
974 f->push_back(Point(mean, 1.0));
977 if (mean >= 4 * static_cast<double>(stddev))
978 start = mean - 4 * static_cast<double>(stddev);
979 double end = mean + 4 * static_cast<double>(stddev);
981 for (uint32 i = 0; i < samples; i++) {
982 double x = start + (end - start) * i / (samples - 1);
983 double y = Normal(x, mean, stddev);
984 f->push_back(Point(x, y));
987 return Resample(Invert(Accumulate(f)), 0, 1, samples);
990 uint32 VirtualSocketServer::GetRandomTransitDelay() {
991 size_t index = rand() % delay_dist_->size();
992 double delay = (*delay_dist_)[index].second;
993 //LOG_F(LS_INFO) << "random[" << index << "] = " << delay;
994 return static_cast<uint32>(delay);
997 struct FunctionDomainCmp {
998 bool operator()(const VirtualSocketServer::Point& p1,
999 const VirtualSocketServer::Point& p2) {
1000 return p1.first < p2.first;
1002 bool operator()(double v1, const VirtualSocketServer::Point& p2) {
1003 return v1 < p2.first;
1005 bool operator()(const VirtualSocketServer::Point& p1, double v2) {
1006 return p1.first < v2;
1010 VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) {
1011 ASSERT(f->size() >= 1);
1013 for (Function::size_type i = 0; i < f->size() - 1; ++i) {
1014 double dx = (*f)[i + 1].first - (*f)[i].first;
1015 double avgy = ((*f)[i + 1].second + (*f)[i].second) / 2;
1019 (*f)[f->size()-1].second = v;
1023 VirtualSocketServer::Function* VirtualSocketServer::Invert(Function* f) {
1024 for (Function::size_type i = 0; i < f->size(); ++i)
1025 std::swap((*f)[i].first, (*f)[i].second);
1027 std::sort(f->begin(), f->end(), FunctionDomainCmp());
1031 VirtualSocketServer::Function* VirtualSocketServer::Resample(
1032 Function* f, double x1, double x2, uint32 samples) {
1033 Function* g = new Function();
1035 for (size_t i = 0; i < samples; i++) {
1036 double x = x1 + (x2 - x1) * i / (samples - 1);
1037 double y = Evaluate(f, x);
1038 g->push_back(Point(x, y));
1045 double VirtualSocketServer::Evaluate(Function* f, double x) {
1046 Function::iterator iter =
1047 std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp());
1048 if (iter == f->begin()) {
1049 return (*f)[0].second;
1050 } else if (iter == f->end()) {
1051 ASSERT(f->size() >= 1);
1052 return (*f)[f->size() - 1].second;
1053 } else if (iter->first == x) {
1054 return iter->second;
1056 double x1 = (iter - 1)->first;
1057 double y1 = (iter - 1)->second;
1058 double x2 = iter->first;
1059 double y2 = iter->second;
1060 return y1 + (y2 - y1) * (x - x1) / (x2 - x1);
1064 bool VirtualSocketServer::CanInteractWith(VirtualSocket* local,
1065 VirtualSocket* remote) {
1066 if (!local || !remote) {
1069 IPAddress local_ip = local->GetLocalAddress().ipaddr();
1070 IPAddress remote_ip = remote->GetLocalAddress().ipaddr();
1071 IPAddress local_normalized = local_ip.Normalized();
1072 IPAddress remote_normalized = remote_ip.Normalized();
1073 // Check if the addresses are the same family after Normalization (turns
1074 // mapped IPv6 address into IPv4 addresses).
1075 // This will stop unmapped V6 addresses from talking to mapped V6 addresses.
1076 if (local_normalized.family() == remote_normalized.family()) {
1080 // If ip1 is IPv4 and ip2 is :: and ip2 is not IPV6_V6ONLY.
1081 int remote_v6_only = 0;
1082 remote->GetOption(Socket::OPT_IPV6_V6ONLY, &remote_v6_only);
1083 if (local_ip.family() == AF_INET && !remote_v6_only && IPIsAny(remote_ip)) {
1086 // Same check, backwards.
1087 int local_v6_only = 0;
1088 local->GetOption(Socket::OPT_IPV6_V6ONLY, &local_v6_only);
1089 if (remote_ip.family() == AF_INET && !local_v6_only && IPIsAny(local_ip)) {
1093 // Check to see if either socket was explicitly bound to IPv6-any.
1094 // These sockets can talk with anyone.
1095 if (local_ip.family() == AF_INET6 && local->was_any()) {
1098 if (remote_ip.family() == AF_INET6 && remote->was_any()) {