1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/cast/test/utility/udp_proxy.h"
7 #include "base/logging.h"
8 #include "base/rand_util.h"
9 #include "base/synchronization/waitable_event.h"
10 #include "base/threading/thread.h"
11 #include "net/base/io_buffer.h"
12 #include "net/base/net_errors.h"
13 #include "net/udp/udp_socket.h"
// Size that every receive Packet is constructed with before its storage is
// handed to the socket's RecvFrom() call.
19 const size_t kMaxPacketSize = 65536;
// Constructs a packet whose |data| member is initialized from |size|.
21 Packet::Packet(size_t size) : data(size) {}
// The base-class constructor, destructor and InitOnIOThread() all have empty
// bodies. Pipes in this file that need to schedule work at start-up (e.g.
// RandomSortedDelay, NetworkGlitchPipe) override InitOnIOThread().
24 PacketPipe::PacketPipe() {}
25 PacketPipe::~PacketPipe() {}
26 void PacketPipe::InitOnIOThread() {}
// Takes ownership of |pipe| and attaches it to this pipe chain.
27 void PacketPipe::AppendToPipe(scoped_ptr<PacketPipe> pipe) {
// Hands |pipe| on to the downstream pipe_, transferring ownership with Pass().
// NOTE(review): the condition guarding this call is not visible in this
// listing; presumably it only runs when pipe_ is already set -- confirm.
29 pipe_->AppendToPipe(pipe.Pass());
35 // Roughly emulates a buffer inside a device.
36 // If the buffer is full, packets are dropped.
37 // Packets are output at a maximum bandwidth.
// Accepted packets are queued in a deque and forwarded to pipe_ from
// ProcessBuffer(), which runs as a delayed task whose delay is derived from
// the size of the packet at the front of the queue.
38 class Buffer : public PacketPipe {
// |buffer_size| is the capacity that the summed data.size() of queued packets
// is checked against; |max_megabits_per_second| is the drain rate used when
// computing how long the front packet takes to send.
40 Buffer(size_t buffer_size,
41 double max_megabits_per_second) :
43 max_buffer_size_(buffer_size),
44 max_megabits_per_second_(max_megabits_per_second),
// Queues |packet| only if it still fits within max_buffer_size_.
48 virtual void Send(scoped_refptr<Packet> packet) OVERRIDE {
49 if (packet->data.size() + buffer_size_ <= max_buffer_size_) {
50 buffer_.push_back(packet);
51 buffer_size_ += packet->data.size();
// The queue just went from empty to one entry.
52 if (buffer_.size() == 1) {
// Transmission time of the front packet: size * 8 / 1e6 gives megabits, and
// megabits divided by (megabits per second) gives seconds.
60 double megabits = buffer_.front()->data.size() * 8 / 1000000.0;
61 double seconds = megabits / max_megabits_per_second_;
62 int64 microseconds = static_cast<int64>(seconds * 1E6);
// ProcessBuffer is bound to a weak pointer rather than to raw |this|.
63 base::MessageLoop::current()->PostDelayedTask(
65 base::Bind(&Buffer::ProcessBuffer, weak_factory_.GetWeakPtr()),
66 base::TimeDelta::FromMicroseconds(microseconds));
// Forwards the front packet to pipe_ and subtracts its size from the running
// byte count. The queue must not be empty when this runs.
69 void ProcessBuffer() {
70 CHECK(!buffer_.empty());
71 pipe_->Send(buffer_.front());
72 buffer_size_ -= buffer_.front()->data.size();
// NOTE(review): the statement removing the front packet and the body of this
// branch are not visible in this listing.
74 if (!buffer_.empty()) {
// Packets waiting to be forwarded, oldest at the front.
79 std::deque<scoped_refptr<Packet> > buffer_;
// Capacity limit compared against the summed data.size() of queued packets.
81 size_t max_buffer_size_;
82 double max_megabits_per_second_; // megabits per second
// Source of the weak pointers bound into the delayed tasks.
83 base::WeakPtrFactory<Buffer> weak_factory_;
// Factory: allocates a Buffer and returns it as a PacketPipe. |bandwidth| is
// forwarded as the Buffer constructor's max_megabits_per_second argument.
86 scoped_ptr<PacketPipe> NewBuffer(size_t buffer_size, double bandwidth) {
87 return scoped_ptr<PacketPipe>(new Buffer(buffer_size, bandwidth)).Pass();
// Pipe that decides per packet, using base::RandDouble(), whether the packet
// passes the drop check.
90 class RandomDrop : public PacketPipe {
// |drop_fraction| is the threshold each random draw is compared against.
92 RandomDrop(double drop_fraction) : drop_fraction_(drop_fraction) {
95 virtual void Send(scoped_refptr<Packet> packet) OVERRIDE {
// The branch is taken when the draw is at or above drop_fraction_.
// NOTE(review): the branch body is not visible here; presumably it forwards
// |packet| to the next pipe -- confirm.
96 if (base::RandDouble() >= drop_fraction_) {
102 double drop_fraction_;
// Factory: allocates a RandomDrop with |drop_fraction| and returns it as a
// PacketPipe.
105 scoped_ptr<PacketPipe> NewRandomDrop(double drop_fraction) {
106 return scoped_ptr<PacketPipe>(new RandomDrop(drop_fraction)).Pass();
// Base class for pipes that delay each packet independently. Subclasses
// supply the delay, in seconds, through GetDelay().
109 class SimpleDelayBase : public PacketPipe {
111 SimpleDelayBase() : weak_factory_(this) {
113 virtual ~SimpleDelayBase() {}
// Posts a delayed SendInternal task for |packet|. GetDelay() is queried once
// per packet, so packets with different delays may run out of order.
115 virtual void Send(scoped_refptr<Packet> packet) OVERRIDE {
116 double seconds = GetDelay();
117 base::MessageLoop::current()->PostDelayedTask(
119 base::Bind(&SimpleDelayBase::SendInternal,
120 weak_factory_.GetWeakPtr(),
// Seconds are converted to whole microseconds for the TimeDelta.
122 base::TimeDelta::FromMicroseconds(
123 static_cast<int64>(seconds * 1E6)));
// Returns the delay, in seconds, to apply to the packet being sent.
126 virtual double GetDelay() = 0;
// Runs when the delay has elapsed.
// NOTE(review): body not visible here; presumably forwards |packet| to the
// next pipe -- confirm.
129 virtual void SendInternal(scoped_refptr<Packet> packet) {
// Source of the weak pointers bound into the delayed tasks.
133 base::WeakPtrFactory<SimpleDelayBase> weak_factory_;
// SimpleDelayBase that applies the same fixed delay to every packet.
136 class ConstantDelay : public SimpleDelayBase {
138 ConstantDelay(double delay_seconds) : delay_seconds_(delay_seconds) {
// Always returns the delay given at construction, in seconds.
140 virtual double GetDelay() OVERRIDE {
141 return delay_seconds_;
145 double delay_seconds_;
// Factory: allocates a ConstantDelay of |delay_seconds| and returns it as a
// PacketPipe.
148 scoped_ptr<PacketPipe> NewConstantDelay(double delay_seconds) {
149 return scoped_ptr<PacketPipe>(
150 new ConstantDelay(delay_seconds)).Pass();
// SimpleDelayBase that draws a fresh random delay for every packet. Because
// each packet is delayed independently, delivery order is not preserved.
153 class RandomUnsortedDelay : public SimpleDelayBase {
155 RandomUnsortedDelay(double random_delay) :
156 random_delay_(random_delay) {
// Returns random_delay_ scaled by a new base::RandDouble() draw, in seconds.
159 virtual double GetDelay() OVERRIDE {
160 return random_delay_ * base::RandDouble();
// Scale factor applied to each random draw, in seconds.
164 double random_delay_;
// Factory: allocates a RandomUnsortedDelay scaled by |random_delay| and
// returns it as a PacketPipe.
167 scoped_ptr<PacketPipe> NewRandomUnsortedDelay(double random_delay) {
168 return scoped_ptr<PacketPipe>(
169 new RandomUnsortedDelay(random_delay)).Pass();
// Pipe that delays packets by a random amount while keeping them in order:
// packets are queued in a deque and forwarded from the front by
// ProcessBuffer(). In addition, CauseExtraDelay() fires at random intervals
// and sets block_until_ to "now + extra_delay_".
173 class RandomSortedDelay : public PacketPipe {
// |random_delay| scales the per-packet random delay (seconds) and
// |seconds_between_extra_delay| scales the interval between extra delays.
// An |extra_delay| argument (declaration not visible in this listing)
// initializes extra_delay_, the length in seconds of each extra delay.
175 RandomSortedDelay(double random_delay,
177 double seconds_between_extra_delay) :
178 random_delay_(random_delay),
179 extra_delay_(extra_delay),
180 seconds_between_extra_delay_(seconds_between_extra_delay),
181 weak_factory_(this) {
// Appends |packet| to the queue.
184 virtual void Send(scoped_refptr<Packet> packet) OVERRIDE {
185 buffer_.push_back(packet);
// The queue just went from empty to one entry.
// NOTE(review): branch body not visible; presumably it schedules processing.
186 if (buffer_.size() == 1) {
// Arms the first extra delay.
190 virtual void InitOnIOThread() OVERRIDE {
191 // As we start the stream, assume that we are in a random
192 // place between two extra delays, thus multiplier = 1.0;
193 ScheduleExtraDelay(1.0);
// Posts CauseExtraDelay() to run after a random delay of
// seconds_between_extra_delay_ * |mult| * RandDouble() seconds.
197 void ScheduleExtraDelay(double mult) {
198 double seconds = seconds_between_extra_delay_ * mult * base::RandDouble();
199 int64 microseconds = static_cast<int64>(seconds * 1E6);
200 base::MessageLoop::current()->PostDelayedTask(
202 base::Bind(&RandomSortedDelay::CauseExtraDelay,
203 weak_factory_.GetWeakPtr()),
204 base::TimeDelta::FromMicroseconds(microseconds));
// Starts an extra delay by moving block_until_ to extra_delay_ seconds from
// now, then schedules the next extra delay.
207 void CauseExtraDelay() {
208 block_until_ = base::TimeTicks::Now() +
209 base::TimeDelta::FromMicroseconds(
210 static_cast<int64>(extra_delay_ * 1E6));
211 // An extra delay just happened, wait up to seconds_between_extra_delay_*2
212 // before scheduling another one to make the average equal to
213 // seconds_between_extra_delay_.
214 ScheduleExtraDelay(2.0);
// (The header of the enclosing method is not visible in this listing.)
// Computes a random per-packet delay and the time remaining until
// block_until_, then posts ProcessBuffer() as a delayed task.
218 double seconds = base::RandDouble() * random_delay_;
219 base::TimeDelta block_time = block_until_ - base::TimeTicks::Now();
220 base::TimeDelta delay_time =
221 base::TimeDelta::FromMicroseconds(
222 static_cast<int64>(seconds * 1E6));
// NOTE(review): this lowers block_time to delay_time when the block is the
// longer of the two. The delay passed to PostDelayedTask is not visible here;
// if it is delay_time, block_until_ never lengthens the wait -- confirm that
// the intent was not to raise delay_time to block_time instead.
223 if (block_time > delay_time) {
224 block_time = delay_time;
227 base::MessageLoop::current()->PostDelayedTask(
229 base::Bind(&RandomSortedDelay::ProcessBuffer,
230 weak_factory_.GetWeakPtr()),
// Forwards the front packet to pipe_. The queue must not be empty.
234 void ProcessBuffer() {
235 CHECK(!buffer_.empty());
236 pipe_->Send(buffer_.front());
// NOTE(review): the statement removing the front packet and the body of this
// branch are not visible in this listing.
238 if (!buffer_.empty()) {
// End of the current extra delay, set by CauseExtraDelay().
243 base::TimeTicks block_until_;
// Packets waiting to be forwarded, oldest at the front.
244 std::deque<scoped_refptr<Packet> > buffer_;
// Scale factor, in seconds, for the per-packet random delay.
245 double random_delay_;
// Scale factor, in seconds, for the interval between extra delays.
247 double seconds_between_extra_delay_;
// Source of the weak pointers bound into the delayed tasks.
248 base::WeakPtrFactory<RandomSortedDelay> weak_factory_;
// Factory: allocates a RandomSortedDelay and returns it as a PacketPipe.
// (Part of the parameter list and of the constructor call is not visible in
// this listing.)
251 scoped_ptr<PacketPipe> NewRandomSortedDelay(
254 double seconds_between_extra_delay) {
255 return scoped_ptr<PacketPipe>(
256 new RandomSortedDelay(random_delay,
258 seconds_between_extra_delay)).Pass();
// Pipe that alternates between two states tracked by works_, switching via
// Flip() after random intervals.
261 class NetworkGlitchPipe : public PacketPipe {
// The stored maxima are twice the requested averages; each interval is later
// drawn as RandDouble() * max, presumably so that the mean interval matches
// the requested average.
263 NetworkGlitchPipe(double average_work_time,
264 double average_outage_time) :
266 max_work_time_(average_work_time * 2),
267 max_outage_time_(average_outage_time * 2),
268 weak_factory_(this) {
// NOTE(review): body not visible in this listing.
271 virtual void InitOnIOThread() OVERRIDE {
// NOTE(review): body not visible; presumably forwards |packet| only while
// works_ is true -- confirm.
275 virtual void Send(scoped_refptr<Packet> packet) OVERRIDE {
// (The header of the enclosing method is not visible in this listing.)
// Picks how long the current state lasts, bounded by the maximum for that
// state, and posts Flip() to run after that time.
284 double seconds = base::RandDouble() *
285 (works_ ? max_work_time_ : max_outage_time_);
286 int64 microseconds = static_cast<int64>(seconds * 1E6);
287 base::MessageLoop::current()->PostDelayedTask(
289 base::Bind(&NetworkGlitchPipe::Flip,
290 weak_factory_.GetWeakPtr()),
291 base::TimeDelta::FromMicroseconds(microseconds));
// Upper bound, in seconds, of one interval with works_ true.
295 double max_work_time_;
// Upper bound, in seconds, of one interval with works_ false.
296 double max_outage_time_;
// Source of the weak pointers bound into the delayed tasks.
297 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_;
// Factory: allocates a NetworkGlitchPipe with the given average working and
// outage times and returns it as a PacketPipe.
300 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time,
301 double average_outage_time) {
302 return scoped_ptr<PacketPipe>(
303 new NetworkGlitchPipe(average_work_time, average_outage_time)).Pass();
// Pipe that writes packets to a UDP socket with SendTo(), addressed to
// *destination_.
306 class PacketSender : public PacketPipe {
// Neither pointer is owned. |destination| is re-read on every Send(), so the
// pointed-to endpoint may be filled in after construction.
308 PacketSender(net::UDPSocket* udp_socket,
309 const net::IPEndPoint* destination) :
311 udp_socket_(udp_socket),
312 destination_(destination),
313 weak_factory_(this) {
// Sends |packet| to *destination_ over udp_socket_.
315 virtual void Send(scoped_refptr<Packet> packet) OVERRIDE {
// NOTE(review): the condition guarding this log line is not visible here.
317 LOG(ERROR) << "Cannot write packet right now: blocked";
321 VLOG(1) << "Sending packet, len = " << packet->data.size();
322 // We ignore all problems, callbacks and errors.
323 // If it didn't work we just drop the packet and call it a day.
// The IOBuffer wraps the packet's own storage rather than copying it.
324 scoped_refptr<net::IOBuffer> buf = new net::WrappedIOBuffer(
325 reinterpret_cast<char*>(&packet->data.front()));
// An empty address means no destination is known yet.
327 if (destination_->address().empty()) {
328 VLOG(1) << "Destination has not been set yet.";
330 VLOG(1) << "Destination:" << destination_->ToString();
// AllowWrite is the completion callback, bound to a weak pointer.
331 result = udp_socket_->SendTo(buf,
332 static_cast<int>(packet->data.size()),
334 base::Bind(&PacketSender::AllowWrite,
335 weak_factory_.GetWeakPtr(),
// ERR_IO_PENDING is handled separately from other negative results, which
// are logged as failures.
339 if (result == net::ERR_IO_PENDING) {
341 } else if (result < 0) {
342 LOG(ERROR) << "Failed to write packet.";
// NOTE(review): body not visible in this listing.
345 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE {
// Completion callback passed to SendTo().
// NOTE(review): body not visible in this listing.
350 void AllowWrite(scoped_refptr<net::IOBuffer> buf,
351 scoped_refptr<Packet> packet,
357 net::UDPSocket* udp_socket_;
358 const net::IPEndPoint* destination_; // not owned
// Source of the weak pointers bound into the send-completion callback.
359 base::WeakPtrFactory<PacketSender> weak_factory_;
// Helper for assembling a pipe chain: wraps the raw |next| pointer in a
// scoped_ptr (taking ownership) and appends it to the chain held in |*pipe|.
// NOTE(review): the handling of an empty |*pipe| is not visible in this
// listing.
363 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) {
365 (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass());
// Builds a pipe chain that emulates a WiFi link: sender buffer, router stage
// and receiver buffer, each Buffer holding up to 256 << 10 and draining at
// 5 megabits per second.
// Buffer's second argument is in megabits per second. The previous value of
// 5000000 made the transmission delay of every packet effectively zero, so
// the bandwidth limit never applied.
372 scoped_ptr<PacketPipe> WifiNetwork() {
373 // This represents the buffer on the sender.
374 scoped_ptr<PacketPipe> pipe;
375 BuildPipe(&pipe, new Buffer(256 << 10, 5.0));
376 BuildPipe(&pipe, new RandomDrop(0.005));
377 // This represents the buffer on the router.
378 BuildPipe(&pipe, new ConstantDelay(1E-3));
379 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
380 BuildPipe(&pipe, new Buffer(256 << 10, 5.0));
381 BuildPipe(&pipe, new ConstantDelay(1E-3));
382 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
383 BuildPipe(&pipe, new RandomDrop(0.005));
384 // This represents the buffer on the receiving device.
385 BuildPipe(&pipe, new Buffer(256 << 10, 5.0));
// Builds a pipe chain that emulates a poor network: small 4 << 10 buffers,
// 10% packet drops, sorted and unsorted random delays, and periodic outages.
// Buffer's second argument is in megabits per second. The previous values of
// 1000000 and 2000000 contradicted the "1mbit/s" / "2mbit/s" notes below and
// made the bandwidth limit effectively unlimited.
389 scoped_ptr<PacketPipe> EvilNetwork() {
390 // This represents the buffer on the sender.
391 scoped_ptr<PacketPipe> pipe;
392 BuildPipe(&pipe, new Buffer(4 << 10, 2.0)); // 4 kb buf, 2mbit/s
393 // This represents the buffer on the router.
394 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop
395 BuildPipe(&pipe, new RandomSortedDelay(20E-3, 60E-3, 1));
396 BuildPipe(&pipe, new Buffer(4 << 10, 1.0)); // 4 kb buf, 1mbit/s
397 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop
398 BuildPipe(&pipe, new ConstantDelay(1E-3));
399 BuildPipe(&pipe, new NetworkGlitchPipe(2.0, 0.3));
400 BuildPipe(&pipe, new RandomUnsortedDelay(20E-3));
401 // This represents the buffer on the receiving device.
402 BuildPipe(&pipe, new Buffer(4 << 10, 2.0)); // 4 kb buf, 2mbit/s
// UDPProxy implementation. It runs a dedicated IO thread that reads datagrams
// from one UDP socket bound to |local_port|. Packets received from
// |destination| are sent through from_dest_pipe_; packets from any other
// address update the return address and are sent through to_dest_pipe_.
406 class UDPProxyImpl : public UDPProxy {
// Takes ownership of both pipes, starts the proxy thread and posts Start()
// to it.
408 UDPProxyImpl(const net::IPEndPoint& local_port,
409 const net::IPEndPoint& destination,
410 scoped_ptr<PacketPipe> to_dest_pipe,
411 scoped_ptr<PacketPipe> from_dest_pipe,
412 net::NetLog* net_log) :
413 proxy_thread_("media::cast::test::UdpProxy Thread"),
414 local_port_(local_port),
415 destination_(destination),
416 start_event_(false, false),
417 to_dest_pipe_(to_dest_pipe.Pass()),
// Each member takes its own argument. Passing |to_dest_pipe| a second time
// here would hand over an already-emptied pointer and discard the caller's
// |from_dest_pipe|.
418 from_dest_pipe_(from_dest_pipe.Pass()) {
419 proxy_thread_.StartWithOptions(
420 base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
// Unretained(this) is used for the posted task; the destructor stops
// proxy_thread_ before the object goes away.
421 proxy_thread_.message_loop_proxy()->PostTask(
423 base::Bind(&UDPProxyImpl::Start,
424 base::Unretained(this),
// Posted to the proxy thread by the constructor: creates the socket,
// terminates both pipe chains with a PacketSender, initializes the pipes,
// binds the socket and signals start_event_.
429 void Start(net::NetLog* net_log) {
430 socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
431 net::RandIntCallback(),
433 net::NetLog::Source()));
// The senders hold pointers to destination_ / return_address_, so a later
// update of return_address_ is seen by the from-destination sender.
434 BuildPipe(&to_dest_pipe_, new PacketSender(socket_.get(), &destination_));
435 BuildPipe(&from_dest_pipe_,
436 new PacketSender(socket_.get(), &return_address_));
437 to_dest_pipe_->InitOnIOThread();
438 from_dest_pipe_->InitOnIOThread();
440 VLOG(0) << "From:" << local_port_.ToString();
441 VLOG(0) << "To:" << destination_.ToString();
// A failed bind is fatal.
443 CHECK_GE(socket_->Bind(local_port_), 0);
445 start_event_.Signal();
// Stops the proxy thread before the members are destroyed.
449 virtual ~UDPProxyImpl() {
450 proxy_thread_.Stop();
// Handles one completed read of |len|, which must not be ERR_IO_PENDING:
// trims the packet to the received length and routes it by sender address.
454 void ProcessPacket(scoped_refptr<Packet> packet,
455 scoped_refptr<net::IOBuffer> recv_buf,
457 DCHECK_NE(len, net::ERR_IO_PENDING);
458 VLOG(1) << "Got packet, len = " << len;
// NOTE(review): the condition guarding this warning is not visible here.
460 LOG(WARNING) << "Socket read error: " << len;
463 packet->data.resize(len);
// Traffic from the destination goes back towards the original sender.
464 if (recv_address_ == destination_) {
465 from_dest_pipe_->Send(packet);
// Any other sender becomes the return address and its packet is forwarded
// towards the destination.
467 VLOG(1) << "Return address = " << recv_address_.ToString();
468 return_address_ = recv_address_;
469 to_dest_pipe_->Send(packet);
// Completion callback for an asynchronous RecvFrom().
473 void ReadCallback(scoped_refptr<Packet> packet,
474 scoped_refptr<net::IOBuffer> recv_buf,
476 ProcessPacket(packet, recv_buf, len);
// (The header of the enclosing method is not visible in this listing.)
// Allocates a kMaxPacketSize packet, wraps its storage in an IOBuffer without
// copying, and issues a read on the socket.
482 scoped_refptr<Packet> packet(new Packet(kMaxPacketSize));
483 scoped_refptr<net::IOBuffer> recv_buf = new net::WrappedIOBuffer(
484 reinterpret_cast<char*>(&packet->data.front()));
485 int len = socket_->RecvFrom(
489 base::Bind(&UDPProxyImpl::ReadCallback,
490 base::Unretained(this),
493 if (len == net::ERR_IO_PENDING)
495 ProcessPacket(packet, recv_buf, len);
// Address the proxy socket is bound to.
500 net::IPEndPoint local_port_;
// Endpoint whose traffic is routed through from_dest_pipe_.
501 net::IPEndPoint destination_;
// Sender address compared against destination_ in ProcessPacket().
502 net::IPEndPoint recv_address_;
// Most recent non-destination sender; target of the from-destination sender.
503 net::IPEndPoint return_address_;
504 base::Thread proxy_thread_;
505 scoped_ptr<net::UDPSocket> socket_;
506 scoped_ptr<PacketPipe> to_dest_pipe_;
507 scoped_ptr<PacketPipe> from_dest_pipe_;
// Signalled at the end of Start().
508 base::WaitableEvent start_event_;
511 scoped_ptr<UDPProxy> UDPProxy::Create(
512 const net::IPEndPoint& local_port,
513 const net::IPEndPoint& destination,
514 scoped_ptr<PacketPipe> to_dest_pipe,
515 scoped_ptr<PacketPipe> from_dest_pipe,
516 net::NetLog* net_log) {
517 scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port,
520 from_dest_pipe.Pass(),