1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
7 #include "media/cast/test/utility/udp_proxy.h"
9 #include "base/logging.h"
10 #include "base/memory/linked_ptr.h"
11 #include "base/rand_util.h"
12 #include "base/synchronization/waitable_event.h"
13 #include "base/threading/thread.h"
14 #include "base/time/default_tick_clock.h"
15 #include "net/base/io_buffer.h"
16 #include "net/base/net_errors.h"
17 #include "net/udp/udp_socket.h"
// Element count handed to the transport::Packet constructor each time
// UDPProxyImpl allocates a receive buffer (presumably a size in bytes --
// confirm against the transport::Packet definition).
23 const size_t kMaxPacketSize = 65536;
// Constructor and destructor of the pipe base class; both bodies are empty.
25 PacketPipe::PacketPipe() {}
26 PacketPipe::~PacketPipe() {}
// Stores |task_runner| in task_runner_ and forwards both arguments to the
// next stage of the chain (pipe_).
//   task_runner: runner kept by this stage; the stages in this file use
//                task_runner_ to post their delayed work.
//   clock:       handed on to pipe_->InitOnIOThread().
// NOTE(review): presumably |clock| is also saved in a member (RandomSortedDelay
// reads clock_) and the forward is skipped when pipe_ is unset -- confirm.
27 void PacketPipe::InitOnIOThread(
28 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
29 base::TickClock* clock) {
30 task_runner_ = task_runner;
33 pipe_->InitOnIOThread(task_runner, clock);
// Passes |pipe| down the chain by calling AppendToPipe() on the next stage
// (pipe_); ownership moves with Pass().
// NOTE(review): presumably |pipe| becomes pipe_ when this stage has no
// successor yet -- confirm against the full definition.
36 void PacketPipe::AppendToPipe(scoped_ptr<PacketPipe> pipe) {
38 pipe_->AppendToPipe(pipe.Pass());
44 // Roughly emulates a buffer inside a device.
45 // If the buffer is full, packets are dropped.
46 // Packets are output at a maximum bandwidth.
//
// Accounting is done in units of packet->size(); queued packets are released
// one at a time by delayed ProcessBuffer() tasks posted on task_runner_.
47 class Buffer : public PacketPipe {
// |buffer_size|: upper bound for the summed size() of queued packets.
// |max_megabits_per_second|: output rate used to derive the per-packet delay.
// Both values must be strictly positive (enforced by the CHECKs below).
49 Buffer(size_t buffer_size, double max_megabits_per_second)
51 max_buffer_size_(buffer_size),
52 max_megabits_per_second_(max_megabits_per_second),
54 CHECK_GT(max_buffer_size_, 0UL);
55 CHECK_GT(max_megabits_per_second, 0);
// Queues |packet| when it still fits under max_buffer_size_; per the class
// comment a packet that does not fit is dropped.
58 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
59 if (packet->size() + buffer_size_ <= max_buffer_size_) {
60 buffer_size_ += packet->size();
// The deque stores linked_ptr, so ownership is moved out of the scoped_ptr.
61 buffer_.push_back(linked_ptr<transport::Packet>(packet.release()));
// Size 1 means the queue was empty before this packet was added.
62 if (buffer_.size() == 1) {
// Time the front packet occupies the link: size() * 8 bits, expressed in
// megabits, divided by the configured megabits per second.
70 double megabits = buffer_.front()->size() * 8 / 1000000.0;
71 double seconds = megabits / max_megabits_per_second_;
// Truncated to whole microseconds for base::TimeDelta.
72 int64 microseconds = static_cast<int64>(seconds * 1E6);
// ProcessBuffer() is bound to a weak pointer to this object.
73 task_runner_->PostDelayedTask(
75 base::Bind(&Buffer::ProcessBuffer, weak_factory_.GetWeakPtr()),
76 base::TimeDelta::FromMicroseconds(microseconds));
// Takes the front packet, removes its size from the accounting and forwards
// it to the next stage. Must only run while the queue is non-empty.
79 void ProcessBuffer() {
80 CHECK(!buffer_.empty());
81 scoped_ptr<transport::Packet> packet(buffer_.front().release());
82 buffer_size_ -= packet->size();
84 pipe_->Send(packet.Pass());
// More packets are waiting behind the one just sent.
85 if (!buffer_.empty()) {
// Queued packets, oldest at the front.
90 std::deque<linked_ptr<transport::Packet> > buffer_;
// Capacity limit checked in Send().
92 size_t max_buffer_size_;
93 double max_megabits_per_second_; // megabits per second
// Source of the weak pointers bound into the delayed tasks.
94 base::WeakPtrFactory<Buffer> weak_factory_;
// Factory: returns a new Buffer stage, typed as PacketPipe.
//   buffer_size: capacity limit passed to the Buffer constructor.
//   bandwidth:   passed as Buffer's max_megabits_per_second.
97 scoped_ptr<PacketPipe> NewBuffer(size_t buffer_size, double bandwidth) {
98 return scoped_ptr<PacketPipe>(new Buffer(buffer_size, bandwidth)).Pass();
// Pipe stage that forwards a packet only when a fresh rand() value exceeds a
// fixed threshold; a packet that fails the test is not forwarded.
// NOTE(review): this stage draws from rand() while the delay stages in this
// file use base::RandDouble() -- confirm the C generator is intended here.
101 class RandomDrop : public PacketPipe {
// |drop_fraction| is scaled by RAND_MAX and truncated to int so it can be
// compared directly with rand() in Send().
103 RandomDrop(double drop_fraction)
104 : drop_fraction_(static_cast<int>(drop_fraction * RAND_MAX)) {}
// Forwards |packet| to the next stage when rand() is above the threshold.
106 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
107 if (rand() > drop_fraction_) {
108 pipe_->Send(packet.Pass());
// Factory: returns a new RandomDrop stage built from |drop_fraction|, typed
// as PacketPipe.
116 scoped_ptr<PacketPipe> NewRandomDrop(double drop_fraction) {
117 return scoped_ptr<PacketPipe>(new RandomDrop(drop_fraction)).Pass();
// Base class for stages that delay every packet independently: Send() asks
// the subclass for a delay and posts one delayed task per packet.
120 class SimpleDelayBase : public PacketPipe {
122 SimpleDelayBase() : weak_factory_(this) {}
123 virtual ~SimpleDelayBase() {}
// Posts SendInternal(|packet|) on task_runner_ after GetDelay() seconds.
// The packet travels inside the bound callback (base::Passed).
125 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
126 double seconds = GetDelay();
127 task_runner_->PostDelayedTask(
129 base::Bind(&SimpleDelayBase::SendInternal,
130 weak_factory_.GetWeakPtr(),
131 base::Passed(&packet)),
// Seconds are converted to whole microseconds (truncating).
132 base::TimeDelta::FromMicroseconds(static_cast<int64>(seconds * 1E6)));
// Returns the delay, in seconds, to apply to the packet being sent.
135 virtual double GetDelay() = 0;
// Runs when the delay has elapsed; hands |packet| to the next stage.
138 virtual void SendInternal(scoped_ptr<transport::Packet> packet) {
139 pipe_->Send(packet.Pass());
// Source of the weak pointers bound into the delayed tasks.
142 base::WeakPtrFactory<SimpleDelayBase> weak_factory_;
// Delay stage whose GetDelay() always returns the value given at
// construction.
145 class ConstantDelay : public SimpleDelayBase {
// |delay_seconds|: delay applied to every packet, in seconds.
147 ConstantDelay(double delay_seconds) : delay_seconds_(delay_seconds) {}
148 virtual double GetDelay() OVERRIDE {
149 return delay_seconds_;
// Fixed per-packet delay in seconds.
153 double delay_seconds_;
// Factory: returns a new ConstantDelay stage delaying each packet by
// |delay_seconds|, typed as PacketPipe.
156 scoped_ptr<PacketPipe> NewConstantDelay(double delay_seconds) {
157 return scoped_ptr<PacketPipe>(new ConstantDelay(delay_seconds)).Pass();
// Delay stage that draws a fresh random delay for every packet. Each packet
// gets its own independent delayed task (see SimpleDelayBase::Send), so a
// later packet can be given a shorter delay than an earlier one.
160 class RandomUnsortedDelay : public SimpleDelayBase {
// |random_delay|: scale factor, in seconds, applied to base::RandDouble().
162 RandomUnsortedDelay(double random_delay) : random_delay_(random_delay) {}
// Returns random_delay_ multiplied by a new base::RandDouble() value.
164 virtual double GetDelay() OVERRIDE {
165 return random_delay_ * base::RandDouble();
// Scale for the random per-packet delay, in seconds.
169 double random_delay_;
// Factory: returns a new RandomUnsortedDelay stage scaled by |random_delay|,
// typed as PacketPipe.
172 scoped_ptr<PacketPipe> NewRandomUnsortedDelay(double random_delay) {
173 return scoped_ptr<PacketPipe>(new RandomUnsortedDelay(random_delay)).Pass();
// Delay stage that keeps packets in a FIFO queue: packets are appended at the
// back and ProcessBuffer() always sends the front one, so they leave in
// arrival order. On top of a random per-packet delay, CauseExtraDelay() is
// scheduled at random intervals and moves block_until_ into the future.
177 class RandomSortedDelay : public PacketPipe {
// |random_delay|: scale, in seconds, for the random per-packet delay.
// extra_delay_: length, in seconds, added to "now" by CauseExtraDelay().
// |seconds_between_extra_delay|: target average spacing of extra delays.
179 RandomSortedDelay(double random_delay,
181 double seconds_between_extra_delay)
182 : random_delay_(random_delay),
183 extra_delay_(extra_delay),
184 seconds_between_extra_delay_(seconds_between_extra_delay),
185 weak_factory_(this) {}
// Appends |packet| to the queue, moving ownership into a linked_ptr.
187 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
188 buffer_.push_back(linked_ptr<transport::Packet>(packet.release()));
// Size 1 means the queue was empty before this packet was added.
189 if (buffer_.size() == 1) {
// Runs the base-class initialization, then arms the first extra delay.
193 virtual void InitOnIOThread(
194 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
195 base::TickClock* clock) OVERRIDE {
196 PacketPipe::InitOnIOThread(task_runner, clock);
197 // As we start the stream, assume that we are in a random
198 // place between two extra delays, thus multiplier = 1.0;
199 ScheduleExtraDelay(1.0);
// Posts CauseExtraDelay() after
// seconds_between_extra_delay_ * |mult| * base::RandDouble() seconds.
203 void ScheduleExtraDelay(double mult) {
204 double seconds = seconds_between_extra_delay_ * mult * base::RandDouble();
205 int64 microseconds = static_cast<int64>(seconds * 1E6);
206 task_runner_->PostDelayedTask(
208 base::Bind(&RandomSortedDelay::CauseExtraDelay,
209 weak_factory_.GetWeakPtr()),
210 base::TimeDelta::FromMicroseconds(microseconds));
// Sets block_until_ to extra_delay_ seconds after the injected clock's
// current time, then schedules the next extra delay.
213 void CauseExtraDelay() {
214 block_until_ = clock_->NowTicks() +
215 base::TimeDelta::FromMicroseconds(
216 static_cast<int64>(extra_delay_ * 1E6));
217 // An extra delay just happened, wait up to seconds_between_extra_delay_*2
218 // before scheduling another one to make the average equal to
219 // seconds_between_extra_delay_.
220 ScheduleExtraDelay(2.0);
// Computes the wait before the next ProcessBuffer() run.
224 double seconds = base::RandDouble() * random_delay_;
// NOTE(review): block_until_ is derived from clock_->NowTicks() above, but
// this subtraction uses base::TimeTicks::Now(); with an injected clock that
// differs from the real one the two time bases would be mixed -- confirm.
225 base::TimeDelta block_time = block_until_ - base::TimeTicks::Now();
226 base::TimeDelta delay_time =
227 base::TimeDelta::FromMicroseconds(
228 static_cast<int64>(seconds * 1E6));
// NOTE(review): when the remaining block time exceeds the random delay this
// overwrites block_time with the shorter delay_time; check that the value
// passed to PostDelayedTask below really honours block_until_ (the
// assignment looks inverted).
229 if (block_time > delay_time) {
230 block_time = delay_time;
233 task_runner_->PostDelayedTask(FROM_HERE,
234 base::Bind(&RandomSortedDelay::ProcessBuffer,
235 weak_factory_.GetWeakPtr()),
// Sends the packet at the front of the queue to the next stage. Must only
// run while the queue is non-empty.
239 void ProcessBuffer() {
240 CHECK(!buffer_.empty());
241 scoped_ptr<transport::Packet> packet(buffer_.front().release());
242 pipe_->Send(packet.Pass());
// More packets are waiting behind the one just sent.
244 if (!buffer_.empty()) {
// End of the current extra-delay window.
249 base::TimeTicks block_until_;
// Queued packets, oldest at the front.
250 std::deque<linked_ptr<transport::Packet> > buffer_;
// Scale for the random per-packet delay, in seconds.
251 double random_delay_;
// Multiplied by a random factor to space out CauseExtraDelay() calls.
253 double seconds_between_extra_delay_;
// Source of the weak pointers bound into the delayed tasks.
254 base::WeakPtrFactory<RandomSortedDelay> weak_factory_;
// Factory: returns a new RandomSortedDelay stage, typed as PacketPipe. The
// three values are passed to the RandomSortedDelay constructor in the order
// random_delay, extra_delay, seconds_between_extra_delay.
257 scoped_ptr<PacketPipe> NewRandomSortedDelay(
260 double seconds_between_extra_delay) {
261 return scoped_ptr<PacketPipe>(
262 new RandomSortedDelay(
263 random_delay, extra_delay, seconds_between_extra_delay))
// Pipe stage that alternates between two states selected by works_. The
// length of each period is base::RandDouble() times the maximum for the
// current state, and Flip() is posted to run when the period ends.
267 class NetworkGlitchPipe : public PacketPipe {
// The stored maxima are twice the requested averages, presumably so that a
// uniformly drawn fraction of the maximum averages out to the requested
// value (the same reasoning is spelled out in RandomSortedDelay).
269 NetworkGlitchPipe(double average_work_time, double average_outage_time)
271 max_work_time_(average_work_time * 2),
272 max_outage_time_(average_outage_time * 2),
273 weak_factory_(this) {}
// Runs the base-class initialization for this stage.
275 virtual void InitOnIOThread(
276 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
277 base::TickClock* clock) OVERRIDE {
278 PacketPipe::InitOnIOThread(task_runner, clock);
// Hands |packet| to the next stage.
// NOTE(review): presumably only while works_ is true -- confirm.
282 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
284 pipe_->Send(packet.Pass());
// Length of the current period in seconds: a random fraction of the maximum
// for the state selected by works_.
291 double seconds = base::RandDouble() *
292 (works_ ? max_work_time_ : max_outage_time_);
293 int64 microseconds = static_cast<int64>(seconds * 1E6);
// Flip() is bound to a weak pointer to this object.
294 task_runner_->PostDelayedTask(
296 base::Bind(&NetworkGlitchPipe::Flip, weak_factory_.GetWeakPtr()),
297 base::TimeDelta::FromMicroseconds(microseconds));
// Upper bound, in seconds, for a period with works_ true.
301 double max_work_time_;
// Upper bound, in seconds, for a period with works_ false.
302 double max_outage_time_;
// Source of the weak pointers bound into the delayed tasks.
303 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_;
// Factory: returns a new NetworkGlitchPipe stage built from the two average
// period lengths, typed as PacketPipe.
306 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time,
307 double average_outage_time) {
308 return scoped_ptr<PacketPipe>(
309 new NetworkGlitchPipe(average_work_time, average_outage_time))
// Final pipe stage: Send() (defined after UDPProxyImpl) hands each packet to
// the proxy together with the endpoint that |destination_| points at.
315 class PacketSender : public PacketPipe {
// Both pointers are stored as-is; the sender owns neither object.
// The endpoint is read through the pointer on every Send(), so later changes
// to the pointed-to address are picked up.
317 PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination)
318 : udp_proxy_(udp_proxy), destination_(destination) {}
319 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE;
// NOTE(review): the body of this override is not shown here; presumably
// appending after the terminal stage is treated as an error -- confirm.
320 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE {
// Proxy that performs the actual socket write.
325 UDPProxyImpl* udp_proxy_;
326 const net::IPEndPoint* destination_; // not owned
// Appends |next| to the chain held in |*pipe|. The raw pointer is wrapped in
// a scoped_ptr, so the chain takes ownership of |next|.
// NOTE(review): callers such as WifiNetwork() start with an empty scoped_ptr,
// so presumably an empty |*pipe| is handled separately -- confirm.
330 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) {
332 (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass());
// Builds the "wifi" emulation chain. Stages are appended in the order the
// packets traverse them: 20 megabit/s buffers, 0.5% random drops, 1 ms
// constant delays and sorted random delays.
339 scoped_ptr<PacketPipe> WifiNetwork() {
340 // This represents the buffer on the sender.
341 scoped_ptr<PacketPipe> pipe;
342 BuildPipe(&pipe, new Buffer(256 << 10, 20));  // limit 256 << 10, 20 megabits/s
343 BuildPipe(&pipe, new RandomDrop(0.005));  // drop fraction 0.005
344 // This represents the buffer on the router.
345 BuildPipe(&pipe, new ConstantDelay(1E-3));  // 1 ms
// Arguments: random_delay, extra_delay, seconds_between_extra_delay.
346 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
347 BuildPipe(&pipe, new Buffer(256 << 10, 20));
348 BuildPipe(&pipe, new ConstantDelay(1E-3));
349 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
350 BuildPipe(&pipe, new RandomDrop(0.005));
351 // This represents the buffer on the receiving device.
352 BuildPipe(&pipe, new Buffer(256 << 10, 20));
// Builds the "bad network" emulation chain: 5 megabit/s buffers with a
// 64 << 10 limit, a 5% random drop, and sorted, constant and unsorted delays.
356 scoped_ptr<PacketPipe> BadNetwork() {
357 scoped_ptr<PacketPipe> pipe;
358 // This represents the buffer on the sender.
359 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
360 BuildPipe(&pipe, new RandomDrop(0.05)); // 5% packet drop
// Arguments: random_delay, extra_delay, seconds_between_extra_delay.
361 BuildPipe(&pipe, new RandomSortedDelay(2E-3, 20E-3, 1));
362 // This represents the buffer on the router.
// NOTE(review): the code passes 5 megabits/s; this comment used to say
// 4mbit/s -- confirm which rate is intended.
363 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
364 BuildPipe(&pipe, new ConstantDelay(1E-3));
365 // Random 40ms every other second
// NOTE(review): the disabled stage below passes 40E-1 (= 4.0) as the average
// outage time, which is in seconds; 40 ms would be 40E-3.
366 // BuildPipe(&pipe, new NetworkGlitchPipe(2, 40E-1));
367 BuildPipe(&pipe, new RandomUnsortedDelay(5E-3));
368 // This represents the buffer on the receiving device.
369 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
// Builds the "evil network" emulation chain: small (4 << 10) buffers, two 10%
// random drops, sorted/constant/unsorted delays and a NetworkGlitchPipe.
374 scoped_ptr<PacketPipe> EvilNetwork() {
375 // This represents the buffer on the sender.
376 scoped_ptr<PacketPipe> pipe;
// NOTE(review): the code passes 5 megabits/s here while the trailing comment
// says 2mbit/s and the other two buffers in this function pass 2 -- confirm
// which value is intended.
377 BuildPipe(&pipe, new Buffer(4 << 10, 5)); // 4 kb buf, 2mbit/s
378 // This represents the buffer on the router.
379 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop
// Arguments: random_delay, extra_delay, seconds_between_extra_delay.
380 BuildPipe(&pipe, new RandomSortedDelay(20E-3, 60E-3, 1));
381 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
382 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop
383 BuildPipe(&pipe, new ConstantDelay(1E-3));
// Arguments: average_work_time, average_outage_time (seconds).
384 BuildPipe(&pipe, new NetworkGlitchPipe(2.0, 0.3));
385 BuildPipe(&pipe, new RandomUnsortedDelay(20E-3));
386 // This represents the buffer on the receiving device.
387 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
// UDPProxy implementation. It owns a dedicated IO thread, one UDP socket
// bound to |local_port_| and two pipe chains: packets received from
// |destination_| go through from_dest_pipe_, all other packets go through
// to_dest_pipe_. Each chain is terminated with a PacketSender in Start().
391 class UDPProxyImpl : public UDPProxy {
// |local_port|: endpoint the socket is bound to.
// |destination|: remote endpoint; when its address is empty the destination
//     is treated as mutable and may be assigned in ProcessPacket().
// |to_dest_pipe|, |from_dest_pipe|: pipe chains whose ownership moves here.
// |net_log|: forwarded to Start() on the proxy thread.
// NOTE(review): no initializer for set_destination_next_ is visible in this
// list; confirm it is initialized before ProcessPacket() reads it.
393 UDPProxyImpl(const net::IPEndPoint& local_port,
394 const net::IPEndPoint& destination,
395 scoped_ptr<PacketPipe> to_dest_pipe,
396 scoped_ptr<PacketPipe> from_dest_pipe,
397 net::NetLog* net_log)
398 : local_port_(local_port),
399 destination_(destination),
400 destination_is_mutable_(destination.address().empty()),
401 proxy_thread_("media::cast::test::UdpProxy Thread"),
402 to_dest_pipe_(to_dest_pipe.Pass()),
403 from_dest_pipe_(from_dest_pipe.Pass()),
405 weak_factory_(this) {
// The proxy runs on its own thread with an IO-type message loop.
406 proxy_thread_.StartWithOptions(
407 base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
// Start() signals this event once the socket is bound; presumably the
// constructor waits on it before returning -- confirm.
408 base::WaitableEvent start_event(false, false);
409 proxy_thread_.message_loop_proxy()->PostTask(
411 base::Bind(&UDPProxyImpl::Start,
412 base::Unretained(this),
413 base::Unretained(&start_event),
// Posts Stop() to the proxy thread so the pipes are released there, then
// stops the thread.
418 virtual ~UDPProxyImpl() {
419 base::WaitableEvent stop_event(false, false);
420 proxy_thread_.message_loop_proxy()->PostTask(
422 base::Bind(&UDPProxyImpl::Stop,
423 base::Unretained(this),
424 base::Unretained(&stop_event)));
426 proxy_thread_.Stop();
// Writes |packet| to |destination| through the socket. Called by
// PacketSender at the end of each pipe chain.
429 void Send(scoped_ptr<transport::Packet> packet,
430 const net::IPEndPoint& destination) {
432 LOG(ERROR) << "Cannot write packet right now: blocked";
436 VLOG(1) << "Sending packet, len = " << packet->size();
437 // We ignore all problems, callbacks and errors.
438 // If it didn't work we just drop the packet and call it a day.
// WrappedIOBuffer points at the packet's own storage; the packet itself is
// handed to the AllowWrite callback below.
439 scoped_refptr<net::IOBuffer> buf =
440 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front()));
441 size_t buf_size = packet->size();
// An empty address means no destination is known yet; nothing is written.
443 if (destination.address().empty()) {
444 VLOG(1) << "Destination has not been set yet.";
445 result = net::ERR_INVALID_ARGUMENT;
447 VLOG(1) << "Destination:" << destination.ToString();
448 result = socket_->SendTo(buf,
449 static_cast<int>(buf_size),
// The completion callback is bound to a weak pointer to this object.
451 base::Bind(&UDPProxyImpl::AllowWrite,
452 weak_factory_.GetWeakPtr(),
454 base::Passed(&packet)));
// ERR_IO_PENDING means the write completes asynchronously; any other
// negative result is logged as a failure.
456 if (result == net::ERR_IO_PENDING) {
458 } else if (result < 0) {
459 LOG(ERROR) << "Failed to write packet.";
// Runs on the proxy thread: creates the socket, terminates both chains with
// a PacketSender, initializes the chains, binds the socket and signals
// |start_event|.
464 void Start(base::WaitableEvent* start_event,
465 net::NetLog* net_log) {
466 socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
467 net::RandIntCallback(),
469 net::NetLog::Source()));
// The senders keep pointers to destination_ / return_address_, so they use
// whatever value those members hold at the time each packet is sent.
470 BuildPipe(&to_dest_pipe_, new PacketSender(this, &destination_));
471 BuildPipe(&from_dest_pipe_, new PacketSender(this, &return_address_));
472 to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
474 from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
477 VLOG(0) << "From:" << local_port_.ToString();
478 if (!destination_is_mutable_)
479 VLOG(0) << "To:" << destination_.ToString();
// A failed bind is fatal.
481 CHECK_GE(socket_->Bind(local_port_), 0);
483 start_event->Signal();
// Runs on the proxy thread: destroys both chains, then signals
// |stop_event|.
487 void Stop(base::WaitableEvent* stop_event) {
488 to_dest_pipe_.reset(NULL);
489 from_dest_pipe_.reset(NULL);
491 stop_event->Signal();
// Handles the result of one receive. |len| is the RecvFrom() result and must
// not be ERR_IO_PENDING.
494 void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) {
495 DCHECK_NE(len, net::ERR_IO_PENDING);
496 VLOG(1) << "Got packet, len = " << len;
498 LOG(WARNING) << "Socket read error: " << len;
// Shrink the kMaxPacketSize-sized buffer to the number of bytes received.
501 packet_->resize(len);
// With a mutable destination, a packet from an address that is neither the
// current return address nor the current destination becomes the new
// destination when set_destination_next_ is set.
502 if (destination_is_mutable_ && set_destination_next_ &&
503 !(recv_address_ == return_address_) &&
504 !(recv_address_ == destination_)) {
505 destination_ = recv_address_;
// Packets from the destination travel back through from_dest_pipe_.
507 if (recv_address_ == destination_) {
508 set_destination_next_ = false;
509 from_dest_pipe_->Send(packet_.Pass());
// Anything else is treated as coming from the client: remember where to
// send replies and forward the packet towards the destination.
511 set_destination_next_ = true;
512 VLOG(1) << "Return address = " << recv_address_.ToString();
513 return_address_ = recv_address_;
514 to_dest_pipe_->Send(packet_.Pass());
// Completion callback for an asynchronous RecvFrom().
518 void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) {
519 ProcessPacket(recv_buf, len);
// Allocates a fresh receive packet and starts a read into its storage.
525 packet_.reset(new transport::Packet(kMaxPacketSize));
526 scoped_refptr<net::IOBuffer> recv_buf =
527 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front()));
528 int len = socket_->RecvFrom(
// NOTE(review): the read callback is bound with base::Unretained(this) while
// the write path uses a weak pointer -- confirm the socket cannot invoke it
// after this object is gone.
532 base::Bind(&UDPProxyImpl::ReadCallback,
533 base::Unretained(this),
// ERR_IO_PENDING: ReadCallback() will run later. Any other value is
// processed immediately.
535 if (len == net::ERR_IO_PENDING)
537 ProcessPacket(recv_buf, len);
// Completion callback for SendTo(); receives the buffer and packet that were
// bound into it in Send().
541 void AllowWrite(scoped_refptr<net::IOBuffer> buf,
542 scoped_ptr<transport::Packet> packet,
// Endpoint the socket is bound to.
549 net::IPEndPoint local_port_;
// Remote endpoint; may be reassigned when destination_is_mutable_ is true.
551 net::IPEndPoint destination_;
552 bool destination_is_mutable_;
// Source address of the most recent packet that did not come from
// destination_.
554 net::IPEndPoint return_address_;
// Set after a packet goes to the destination, cleared after one comes back.
555 bool set_destination_next_;
557 base::DefaultTickClock tick_clock_;
558 base::Thread proxy_thread_;
559 scoped_ptr<net::UDPSocket> socket_;
560 scoped_ptr<PacketPipe> to_dest_pipe_;
561 scoped_ptr<PacketPipe> from_dest_pipe_;
// Source address of the packet currently being processed.
564 net::IPEndPoint recv_address_;
// Receive buffer for the read in flight.
565 scoped_ptr<transport::Packet> packet_;
// Source of the weak pointers bound into write-completion callbacks.
570 base::WeakPtrFactory<UDPProxyImpl> weak_factory_;
// Hands |packet| to the proxy, addressed to the endpoint that destination_
// currently points at (the pointer is dereferenced on every call).
573 void PacketSender::Send(scoped_ptr<transport::Packet> packet) {
574 udp_proxy_->Send(packet.Pass(), *destination_);
577 scoped_ptr<UDPProxy> UDPProxy::Create(
578 const net::IPEndPoint& local_port,
579 const net::IPEndPoint& destination,
580 scoped_ptr<PacketPipe> to_dest_pipe,
581 scoped_ptr<PacketPipe> from_dest_pipe,
582 net::NetLog* net_log) {
583 scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port,
586 from_dest_pipe.Pass(),