Upstream version 6.35.121.0
[platform/framework/web/crosswalk.git] / src / media / cast / test / utility / udp_proxy.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/cast/test/utility/udp_proxy.h"
6
7 #include "base/logging.h"
8 #include "base/rand_util.h"
9 #include "base/synchronization/waitable_event.h"
10 #include "base/threading/thread.h"
11 #include "net/base/io_buffer.h"
12 #include "net/base/net_errors.h"
13 #include "net/udp/udp_socket.h"
14
15 namespace media {
16 namespace cast {
17 namespace test {
18
// Number of bytes allocated for every receive buffer handed to the socket
// (64 KiB).
const size_t kMaxPacketSize = 1 << 16;
20
21 Packet::Packet(size_t size) : data(size) {}
22 Packet::~Packet() {}
23
24 PacketPipe::PacketPipe() {}
25 PacketPipe::~PacketPipe() {}
26 void PacketPipe::InitOnIOThread() {}
27 void PacketPipe::AppendToPipe(scoped_ptr<PacketPipe> pipe) {
28   if (pipe_) {
29     pipe_->AppendToPipe(pipe.Pass());
30   } else {
31     pipe_ = pipe.Pass();
32   }
33 }
34
35 // Roughly emulates a buffer inside a device.
36 // If the buffer is full, packets are dropped.
37 // Packets are output at a maximum bandwidth.
38 class Buffer : public PacketPipe {
39  public:
40   Buffer(size_t buffer_size,
41          double max_megabits_per_second) :
42       buffer_size_(0),
43       max_buffer_size_(buffer_size),
44       max_megabits_per_second_(max_megabits_per_second),
45       weak_factory_(this) {
46   }
47
48   virtual void Send(scoped_refptr<Packet> packet) OVERRIDE {
49     if (packet->data.size() + buffer_size_ <= max_buffer_size_) {
50       buffer_.push_back(packet);
51       buffer_size_ += packet->data.size();
52       if (buffer_.size() == 1) {
53         Schedule();
54       }
55     }
56   }
57
58  private:
59   void Schedule() {
60     double megabits = buffer_.front()->data.size() * 8 / 1000000.0;
61     double seconds = megabits / max_megabits_per_second_;
62     int64 microseconds = static_cast<int64>(seconds * 1E6);
63     base::MessageLoop::current()->PostDelayedTask(
64         FROM_HERE,
65         base::Bind(&Buffer::ProcessBuffer, weak_factory_.GetWeakPtr()),
66         base::TimeDelta::FromMicroseconds(microseconds));
67   }
68
69   void ProcessBuffer() {
70     CHECK(!buffer_.empty());
71     pipe_->Send(buffer_.front());
72     buffer_size_ -= buffer_.front()->data.size();
73     buffer_.pop_front();
74     if (!buffer_.empty()) {
75       Schedule();
76     }
77   }
78
79   std::deque<scoped_refptr<Packet> > buffer_;
80   size_t buffer_size_;
81   size_t max_buffer_size_;
82   double max_megabits_per_second_;  // megabits per second
83   base::WeakPtrFactory<Buffer> weak_factory_;
84 };
85
86 scoped_ptr<PacketPipe> NewBuffer(size_t buffer_size, double bandwidth) {
87   return scoped_ptr<PacketPipe>(new Buffer(buffer_size, bandwidth)).Pass();
88 }
89
90 class RandomDrop : public PacketPipe {
91  public:
92   RandomDrop(double drop_fraction) : drop_fraction_(drop_fraction) {
93   }
94
95   virtual void Send(scoped_refptr<Packet> packet) OVERRIDE {
96     if (base::RandDouble() >= drop_fraction_) {
97       pipe_->Send(packet);
98     }
99   }
100
101  private:
102   double drop_fraction_;
103 };
104
105 scoped_ptr<PacketPipe> NewRandomDrop(double drop_fraction) {
106   return scoped_ptr<PacketPipe>(new RandomDrop(drop_fraction)).Pass();
107 }
108
109 class SimpleDelayBase : public PacketPipe {
110  public:
111   SimpleDelayBase() : weak_factory_(this) {
112   }
113   virtual ~SimpleDelayBase() {}
114
115   virtual void Send(scoped_refptr<Packet> packet) OVERRIDE {
116     double seconds = GetDelay();
117     base::MessageLoop::current()->PostDelayedTask(
118         FROM_HERE,
119         base::Bind(&SimpleDelayBase::SendInternal,
120                    weak_factory_.GetWeakPtr(),
121                    packet),
122         base::TimeDelta::FromMicroseconds(
123             static_cast<int64>(seconds * 1E6)));
124   }
125  protected:
126   virtual double GetDelay() = 0;
127
128  private:
129   virtual void SendInternal(scoped_refptr<Packet> packet) {
130     pipe_->Send(packet);
131   }
132
133   base::WeakPtrFactory<SimpleDelayBase> weak_factory_;
134 };
135
136 class ConstantDelay : public SimpleDelayBase {
137  public:
138   ConstantDelay(double delay_seconds) : delay_seconds_(delay_seconds) {
139   }
140   virtual double GetDelay() OVERRIDE {
141     return delay_seconds_;
142   }
143
144  private:
145   double delay_seconds_;
146 };
147
148 scoped_ptr<PacketPipe> NewConstantDelay(double delay_seconds) {
149   return scoped_ptr<PacketPipe>(
150       new ConstantDelay(delay_seconds)).Pass();
151 }
152
153 class RandomUnsortedDelay : public SimpleDelayBase {
154  public:
155   RandomUnsortedDelay(double random_delay) :
156       random_delay_(random_delay) {
157   }
158
159   virtual double GetDelay() OVERRIDE {
160     return random_delay_ * base::RandDouble();
161   }
162
163  private:
164   double random_delay_;
165 };
166
167 scoped_ptr<PacketPipe> NewRandomUnsortedDelay(double random_delay) {
168   return scoped_ptr<PacketPipe>(
169       new RandomUnsortedDelay(random_delay)).Pass();
170 }
171
172
173 class RandomSortedDelay : public PacketPipe {
174  public:
175   RandomSortedDelay(double random_delay,
176                     double extra_delay,
177                     double seconds_between_extra_delay) :
178       random_delay_(random_delay),
179       extra_delay_(extra_delay),
180       seconds_between_extra_delay_(seconds_between_extra_delay),
181       weak_factory_(this) {
182   }
183
184   virtual void Send(scoped_refptr<Packet> packet) OVERRIDE {
185     buffer_.push_back(packet);
186     if (buffer_.size() == 1) {
187       Schedule();
188     }
189   }
190   virtual void InitOnIOThread() OVERRIDE {
191     // As we start the stream, assume that we are in a random
192     // place between two extra delays, thus multiplier = 1.0;
193     ScheduleExtraDelay(1.0);
194   }
195
196  private:
197   void ScheduleExtraDelay(double mult) {
198     double seconds = seconds_between_extra_delay_ * mult * base::RandDouble();
199     int64 microseconds = static_cast<int64>(seconds * 1E6);
200     base::MessageLoop::current()->PostDelayedTask(
201         FROM_HERE,
202         base::Bind(&RandomSortedDelay::CauseExtraDelay,
203                    weak_factory_.GetWeakPtr()),
204         base::TimeDelta::FromMicroseconds(microseconds));
205   }
206
207   void CauseExtraDelay() {
208     block_until_ = base::TimeTicks::Now() +
209         base::TimeDelta::FromMicroseconds(
210             static_cast<int64>(extra_delay_ * 1E6));
211     // An extra delay just happened, wait up to seconds_between_extra_delay_*2
212     // before scheduling another one to make the average equal to
213     // seconds_between_extra_delay_.
214     ScheduleExtraDelay(2.0);
215   }
216
217   void Schedule() {
218     double seconds = base::RandDouble() * random_delay_;
219     base::TimeDelta block_time = block_until_ - base::TimeTicks::Now();
220     base::TimeDelta delay_time =
221         base::TimeDelta::FromMicroseconds(
222             static_cast<int64>(seconds * 1E6));
223     if (block_time > delay_time) {
224       block_time = delay_time;
225     }
226
227     base::MessageLoop::current()->PostDelayedTask(
228         FROM_HERE,
229         base::Bind(&RandomSortedDelay::ProcessBuffer,
230                    weak_factory_.GetWeakPtr()),
231         delay_time);
232   }
233
234   void ProcessBuffer() {
235     CHECK(!buffer_.empty());
236     pipe_->Send(buffer_.front());
237     buffer_.pop_front();
238     if (!buffer_.empty()) {
239       Schedule();
240     }
241   }
242
243   base::TimeTicks block_until_;
244   std::deque<scoped_refptr<Packet> > buffer_;
245   double random_delay_;
246   double extra_delay_;
247   double seconds_between_extra_delay_;
248   base::WeakPtrFactory<RandomSortedDelay> weak_factory_;
249 };
250
251 scoped_ptr<PacketPipe> NewRandomSortedDelay(
252     double random_delay,
253     double extra_delay,
254     double seconds_between_extra_delay) {
255   return scoped_ptr<PacketPipe>(
256       new RandomSortedDelay(random_delay,
257                             extra_delay,
258                             seconds_between_extra_delay)).Pass();
259 }
260
261 class NetworkGlitchPipe : public PacketPipe {
262  public:
263   NetworkGlitchPipe(double average_work_time,
264                     double average_outage_time) :
265       works_(false),
266       max_work_time_(average_work_time * 2),
267       max_outage_time_(average_outage_time * 2),
268       weak_factory_(this) {
269   }
270
271   virtual void InitOnIOThread() OVERRIDE {
272     Flip();
273   }
274
275   virtual void Send(scoped_refptr<Packet> packet) OVERRIDE {
276     if (works_) {
277       pipe_->Send(packet);
278     }
279   }
280
281  private:
282   void Flip() {
283     works_ = !works_;
284     double seconds = base::RandDouble() *
285         (works_ ? max_work_time_ : max_outage_time_);
286     int64 microseconds = static_cast<int64>(seconds * 1E6);
287     base::MessageLoop::current()->PostDelayedTask(
288         FROM_HERE,
289         base::Bind(&NetworkGlitchPipe::Flip,
290                    weak_factory_.GetWeakPtr()),
291         base::TimeDelta::FromMicroseconds(microseconds));
292   }
293
294   bool works_;
295   double max_work_time_;
296   double max_outage_time_;
297   base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_;
298 };
299
300 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time,
301                                             double average_outage_time) {
302   return scoped_ptr<PacketPipe>(
303       new NetworkGlitchPipe(average_work_time, average_outage_time)).Pass();
304 }
305
306 class PacketSender : public PacketPipe {
307  public:
308   PacketSender(net::UDPSocket* udp_socket,
309                const net::IPEndPoint* destination) :
310       blocked_(false),
311       udp_socket_(udp_socket),
312       destination_(destination),
313       weak_factory_(this) {
314   }
315   virtual void Send(scoped_refptr<Packet> packet) OVERRIDE {
316     if (blocked_) {
317       LOG(ERROR) << "Cannot write packet right now: blocked";
318       return;
319     }
320
321     VLOG(1) << "Sending packet, len = " << packet->data.size();
322     // We ignore all problems, callbacks and errors.
323     // If it didn't work we just drop the packet at and call it a day.
324     scoped_refptr<net::IOBuffer> buf = new net::WrappedIOBuffer(
325         reinterpret_cast<char*>(&packet->data.front()));
326     int result;
327     if (destination_->address().empty()) {
328       VLOG(1) << "Destination has not been set yet.";
329     } else {
330       VLOG(1) << "Destination:" << destination_->ToString();
331       result = udp_socket_->SendTo(buf,
332                                    static_cast<int>(packet->data.size()),
333                                    *destination_,
334                                    base::Bind(&PacketSender::AllowWrite,
335                                               weak_factory_.GetWeakPtr(),
336                                               buf,
337                                               packet));
338     }
339     if (result == net::ERR_IO_PENDING) {
340       blocked_ = true;
341     } else if (result < 0) {
342       LOG(ERROR) << "Failed to write packet.";
343     }
344   }
345   virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE {
346     NOTREACHED();
347   }
348
349  private:
350   void AllowWrite(scoped_refptr<net::IOBuffer> buf,
351                   scoped_refptr<Packet> packet,
352                   int unused_len) {
353     DCHECK(blocked_);
354     blocked_ = false;
355   }
356   bool blocked_;
357   net::UDPSocket* udp_socket_;
358   const net::IPEndPoint* destination_;  // not owned
359   base::WeakPtrFactory<PacketSender> weak_factory_;
360 };
361
362 namespace {
363 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) {
364   if (*pipe) {
365     (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass());
366   } else {
367     pipe->reset(next);
368   }
369 }
370 }  // namespace
371
372 scoped_ptr<PacketPipe> WifiNetwork() {
373   // This represents the buffer on the sender.
374   scoped_ptr<PacketPipe> pipe;
375   BuildPipe(&pipe, new Buffer(256 << 10, 5000000));
376   BuildPipe(&pipe, new RandomDrop(0.005));
377   // This represents the buffer on the router.
378   BuildPipe(&pipe, new ConstantDelay(1E-3));
379   BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
380   BuildPipe(&pipe, new Buffer(256 << 10, 5000000));
381   BuildPipe(&pipe, new ConstantDelay(1E-3));
382   BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
383   BuildPipe(&pipe, new RandomDrop(0.005));
384   // This represents the buffer on the receiving device.
385   BuildPipe(&pipe, new Buffer(256 << 10, 5000000));
386   return pipe.Pass();
387 }
388
389 scoped_ptr<PacketPipe> EvilNetwork() {
390   // This represents the buffer on the sender.
391   scoped_ptr<PacketPipe> pipe;
392   BuildPipe(&pipe, new Buffer(4 << 10, 2000000));
393   // This represents the buffer on the router.
394   BuildPipe(&pipe, new RandomDrop(0.1));  // 10% packet drop
395   BuildPipe(&pipe, new RandomSortedDelay(20E-3, 60E-3, 1));
396   BuildPipe(&pipe, new Buffer(4 << 10, 1000000));  // 4 kb buf, 1mbit/s
397   BuildPipe(&pipe, new RandomDrop(0.1));  // 10% packet drop
398   BuildPipe(&pipe, new ConstantDelay(1E-3));
399   BuildPipe(&pipe, new NetworkGlitchPipe(2.0, 0.3));
400   BuildPipe(&pipe, new RandomUnsortedDelay(20E-3));
401   // This represents the buffer on the receiving device.
402   BuildPipe(&pipe, new Buffer(4 << 10, 2000000));  // 4 kb buf, 2mbit/s
403   return pipe.Pass();
404 }
405
406 class UDPProxyImpl : public UDPProxy {
407  public:
408   UDPProxyImpl(const net::IPEndPoint& local_port,
409                const net::IPEndPoint& destination,
410                scoped_ptr<PacketPipe> to_dest_pipe,
411                scoped_ptr<PacketPipe> from_dest_pipe,
412                net::NetLog* net_log) :
413       proxy_thread_("media::cast::test::UdpProxy Thread"),
414       local_port_(local_port),
415       destination_(destination),
416       start_event_(false, false),
417       to_dest_pipe_(to_dest_pipe.Pass()),
418       from_dest_pipe_(to_dest_pipe.Pass()) {
419     proxy_thread_.StartWithOptions(
420         base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
421     proxy_thread_.message_loop_proxy()->PostTask(
422         FROM_HERE,
423         base::Bind(&UDPProxyImpl::Start,
424                    base::Unretained(this),
425                    net_log));
426     start_event_.Wait();
427   }
428
429   void Start(net::NetLog* net_log) {
430     socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
431                                      net::RandIntCallback(),
432                                      net_log,
433                                      net::NetLog::Source()));
434     BuildPipe(&to_dest_pipe_, new PacketSender(socket_.get(), &destination_));
435     BuildPipe(&from_dest_pipe_,
436               new PacketSender(socket_.get(), &return_address_));
437     to_dest_pipe_->InitOnIOThread();
438     from_dest_pipe_->InitOnIOThread();
439
440     VLOG(0) << "From:" << local_port_.ToString();
441     VLOG(0) << "To:" << destination_.ToString();
442
443     CHECK_GE(socket_->Bind(local_port_), 0);
444
445     start_event_.Signal();
446     PollRead();
447   }
448
449   virtual ~UDPProxyImpl() {
450     proxy_thread_.Stop();
451   }
452
453
454   void ProcessPacket(scoped_refptr<Packet> packet,
455                      scoped_refptr<net::IOBuffer> recv_buf,
456                      int len) {
457     DCHECK_NE(len, net::ERR_IO_PENDING);
458     VLOG(1) << "Got packet, len = " << len;
459     if (len < 0) {
460       LOG(WARNING) << "Socket read error: " << len;
461       return;
462     }
463     packet->data.resize(len);
464     if (recv_address_ == destination_) {
465       from_dest_pipe_->Send(packet);
466     } else {
467       VLOG(1) << "Return address = " << recv_address_.ToString();
468       return_address_ = recv_address_;
469       to_dest_pipe_->Send(packet);
470     }
471   }
472
473   void ReadCallback(scoped_refptr<Packet> packet,
474                     scoped_refptr<net::IOBuffer> recv_buf,
475                     int len) {
476     ProcessPacket(packet, recv_buf, len);
477     PollRead();
478   }
479
480   void PollRead() {
481     while (true) {
482       scoped_refptr<Packet> packet(new Packet(kMaxPacketSize));
483       scoped_refptr<net::IOBuffer> recv_buf = new net::WrappedIOBuffer(
484           reinterpret_cast<char*>(&packet->data.front()));
485       int len = socket_->RecvFrom(
486           recv_buf,
487           kMaxPacketSize,
488           &recv_address_,
489           base::Bind(&UDPProxyImpl::ReadCallback,
490                      base::Unretained(this),
491                      packet,
492                      recv_buf));
493       if (len == net::ERR_IO_PENDING)
494         break;
495       ProcessPacket(packet, recv_buf, len);
496     }
497   }
498
499  private:
500   net::IPEndPoint local_port_;
501   net::IPEndPoint destination_;
502   net::IPEndPoint recv_address_;
503   net::IPEndPoint return_address_;
504   base::Thread proxy_thread_;
505   scoped_ptr<net::UDPSocket> socket_;
506   scoped_ptr<PacketPipe> to_dest_pipe_;
507   scoped_ptr<PacketPipe> from_dest_pipe_;
508   base::WaitableEvent start_event_;
509 };
510
511 scoped_ptr<UDPProxy> UDPProxy::Create(
512     const net::IPEndPoint& local_port,
513     const net::IPEndPoint& destination,
514     scoped_ptr<PacketPipe> to_dest_pipe,
515     scoped_ptr<PacketPipe> from_dest_pipe,
516     net::NetLog* net_log) {
517   scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port,
518                                             destination,
519                                             to_dest_pipe.Pass(),
520                                             from_dest_pipe.Pass(),
521                                             net_log));
522   return ret.Pass();
523 }
524
525 }  // namespace test
526 }  // namespace cast
527 }  // namespace media