Upstream version 8.37.180.0
[platform/framework/web/crosswalk.git] / src / media / cast / test / utility / udp_proxy.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <stdlib.h>
6
7 #include "media/cast/test/utility/udp_proxy.h"
8
9 #include "base/logging.h"
10 #include "base/memory/linked_ptr.h"
11 #include "base/rand_util.h"
12 #include "base/synchronization/waitable_event.h"
13 #include "base/threading/thread.h"
14 #include "base/time/default_tick_clock.h"
15 #include "net/base/io_buffer.h"
16 #include "net/base/net_errors.h"
17 #include "net/udp/udp_socket.h"
18
19 namespace media {
20 namespace cast {
21 namespace test {
22
// Size in bytes of the buffer allocated for each incoming datagram (64 KiB).
const size_t kMaxPacketSize = 1 << 16;
24
25 PacketPipe::PacketPipe() {}
26 PacketPipe::~PacketPipe() {}
27 void PacketPipe::InitOnIOThread(
28     const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
29     base::TickClock* clock) {
30   task_runner_ = task_runner;
31   clock_ = clock;
32   if (pipe_) {
33     pipe_->InitOnIOThread(task_runner, clock);
34   }
35 }
36 void PacketPipe::AppendToPipe(scoped_ptr<PacketPipe> pipe) {
37   if (pipe_) {
38     pipe_->AppendToPipe(pipe.Pass());
39   } else {
40     pipe_ = pipe.Pass();
41   }
42 }
43
44 // Roughly emulates a buffer inside a device.
45 // If the buffer is full, packets are dropped.
46 // Packets are output at a maximum bandwidth.
47 class Buffer : public PacketPipe {
48  public:
49   Buffer(size_t buffer_size, double max_megabits_per_second)
50       : buffer_size_(0),
51         max_buffer_size_(buffer_size),
52         max_megabits_per_second_(max_megabits_per_second),
53         weak_factory_(this) {
54     CHECK_GT(max_buffer_size_, 0UL);
55     CHECK_GT(max_megabits_per_second, 0);
56   }
57
58   virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
59     if (packet->size() + buffer_size_ <= max_buffer_size_) {
60       buffer_size_ += packet->size();
61       buffer_.push_back(linked_ptr<transport::Packet>(packet.release()));
62       if (buffer_.size() == 1) {
63         Schedule();
64       }
65     }
66   }
67
68  private:
69   void Schedule() {
70     double megabits = buffer_.front()->size() * 8 / 1000000.0;
71     double seconds = megabits / max_megabits_per_second_;
72     int64 microseconds = static_cast<int64>(seconds * 1E6);
73     task_runner_->PostDelayedTask(
74         FROM_HERE,
75         base::Bind(&Buffer::ProcessBuffer, weak_factory_.GetWeakPtr()),
76         base::TimeDelta::FromMicroseconds(microseconds));
77   }
78
79   void ProcessBuffer() {
80     CHECK(!buffer_.empty());
81     scoped_ptr<transport::Packet> packet(buffer_.front().release());
82     buffer_size_ -= packet->size();
83     buffer_.pop_front();
84     pipe_->Send(packet.Pass());
85     if (!buffer_.empty()) {
86       Schedule();
87     }
88   }
89
90   std::deque<linked_ptr<transport::Packet> > buffer_;
91   size_t buffer_size_;
92   size_t max_buffer_size_;
93   double max_megabits_per_second_;  // megabits per second
94   base::WeakPtrFactory<Buffer> weak_factory_;
95 };
96
97 scoped_ptr<PacketPipe> NewBuffer(size_t buffer_size, double bandwidth) {
98   return scoped_ptr<PacketPipe>(new Buffer(buffer_size, bandwidth)).Pass();
99 }
100
101 class RandomDrop : public PacketPipe {
102  public:
103   RandomDrop(double drop_fraction)
104       : drop_fraction_(static_cast<int>(drop_fraction * RAND_MAX)) {}
105
106   virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
107     if (rand() > drop_fraction_) {
108       pipe_->Send(packet.Pass());
109     }
110   }
111
112  private:
113   int drop_fraction_;
114 };
115
116 scoped_ptr<PacketPipe> NewRandomDrop(double drop_fraction) {
117   return scoped_ptr<PacketPipe>(new RandomDrop(drop_fraction)).Pass();
118 }
119
120 class SimpleDelayBase : public PacketPipe {
121  public:
122   SimpleDelayBase() : weak_factory_(this) {}
123   virtual ~SimpleDelayBase() {}
124
125   virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
126     double seconds = GetDelay();
127     task_runner_->PostDelayedTask(
128         FROM_HERE,
129         base::Bind(&SimpleDelayBase::SendInternal,
130                    weak_factory_.GetWeakPtr(),
131                    base::Passed(&packet)),
132         base::TimeDelta::FromMicroseconds(static_cast<int64>(seconds * 1E6)));
133   }
134  protected:
135   virtual double GetDelay() = 0;
136
137  private:
138   virtual void SendInternal(scoped_ptr<transport::Packet> packet) {
139     pipe_->Send(packet.Pass());
140   }
141
142   base::WeakPtrFactory<SimpleDelayBase> weak_factory_;
143 };
144
145 class ConstantDelay : public SimpleDelayBase {
146  public:
147   ConstantDelay(double delay_seconds) : delay_seconds_(delay_seconds) {}
148   virtual double GetDelay() OVERRIDE {
149     return delay_seconds_;
150   }
151
152  private:
153   double delay_seconds_;
154 };
155
156 scoped_ptr<PacketPipe> NewConstantDelay(double delay_seconds) {
157   return scoped_ptr<PacketPipe>(new ConstantDelay(delay_seconds)).Pass();
158 }
159
160 class RandomUnsortedDelay : public SimpleDelayBase {
161  public:
162   RandomUnsortedDelay(double random_delay) : random_delay_(random_delay) {}
163
164   virtual double GetDelay() OVERRIDE {
165     return random_delay_ * base::RandDouble();
166   }
167
168  private:
169   double random_delay_;
170 };
171
172 scoped_ptr<PacketPipe> NewRandomUnsortedDelay(double random_delay) {
173   return scoped_ptr<PacketPipe>(new RandomUnsortedDelay(random_delay)).Pass();
174 }
175
176
177 class RandomSortedDelay : public PacketPipe {
178  public:
179   RandomSortedDelay(double random_delay,
180                     double extra_delay,
181                     double seconds_between_extra_delay)
182       : random_delay_(random_delay),
183         extra_delay_(extra_delay),
184         seconds_between_extra_delay_(seconds_between_extra_delay),
185         weak_factory_(this) {}
186
187   virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
188     buffer_.push_back(linked_ptr<transport::Packet>(packet.release()));
189     if (buffer_.size() == 1) {
190       Schedule();
191     }
192   }
193   virtual void InitOnIOThread(
194       const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
195       base::TickClock* clock) OVERRIDE {
196     PacketPipe::InitOnIOThread(task_runner, clock);
197     // As we start the stream, assume that we are in a random
198     // place between two extra delays, thus multiplier = 1.0;
199     ScheduleExtraDelay(1.0);
200   }
201
202  private:
203   void ScheduleExtraDelay(double mult) {
204     double seconds = seconds_between_extra_delay_ * mult * base::RandDouble();
205     int64 microseconds = static_cast<int64>(seconds * 1E6);
206     task_runner_->PostDelayedTask(
207         FROM_HERE,
208         base::Bind(&RandomSortedDelay::CauseExtraDelay,
209                    weak_factory_.GetWeakPtr()),
210         base::TimeDelta::FromMicroseconds(microseconds));
211   }
212
213   void CauseExtraDelay() {
214     block_until_ = clock_->NowTicks() +
215         base::TimeDelta::FromMicroseconds(
216             static_cast<int64>(extra_delay_ * 1E6));
217     // An extra delay just happened, wait up to seconds_between_extra_delay_*2
218     // before scheduling another one to make the average equal to
219     // seconds_between_extra_delay_.
220     ScheduleExtraDelay(2.0);
221   }
222
223   void Schedule() {
224     double seconds = base::RandDouble() * random_delay_;
225     base::TimeDelta block_time = block_until_ - base::TimeTicks::Now();
226     base::TimeDelta delay_time =
227         base::TimeDelta::FromMicroseconds(
228             static_cast<int64>(seconds * 1E6));
229     if (block_time > delay_time) {
230       block_time = delay_time;
231     }
232
233     task_runner_->PostDelayedTask(FROM_HERE,
234                                   base::Bind(&RandomSortedDelay::ProcessBuffer,
235                                              weak_factory_.GetWeakPtr()),
236                                   delay_time);
237   }
238
239   void ProcessBuffer() {
240     CHECK(!buffer_.empty());
241     scoped_ptr<transport::Packet> packet(buffer_.front().release());
242     pipe_->Send(packet.Pass());
243     buffer_.pop_front();
244     if (!buffer_.empty()) {
245       Schedule();
246     }
247   }
248
249   base::TimeTicks block_until_;
250   std::deque<linked_ptr<transport::Packet> > buffer_;
251   double random_delay_;
252   double extra_delay_;
253   double seconds_between_extra_delay_;
254   base::WeakPtrFactory<RandomSortedDelay> weak_factory_;
255 };
256
257 scoped_ptr<PacketPipe> NewRandomSortedDelay(
258     double random_delay,
259     double extra_delay,
260     double seconds_between_extra_delay) {
261   return scoped_ptr<PacketPipe>(
262              new RandomSortedDelay(
263                  random_delay, extra_delay, seconds_between_extra_delay))
264       .Pass();
265 }
266
267 class NetworkGlitchPipe : public PacketPipe {
268  public:
269   NetworkGlitchPipe(double average_work_time, double average_outage_time)
270       : works_(false),
271         max_work_time_(average_work_time * 2),
272         max_outage_time_(average_outage_time * 2),
273         weak_factory_(this) {}
274
275   virtual void InitOnIOThread(
276       const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
277       base::TickClock* clock) OVERRIDE {
278     PacketPipe::InitOnIOThread(task_runner, clock);
279     Flip();
280   }
281
282   virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
283     if (works_) {
284       pipe_->Send(packet.Pass());
285     }
286   }
287
288  private:
289   void Flip() {
290     works_ = !works_;
291     double seconds = base::RandDouble() *
292         (works_ ? max_work_time_ : max_outage_time_);
293     int64 microseconds = static_cast<int64>(seconds * 1E6);
294     task_runner_->PostDelayedTask(
295         FROM_HERE,
296         base::Bind(&NetworkGlitchPipe::Flip, weak_factory_.GetWeakPtr()),
297         base::TimeDelta::FromMicroseconds(microseconds));
298   }
299
300   bool works_;
301   double max_work_time_;
302   double max_outage_time_;
303   base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_;
304 };
305
306 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time,
307                                             double average_outage_time) {
308   return scoped_ptr<PacketPipe>(
309              new NetworkGlitchPipe(average_work_time, average_outage_time))
310       .Pass();
311 }
312
313 class UDPProxyImpl;
314
315 class PacketSender : public PacketPipe {
316  public:
317   PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination)
318       : udp_proxy_(udp_proxy), destination_(destination) {}
319   virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE;
320   virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE {
321     NOTREACHED();
322   }
323
324  private:
325   UDPProxyImpl* udp_proxy_;
326   const net::IPEndPoint* destination_;  // not owned
327 };
328
329 namespace {
330 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) {
331   if (*pipe) {
332     (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass());
333   } else {
334     pipe->reset(next);
335   }
336 }
337 }  // namespace
338
339 scoped_ptr<PacketPipe> WifiNetwork() {
340   // This represents the buffer on the sender.
341   scoped_ptr<PacketPipe> pipe;
342   BuildPipe(&pipe, new Buffer(256 << 10, 20));
343   BuildPipe(&pipe, new RandomDrop(0.005));
344   // This represents the buffer on the router.
345   BuildPipe(&pipe, new ConstantDelay(1E-3));
346   BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
347   BuildPipe(&pipe, new Buffer(256 << 10, 20));
348   BuildPipe(&pipe, new ConstantDelay(1E-3));
349   BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
350   BuildPipe(&pipe, new RandomDrop(0.005));
351   // This represents the buffer on the receiving device.
352   BuildPipe(&pipe, new Buffer(256 << 10, 20));
353   return pipe.Pass();
354 }
355
356 scoped_ptr<PacketPipe> BadNetwork() {
357   scoped_ptr<PacketPipe> pipe;
358   // This represents the buffer on the sender.
359   BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
360   BuildPipe(&pipe, new RandomDrop(0.05));  // 5% packet drop
361   BuildPipe(&pipe, new RandomSortedDelay(2E-3, 20E-3, 1));
362   // This represents the buffer on the router.
363   BuildPipe(&pipe, new Buffer(64 << 10, 5));  // 64 kb buf, 4mbit/s
364   BuildPipe(&pipe, new ConstantDelay(1E-3));
365   // Random 40ms every other second
366   //  BuildPipe(&pipe, new NetworkGlitchPipe(2, 40E-1));
367   BuildPipe(&pipe, new RandomUnsortedDelay(5E-3));
368   // This represents the buffer on the receiving device.
369   BuildPipe(&pipe, new Buffer(64 << 10, 5));  // 64 kb buf, 5mbit/s
370   return pipe.Pass();
371 }
372
373
374 scoped_ptr<PacketPipe> EvilNetwork() {
375   // This represents the buffer on the sender.
376   scoped_ptr<PacketPipe> pipe;
377   BuildPipe(&pipe, new Buffer(4 << 10, 5));  // 4 kb buf, 2mbit/s
378   // This represents the buffer on the router.
379   BuildPipe(&pipe, new RandomDrop(0.1));  // 10% packet drop
380   BuildPipe(&pipe, new RandomSortedDelay(20E-3, 60E-3, 1));
381   BuildPipe(&pipe, new Buffer(4 << 10, 2));  // 4 kb buf, 2mbit/s
382   BuildPipe(&pipe, new RandomDrop(0.1));  // 10% packet drop
383   BuildPipe(&pipe, new ConstantDelay(1E-3));
384   BuildPipe(&pipe, new NetworkGlitchPipe(2.0, 0.3));
385   BuildPipe(&pipe, new RandomUnsortedDelay(20E-3));
386   // This represents the buffer on the receiving device.
387   BuildPipe(&pipe, new Buffer(4 << 10, 2));  // 4 kb buf, 2mbit/s
388   return pipe.Pass();
389 }
390
391 class UDPProxyImpl : public UDPProxy {
392  public:
393   UDPProxyImpl(const net::IPEndPoint& local_port,
394                const net::IPEndPoint& destination,
395                scoped_ptr<PacketPipe> to_dest_pipe,
396                scoped_ptr<PacketPipe> from_dest_pipe,
397                net::NetLog* net_log)
398       : local_port_(local_port),
399         destination_(destination),
400         destination_is_mutable_(destination.address().empty()),
401         proxy_thread_("media::cast::test::UdpProxy Thread"),
402         to_dest_pipe_(to_dest_pipe.Pass()),
403         from_dest_pipe_(from_dest_pipe.Pass()),
404         blocked_(false),
405         weak_factory_(this) {
406     proxy_thread_.StartWithOptions(
407         base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
408     base::WaitableEvent start_event(false, false);
409     proxy_thread_.message_loop_proxy()->PostTask(
410         FROM_HERE,
411         base::Bind(&UDPProxyImpl::Start,
412                    base::Unretained(this),
413                    base::Unretained(&start_event),
414                    net_log));
415     start_event.Wait();
416   }
417
418   virtual ~UDPProxyImpl() {
419     base::WaitableEvent stop_event(false, false);
420     proxy_thread_.message_loop_proxy()->PostTask(
421         FROM_HERE,
422         base::Bind(&UDPProxyImpl::Stop,
423                    base::Unretained(this),
424                    base::Unretained(&stop_event)));
425     stop_event.Wait();
426     proxy_thread_.Stop();
427   }
428
429   void Send(scoped_ptr<transport::Packet> packet,
430             const net::IPEndPoint& destination) {
431     if (blocked_) {
432       LOG(ERROR) << "Cannot write packet right now: blocked";
433       return;
434     }
435
436     VLOG(1) << "Sending packet, len = " << packet->size();
437     // We ignore all problems, callbacks and errors.
438     // If it didn't work we just drop the packet at and call it a day.
439     scoped_refptr<net::IOBuffer> buf =
440         new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front()));
441     size_t buf_size = packet->size();
442     int result;
443     if (destination.address().empty()) {
444       VLOG(1) << "Destination has not been set yet.";
445       result = net::ERR_INVALID_ARGUMENT;
446     } else {
447       VLOG(1) << "Destination:" << destination.ToString();
448       result = socket_->SendTo(buf,
449                                static_cast<int>(buf_size),
450                                destination,
451                                base::Bind(&UDPProxyImpl::AllowWrite,
452                                           weak_factory_.GetWeakPtr(),
453                                           buf,
454                                           base::Passed(&packet)));
455     }
456     if (result == net::ERR_IO_PENDING) {
457       blocked_ = true;
458     } else if (result < 0) {
459       LOG(ERROR) << "Failed to write packet.";
460     }
461   }
462
463  private:
464   void Start(base::WaitableEvent* start_event,
465              net::NetLog* net_log) {
466     socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
467                                      net::RandIntCallback(),
468                                      net_log,
469                                      net::NetLog::Source()));
470     BuildPipe(&to_dest_pipe_, new PacketSender(this, &destination_));
471     BuildPipe(&from_dest_pipe_, new PacketSender(this, &return_address_));
472     to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
473                                   &tick_clock_);
474     from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
475                                     &tick_clock_);
476
477     VLOG(0) << "From:" << local_port_.ToString();
478     if (!destination_is_mutable_)
479       VLOG(0) << "To:" << destination_.ToString();
480
481     CHECK_GE(socket_->Bind(local_port_), 0);
482
483     start_event->Signal();
484     PollRead();
485   }
486
487   void Stop(base::WaitableEvent* stop_event) {
488     to_dest_pipe_.reset(NULL);
489     from_dest_pipe_.reset(NULL);
490     socket_.reset(NULL);
491     stop_event->Signal();
492   }
493
494   void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) {
495     DCHECK_NE(len, net::ERR_IO_PENDING);
496     VLOG(1) << "Got packet, len = " << len;
497     if (len < 0) {
498       LOG(WARNING) << "Socket read error: " << len;
499       return;
500     }
501     packet_->resize(len);
502     if (destination_is_mutable_ && set_destination_next_ &&
503         !(recv_address_ == return_address_) &&
504         !(recv_address_ == destination_)) {
505       destination_ = recv_address_;
506     }
507     if (recv_address_ == destination_) {
508       set_destination_next_ = false;
509       from_dest_pipe_->Send(packet_.Pass());
510     } else {
511       set_destination_next_ = true;
512       VLOG(1) << "Return address = " << recv_address_.ToString();
513       return_address_ = recv_address_;
514       to_dest_pipe_->Send(packet_.Pass());
515     }
516   }
517
518   void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) {
519     ProcessPacket(recv_buf, len);
520     PollRead();
521   }
522
523   void PollRead() {
524     while (true) {
525       packet_.reset(new transport::Packet(kMaxPacketSize));
526       scoped_refptr<net::IOBuffer> recv_buf =
527           new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front()));
528       int len = socket_->RecvFrom(
529           recv_buf,
530           kMaxPacketSize,
531           &recv_address_,
532           base::Bind(&UDPProxyImpl::ReadCallback,
533                      base::Unretained(this),
534                      recv_buf));
535       if (len == net::ERR_IO_PENDING)
536         break;
537       ProcessPacket(recv_buf, len);
538     }
539   }
540
541   void AllowWrite(scoped_refptr<net::IOBuffer> buf,
542                   scoped_ptr<transport::Packet> packet,
543                   int unused_len) {
544     DCHECK(blocked_);
545     blocked_ = false;
546   }
547
548   // Input
549   net::IPEndPoint local_port_;
550
551   net::IPEndPoint destination_;
552   bool destination_is_mutable_;
553
554   net::IPEndPoint return_address_;
555   bool set_destination_next_;
556
557   base::DefaultTickClock tick_clock_;
558   base::Thread proxy_thread_;
559   scoped_ptr<net::UDPSocket> socket_;
560   scoped_ptr<PacketPipe> to_dest_pipe_;
561   scoped_ptr<PacketPipe> from_dest_pipe_;
562
563   // For receiving.
564   net::IPEndPoint recv_address_;
565   scoped_ptr<transport::Packet> packet_;
566
567   // For sending.
568   bool blocked_;
569
570   base::WeakPtrFactory<UDPProxyImpl> weak_factory_;
571 };
572
573 void PacketSender::Send(scoped_ptr<transport::Packet> packet) {
574   udp_proxy_->Send(packet.Pass(), *destination_);
575 }
576
577 scoped_ptr<UDPProxy> UDPProxy::Create(
578     const net::IPEndPoint& local_port,
579     const net::IPEndPoint& destination,
580     scoped_ptr<PacketPipe> to_dest_pipe,
581     scoped_ptr<PacketPipe> from_dest_pipe,
582     net::NetLog* net_log) {
583   scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port,
584                                             destination,
585                                             to_dest_pipe.Pass(),
586                                             from_dest_pipe.Pass(),
587                                             net_log));
588   return ret.Pass();
589 }
590
591 }  // namespace test
592 }  // namespace cast
593 }  // namespace media