Update To 11.40.268.0
[platform/framework/web/crosswalk.git] / src / media / cast / test / utility / udp_proxy.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <math.h>
6 #include <stdlib.h>
7 #include <vector>
8
9 #include "media/cast/test/utility/udp_proxy.h"
10
11 #include "base/logging.h"
12 #include "base/rand_util.h"
13 #include "base/synchronization/waitable_event.h"
14 #include "base/threading/thread.h"
15 #include "base/time/default_tick_clock.h"
16 #include "net/base/io_buffer.h"
17 #include "net/base/net_errors.h"
18 #include "net/udp/udp_socket.h"
19
20 namespace media {
21 namespace cast {
22 namespace test {
23
// Size in bytes of the buffer allocated for every datagram read from the
// socket; also the maximum length requested from each receive call.
const size_t kMaxPacketSize = 1 << 16;
25
26 PacketPipe::PacketPipe() {}
27 PacketPipe::~PacketPipe() {}
28 void PacketPipe::InitOnIOThread(
29     const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
30     base::TickClock* clock) {
31   task_runner_ = task_runner;
32   clock_ = clock;
33   if (pipe_) {
34     pipe_->InitOnIOThread(task_runner, clock);
35   }
36 }
37 void PacketPipe::AppendToPipe(scoped_ptr<PacketPipe> pipe) {
38   if (pipe_) {
39     pipe_->AppendToPipe(pipe.Pass());
40   } else {
41     pipe_ = pipe.Pass();
42   }
43 }
44
45 // Roughly emulates a buffer inside a device.
46 // If the buffer is full, packets are dropped.
47 // Packets are output at a maximum bandwidth.
48 class Buffer : public PacketPipe {
49  public:
50   Buffer(size_t buffer_size, double max_megabits_per_second)
51       : buffer_size_(0),
52         max_buffer_size_(buffer_size),
53         max_megabits_per_second_(max_megabits_per_second),
54         weak_factory_(this) {
55     CHECK_GT(max_buffer_size_, 0UL);
56     CHECK_GT(max_megabits_per_second, 0);
57   }
58
59   void Send(scoped_ptr<Packet> packet) override {
60     if (packet->size() + buffer_size_ <= max_buffer_size_) {
61       buffer_size_ += packet->size();
62       buffer_.push_back(linked_ptr<Packet>(packet.release()));
63       if (buffer_.size() == 1) {
64         Schedule();
65       }
66     }
67   }
68
69  private:
70   void Schedule() {
71     last_schedule_ = clock_->NowTicks();
72     double megabits = buffer_.front()->size() * 8 / 1000000.0;
73     double seconds = megabits / max_megabits_per_second_;
74     int64 microseconds = static_cast<int64>(seconds * 1E6);
75     task_runner_->PostDelayedTask(
76         FROM_HERE,
77         base::Bind(&Buffer::ProcessBuffer, weak_factory_.GetWeakPtr()),
78         base::TimeDelta::FromMicroseconds(microseconds));
79   }
80
81   void ProcessBuffer() {
82     int64 bytes_to_send = static_cast<int64>(
83         (clock_->NowTicks() - last_schedule_).InSecondsF() *
84         max_megabits_per_second_ * 1E6 / 8);
85     if (bytes_to_send < static_cast<int64>(buffer_.front()->size())) {
86       bytes_to_send = buffer_.front()->size();
87     }
88     while (!buffer_.empty() &&
89            static_cast<int64>(buffer_.front()->size()) <= bytes_to_send) {
90       CHECK(!buffer_.empty());
91       scoped_ptr<Packet> packet(buffer_.front().release());
92       bytes_to_send -= packet->size();
93       buffer_size_ -= packet->size();
94       buffer_.pop_front();
95       pipe_->Send(packet.Pass());
96     }
97     if (!buffer_.empty()) {
98       Schedule();
99     }
100   }
101
102   std::deque<linked_ptr<Packet> > buffer_;
103   base::TimeTicks last_schedule_;
104   size_t buffer_size_;
105   size_t max_buffer_size_;
106   double max_megabits_per_second_;  // megabits per second
107   base::WeakPtrFactory<Buffer> weak_factory_;
108 };
109
110 scoped_ptr<PacketPipe> NewBuffer(size_t buffer_size, double bandwidth) {
111   return scoped_ptr<PacketPipe>(new Buffer(buffer_size, bandwidth)).Pass();
112 }
113
114 class RandomDrop : public PacketPipe {
115  public:
116   RandomDrop(double drop_fraction)
117       : drop_fraction_(static_cast<int>(drop_fraction * RAND_MAX)) {}
118
119   void Send(scoped_ptr<Packet> packet) override {
120     if (rand() > drop_fraction_) {
121       pipe_->Send(packet.Pass());
122     }
123   }
124
125  private:
126   int drop_fraction_;
127 };
128
129 scoped_ptr<PacketPipe> NewRandomDrop(double drop_fraction) {
130   return scoped_ptr<PacketPipe>(new RandomDrop(drop_fraction)).Pass();
131 }
132
133 class SimpleDelayBase : public PacketPipe {
134  public:
135   SimpleDelayBase() : weak_factory_(this) {}
136   ~SimpleDelayBase() override {}
137
138   void Send(scoped_ptr<Packet> packet) override {
139     double seconds = GetDelay();
140     task_runner_->PostDelayedTask(
141         FROM_HERE,
142         base::Bind(&SimpleDelayBase::SendInternal,
143                    weak_factory_.GetWeakPtr(),
144                    base::Passed(&packet)),
145         base::TimeDelta::FromMicroseconds(static_cast<int64>(seconds * 1E6)));
146   }
147  protected:
148   virtual double GetDelay() = 0;
149
150  private:
151   virtual void SendInternal(scoped_ptr<Packet> packet) {
152     pipe_->Send(packet.Pass());
153   }
154
155   base::WeakPtrFactory<SimpleDelayBase> weak_factory_;
156 };
157
158 class ConstantDelay : public SimpleDelayBase {
159  public:
160   ConstantDelay(double delay_seconds) : delay_seconds_(delay_seconds) {}
161   double GetDelay() override { return delay_seconds_; }
162
163  private:
164   double delay_seconds_;
165 };
166
167 scoped_ptr<PacketPipe> NewConstantDelay(double delay_seconds) {
168   return scoped_ptr<PacketPipe>(new ConstantDelay(delay_seconds)).Pass();
169 }
170
171 class RandomUnsortedDelay : public SimpleDelayBase {
172  public:
173   RandomUnsortedDelay(double random_delay) : random_delay_(random_delay) {}
174
175   double GetDelay() override { return random_delay_ * base::RandDouble(); }
176
177  private:
178   double random_delay_;
179 };
180
181 scoped_ptr<PacketPipe> NewRandomUnsortedDelay(double random_delay) {
182   return scoped_ptr<PacketPipe>(new RandomUnsortedDelay(random_delay)).Pass();
183 }
184
185 class DuplicateAndDelay : public RandomUnsortedDelay {
186  public:
187   DuplicateAndDelay(double delay_min,
188                     double random_delay) :
189       RandomUnsortedDelay(random_delay),
190       delay_min_(delay_min) {
191   }
192   void Send(scoped_ptr<Packet> packet) override {
193     pipe_->Send(scoped_ptr<Packet>(new Packet(*packet.get())));
194     RandomUnsortedDelay::Send(packet.Pass());
195   }
196   double GetDelay() override {
197     return RandomUnsortedDelay::GetDelay() + delay_min_;
198   }
199  private:
200   double delay_min_;
201 };
202
203 scoped_ptr<PacketPipe> NewDuplicateAndDelay(double delay_min,
204                                             double random_delay) {
205   return scoped_ptr<PacketPipe>(
206       new DuplicateAndDelay(delay_min, random_delay)).Pass();
207 }
208
209 class RandomSortedDelay : public PacketPipe {
210  public:
211   RandomSortedDelay(double random_delay,
212                     double extra_delay,
213                     double seconds_between_extra_delay)
214       : random_delay_(random_delay),
215         extra_delay_(extra_delay),
216         seconds_between_extra_delay_(seconds_between_extra_delay),
217         weak_factory_(this) {}
218
219   void Send(scoped_ptr<Packet> packet) override {
220     buffer_.push_back(linked_ptr<Packet>(packet.release()));
221     if (buffer_.size() == 1) {
222       next_send_ = std::max(
223           clock_->NowTicks() +
224           base::TimeDelta::FromSecondsD(base::RandDouble() * random_delay_),
225           next_send_);
226       ProcessBuffer();
227     }
228   }
229   void InitOnIOThread(
230       const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
231       base::TickClock* clock) override {
232     PacketPipe::InitOnIOThread(task_runner, clock);
233     // As we start the stream, assume that we are in a random
234     // place between two extra delays, thus multiplier = 1.0;
235     ScheduleExtraDelay(1.0);
236   }
237
238  private:
239   void ScheduleExtraDelay(double mult) {
240     double seconds = seconds_between_extra_delay_ * mult * base::RandDouble();
241     int64 microseconds = static_cast<int64>(seconds * 1E6);
242     task_runner_->PostDelayedTask(
243         FROM_HERE,
244         base::Bind(&RandomSortedDelay::CauseExtraDelay,
245                    weak_factory_.GetWeakPtr()),
246         base::TimeDelta::FromMicroseconds(microseconds));
247   }
248
249   void CauseExtraDelay() {
250     next_send_ = std::max<base::TimeTicks>(
251         clock_->NowTicks() +
252         base::TimeDelta::FromMicroseconds(
253             static_cast<int64>(extra_delay_ * 1E6)),
254         next_send_);
255     // An extra delay just happened, wait up to seconds_between_extra_delay_*2
256     // before scheduling another one to make the average equal to
257     // seconds_between_extra_delay_.
258     ScheduleExtraDelay(2.0);
259   }
260
261   void ProcessBuffer() {
262     base::TimeTicks now = clock_->NowTicks();
263     while (!buffer_.empty() && next_send_ <= now) {
264       scoped_ptr<Packet> packet(buffer_.front().release());
265       pipe_->Send(packet.Pass());
266       buffer_.pop_front();
267
268       next_send_ += base::TimeDelta::FromSecondsD(
269           base::RandDouble() * random_delay_);
270     }
271
272     if (!buffer_.empty()) {
273       task_runner_->PostDelayedTask(
274           FROM_HERE,
275           base::Bind(&RandomSortedDelay::ProcessBuffer,
276                      weak_factory_.GetWeakPtr()),
277           next_send_ - now);
278     }
279   }
280
281   base::TimeTicks block_until_;
282   std::deque<linked_ptr<Packet> > buffer_;
283   double random_delay_;
284   double extra_delay_;
285   double seconds_between_extra_delay_;
286   base::WeakPtrFactory<RandomSortedDelay> weak_factory_;
287   base::TimeTicks next_send_;
288 };
289
290 scoped_ptr<PacketPipe> NewRandomSortedDelay(
291     double random_delay,
292     double extra_delay,
293     double seconds_between_extra_delay) {
294   return scoped_ptr<PacketPipe>(
295              new RandomSortedDelay(
296                  random_delay, extra_delay, seconds_between_extra_delay))
297       .Pass();
298 }
299
300 class NetworkGlitchPipe : public PacketPipe {
301  public:
302   NetworkGlitchPipe(double average_work_time, double average_outage_time)
303       : works_(false),
304         max_work_time_(average_work_time * 2),
305         max_outage_time_(average_outage_time * 2),
306         weak_factory_(this) {}
307
308   void InitOnIOThread(
309       const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
310       base::TickClock* clock) override {
311     PacketPipe::InitOnIOThread(task_runner, clock);
312     Flip();
313   }
314
315   void Send(scoped_ptr<Packet> packet) override {
316     if (works_) {
317       pipe_->Send(packet.Pass());
318     }
319   }
320
321  private:
322   void Flip() {
323     works_ = !works_;
324     double seconds = base::RandDouble() *
325         (works_ ? max_work_time_ : max_outage_time_);
326     int64 microseconds = static_cast<int64>(seconds * 1E6);
327     task_runner_->PostDelayedTask(
328         FROM_HERE,
329         base::Bind(&NetworkGlitchPipe::Flip, weak_factory_.GetWeakPtr()),
330         base::TimeDelta::FromMicroseconds(microseconds));
331   }
332
333   bool works_;
334   double max_work_time_;
335   double max_outage_time_;
336   base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_;
337 };
338
339 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time,
340                                             double average_outage_time) {
341   return scoped_ptr<PacketPipe>(
342              new NetworkGlitchPipe(average_work_time, average_outage_time))
343       .Pass();
344 }
345
346
347 // Internal buffer object for a client of the IPP model.
348 class InterruptedPoissonProcess::InternalBuffer : public PacketPipe {
349  public:
350   InternalBuffer(base::WeakPtr<InterruptedPoissonProcess> ipp,
351                  size_t size)
352       : ipp_(ipp),
353         stored_size_(0),
354         stored_limit_(size),
355         clock_(NULL),
356         weak_factory_(this) {
357   }
358
359   void Send(scoped_ptr<Packet> packet) override {
360     // Drop if buffer is full.
361     if (stored_size_ >= stored_limit_)
362       return;
363     stored_size_ += packet->size();
364     buffer_.push_back(linked_ptr<Packet>(packet.release()));
365     buffer_time_.push_back(clock_->NowTicks());
366     DCHECK(buffer_.size() == buffer_time_.size());
367   }
368
369   void InitOnIOThread(
370       const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
371       base::TickClock* clock) override {
372     clock_ = clock;
373     if (ipp_)
374       ipp_->InitOnIOThread(task_runner, clock);
375     PacketPipe::InitOnIOThread(task_runner, clock);
376   }
377
378   void SendOnePacket() {
379     scoped_ptr<Packet> packet(buffer_.front().release());
380     stored_size_ -= packet->size();
381     buffer_.pop_front();
382     buffer_time_.pop_front();
383     pipe_->Send(packet.Pass());
384     DCHECK(buffer_.size() == buffer_time_.size());
385   }
386
387   bool Empty() const {
388     return buffer_.empty();
389   }
390
391   base::TimeTicks FirstPacketTime() const {
392     DCHECK(!buffer_time_.empty());
393     return buffer_time_.front();
394   }
395
396   base::WeakPtr<InternalBuffer> GetWeakPtr() {
397     return weak_factory_.GetWeakPtr();
398
399   }
400
401  private:
402   const base::WeakPtr<InterruptedPoissonProcess> ipp_;
403   size_t stored_size_;
404   const size_t stored_limit_;
405   std::deque<linked_ptr<Packet> > buffer_;
406   std::deque<base::TimeTicks> buffer_time_;
407   base::TickClock* clock_;
408   base::WeakPtrFactory<InternalBuffer> weak_factory_;
409
410   DISALLOW_COPY_AND_ASSIGN(InternalBuffer);
411 };
412
413 InterruptedPoissonProcess::InterruptedPoissonProcess(
414     const std::vector<double>& average_rates,
415     double coef_burstiness,
416     double coef_variance,
417     uint32 rand_seed)
418     : clock_(NULL),
419       average_rates_(average_rates),
420       coef_burstiness_(coef_burstiness),
421       coef_variance_(coef_variance),
422       rate_index_(0),
423       on_state_(true),
424       weak_factory_(this) {
425   mt_rand_.init_genrand(rand_seed);
426   DCHECK(!average_rates.empty());
427   ComputeRates();
428 }
429
430 InterruptedPoissonProcess::~InterruptedPoissonProcess() {
431 }
432
433 void InterruptedPoissonProcess::InitOnIOThread(
434     const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
435     base::TickClock* clock) {
436   // Already initialized and started.
437   if (task_runner_.get() && clock_)
438     return;
439   task_runner_ = task_runner;
440   clock_ = clock;
441   UpdateRates();
442   SwitchOn();
443   SendPacket();
444 }
445
446 scoped_ptr<PacketPipe> InterruptedPoissonProcess::NewBuffer(size_t size) {
447   scoped_ptr<InternalBuffer> buffer(
448       new InternalBuffer(weak_factory_.GetWeakPtr(), size));
449   send_buffers_.push_back(buffer->GetWeakPtr());
450   return buffer.Pass();
451 }
452
453 base::TimeDelta InterruptedPoissonProcess::NextEvent(double rate) {
454   // Rate is per milliseconds.
455   // The time until next event is exponentially distributed to the
456   // inverse of |rate|.
457   return base::TimeDelta::FromMillisecondsD(
458       fabs(-log(1.0 - RandDouble()) / rate));
459 }
460
461 double InterruptedPoissonProcess::RandDouble() {
462   // Generate a 64-bits random number from MT19937 and then convert
463   // it to double.
464   uint64 rand = mt_rand_.genrand_int32();
465   rand <<= 32;
466   rand |= mt_rand_.genrand_int32();
467   return base::BitsToOpenEndedUnitInterval(rand);
468 }
469
470 void InterruptedPoissonProcess::ComputeRates() {
471   double avg_rate = average_rates_[rate_index_];
472
473   send_rate_ = avg_rate / coef_burstiness_;
474   switch_off_rate_ =
475       2 * avg_rate * (1 - coef_burstiness_) * (1 - coef_burstiness_) /
476       coef_burstiness_ / (coef_variance_ - 1);
477   switch_on_rate_ =
478       2 * avg_rate * (1 - coef_burstiness_) / (coef_variance_ - 1);
479 }
480
481 void InterruptedPoissonProcess::UpdateRates() {
482   ComputeRates();
483
484   // Rates are updated once per second.
485   rate_index_ = (rate_index_ + 1) % average_rates_.size();
486   task_runner_->PostDelayedTask(
487       FROM_HERE,
488       base::Bind(&InterruptedPoissonProcess::UpdateRates,
489                  weak_factory_.GetWeakPtr()),
490       base::TimeDelta::FromSeconds(1));
491 }
492
493 void InterruptedPoissonProcess::SwitchOff() {
494   on_state_ = false;
495   task_runner_->PostDelayedTask(
496       FROM_HERE,
497       base::Bind(&InterruptedPoissonProcess::SwitchOn,
498                  weak_factory_.GetWeakPtr()),
499       NextEvent(switch_on_rate_));
500 }
501
502 void InterruptedPoissonProcess::SwitchOn() {
503   on_state_ = true;
504   task_runner_->PostDelayedTask(
505       FROM_HERE,
506       base::Bind(&InterruptedPoissonProcess::SwitchOff,
507                  weak_factory_.GetWeakPtr()),
508       NextEvent(switch_off_rate_));
509 }
510
511 void InterruptedPoissonProcess::SendPacket() {
512   task_runner_->PostDelayedTask(
513       FROM_HERE,
514       base::Bind(&InterruptedPoissonProcess::SendPacket,
515                  weak_factory_.GetWeakPtr()),
516       NextEvent(send_rate_));
517
518   // If OFF then don't send.
519   if (!on_state_)
520     return;
521
522   // Find the earliest packet to send.
523   base::TimeTicks earliest_time;
524   for (size_t i = 0; i < send_buffers_.size(); ++i) {
525     if (!send_buffers_[i])
526       continue;
527     if (send_buffers_[i]->Empty())
528       continue;
529     if (earliest_time.is_null() ||
530         send_buffers_[i]->FirstPacketTime() < earliest_time)
531       earliest_time = send_buffers_[i]->FirstPacketTime();
532   }
533   for (size_t i = 0; i < send_buffers_.size(); ++i) {
534     if (!send_buffers_[i])
535       continue;
536     if (send_buffers_[i]->Empty())
537       continue;
538     if (send_buffers_[i]->FirstPacketTime() != earliest_time)
539       continue;
540     send_buffers_[i]->SendOnePacket();
541     break;
542   }
543 }
544
545 class UDPProxyImpl;
546
547 class PacketSender : public PacketPipe {
548  public:
549   PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination)
550       : udp_proxy_(udp_proxy), destination_(destination) {}
551   void Send(scoped_ptr<Packet> packet) override;
552   void AppendToPipe(scoped_ptr<PacketPipe> pipe) override { NOTREACHED(); }
553
554  private:
555   UDPProxyImpl* udp_proxy_;
556   const net::IPEndPoint* destination_;  // not owned
557 };
558
559 namespace {
560 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) {
561   if (*pipe) {
562     (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass());
563   } else {
564     pipe->reset(next);
565   }
566 }
567 }  // namespace
568
569 scoped_ptr<PacketPipe> GoodNetwork() {
570   // This represents the buffer on the sender.
571   scoped_ptr<PacketPipe> pipe;
572   BuildPipe(&pipe, new Buffer(2 << 20, 50));
573   BuildPipe(&pipe, new ConstantDelay(1E-3));
574   BuildPipe(&pipe, new RandomSortedDelay(1E-3, 2E-3, 3));
575   // This represents the buffer on the receiving device.
576   BuildPipe(&pipe, new Buffer(2 << 20, 50));
577   return pipe.Pass();
578 }
579
580 scoped_ptr<PacketPipe> WifiNetwork() {
581   // This represents the buffer on the sender.
582   scoped_ptr<PacketPipe> pipe;
583   BuildPipe(&pipe, new Buffer(256 << 10, 20));
584   BuildPipe(&pipe, new RandomDrop(0.005));
585   // This represents the buffer on the router.
586   BuildPipe(&pipe, new ConstantDelay(1E-3));
587   BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
588   BuildPipe(&pipe, new Buffer(256 << 10, 20));
589   BuildPipe(&pipe, new ConstantDelay(1E-3));
590   BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
591   BuildPipe(&pipe, new RandomDrop(0.005));
592   // This represents the buffer on the receiving device.
593   BuildPipe(&pipe, new Buffer(256 << 10, 20));
594   return pipe.Pass();
595 }
596
597 scoped_ptr<PacketPipe> BadNetwork() {
598   scoped_ptr<PacketPipe> pipe;
599   // This represents the buffer on the sender.
600   BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
601   BuildPipe(&pipe, new RandomDrop(0.05));  // 5% packet drop
602   BuildPipe(&pipe, new RandomSortedDelay(2E-3, 20E-3, 1));
603   // This represents the buffer on the router.
604   BuildPipe(&pipe, new Buffer(64 << 10, 5));  // 64 kb buf, 4mbit/s
605   BuildPipe(&pipe, new ConstantDelay(1E-3));
606   // Random 40ms every other second
607   //  BuildPipe(&pipe, new NetworkGlitchPipe(2, 40E-1));
608   BuildPipe(&pipe, new RandomUnsortedDelay(5E-3));
609   // This represents the buffer on the receiving device.
610   BuildPipe(&pipe, new Buffer(64 << 10, 5));  // 64 kb buf, 5mbit/s
611   return pipe.Pass();
612 }
613
614
615 scoped_ptr<PacketPipe> EvilNetwork() {
616   // This represents the buffer on the sender.
617   scoped_ptr<PacketPipe> pipe;
618   BuildPipe(&pipe, new Buffer(4 << 10, 5));  // 4 kb buf, 2mbit/s
619   // This represents the buffer on the router.
620   BuildPipe(&pipe, new RandomDrop(0.1));  // 10% packet drop
621   BuildPipe(&pipe, new RandomSortedDelay(20E-3, 60E-3, 1));
622   BuildPipe(&pipe, new Buffer(4 << 10, 2));  // 4 kb buf, 2mbit/s
623   BuildPipe(&pipe, new RandomDrop(0.1));  // 10% packet drop
624   BuildPipe(&pipe, new ConstantDelay(1E-3));
625   BuildPipe(&pipe, new NetworkGlitchPipe(2.0, 0.3));
626   BuildPipe(&pipe, new RandomUnsortedDelay(20E-3));
627   // This represents the buffer on the receiving device.
628   BuildPipe(&pipe, new Buffer(4 << 10, 2));  // 4 kb buf, 2mbit/s
629   return pipe.Pass();
630 }
631
632 class UDPProxyImpl : public UDPProxy {
633  public:
634   UDPProxyImpl(const net::IPEndPoint& local_port,
635                const net::IPEndPoint& destination,
636                scoped_ptr<PacketPipe> to_dest_pipe,
637                scoped_ptr<PacketPipe> from_dest_pipe,
638                net::NetLog* net_log)
639       : local_port_(local_port),
640         destination_(destination),
641         destination_is_mutable_(destination.address().empty()),
642         proxy_thread_("media::cast::test::UdpProxy Thread"),
643         to_dest_pipe_(to_dest_pipe.Pass()),
644         from_dest_pipe_(from_dest_pipe.Pass()),
645         blocked_(false),
646         weak_factory_(this) {
647     proxy_thread_.StartWithOptions(
648         base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
649     base::WaitableEvent start_event(false, false);
650     proxy_thread_.message_loop_proxy()->PostTask(
651         FROM_HERE,
652         base::Bind(&UDPProxyImpl::Start,
653                    base::Unretained(this),
654                    base::Unretained(&start_event),
655                    net_log));
656     start_event.Wait();
657   }
658
659   ~UDPProxyImpl() override {
660     base::WaitableEvent stop_event(false, false);
661     proxy_thread_.message_loop_proxy()->PostTask(
662         FROM_HERE,
663         base::Bind(&UDPProxyImpl::Stop,
664                    base::Unretained(this),
665                    base::Unretained(&stop_event)));
666     stop_event.Wait();
667     proxy_thread_.Stop();
668   }
669
670   void Send(scoped_ptr<Packet> packet,
671             const net::IPEndPoint& destination) {
672     if (blocked_) {
673       LOG(ERROR) << "Cannot write packet right now: blocked";
674       return;
675     }
676
677     VLOG(1) << "Sending packet, len = " << packet->size();
678     // We ignore all problems, callbacks and errors.
679     // If it didn't work we just drop the packet at and call it a day.
680     scoped_refptr<net::IOBuffer> buf =
681         new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front()));
682     size_t buf_size = packet->size();
683     int result;
684     if (destination.address().empty()) {
685       VLOG(1) << "Destination has not been set yet.";
686       result = net::ERR_INVALID_ARGUMENT;
687     } else {
688       VLOG(1) << "Destination:" << destination.ToString();
689       result = socket_->SendTo(buf.get(),
690                                static_cast<int>(buf_size),
691                                destination,
692                                base::Bind(&UDPProxyImpl::AllowWrite,
693                                           weak_factory_.GetWeakPtr(),
694                                           buf,
695                                           base::Passed(&packet)));
696     }
697     if (result == net::ERR_IO_PENDING) {
698       blocked_ = true;
699     } else if (result < 0) {
700       LOG(ERROR) << "Failed to write packet.";
701     }
702   }
703
704  private:
705   void Start(base::WaitableEvent* start_event,
706              net::NetLog* net_log) {
707     socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
708                                      net::RandIntCallback(),
709                                      net_log,
710                                      net::NetLog::Source()));
711     BuildPipe(&to_dest_pipe_, new PacketSender(this, &destination_));
712     BuildPipe(&from_dest_pipe_, new PacketSender(this, &return_address_));
713     to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
714                                   &tick_clock_);
715     from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
716                                     &tick_clock_);
717
718     VLOG(0) << "From:" << local_port_.ToString();
719     if (!destination_is_mutable_)
720       VLOG(0) << "To:" << destination_.ToString();
721
722     CHECK_GE(socket_->Bind(local_port_), 0);
723
724     start_event->Signal();
725     PollRead();
726   }
727
728   void Stop(base::WaitableEvent* stop_event) {
729     to_dest_pipe_.reset(NULL);
730     from_dest_pipe_.reset(NULL);
731     socket_.reset(NULL);
732     stop_event->Signal();
733   }
734
735   void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) {
736     DCHECK_NE(len, net::ERR_IO_PENDING);
737     VLOG(1) << "Got packet, len = " << len;
738     if (len < 0) {
739       LOG(WARNING) << "Socket read error: " << len;
740       return;
741     }
742     packet_->resize(len);
743     if (destination_is_mutable_ && set_destination_next_ &&
744         !(recv_address_ == return_address_) &&
745         !(recv_address_ == destination_)) {
746       destination_ = recv_address_;
747     }
748     if (recv_address_ == destination_) {
749       set_destination_next_ = false;
750       from_dest_pipe_->Send(packet_.Pass());
751     } else {
752       set_destination_next_ = true;
753       VLOG(1) << "Return address = " << recv_address_.ToString();
754       return_address_ = recv_address_;
755       to_dest_pipe_->Send(packet_.Pass());
756     }
757   }
758
759   void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) {
760     ProcessPacket(recv_buf, len);
761     PollRead();
762   }
763
764   void PollRead() {
765     while (true) {
766       packet_.reset(new Packet(kMaxPacketSize));
767       scoped_refptr<net::IOBuffer> recv_buf =
768           new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front()));
769       int len = socket_->RecvFrom(
770           recv_buf.get(),
771           kMaxPacketSize,
772           &recv_address_,
773           base::Bind(
774               &UDPProxyImpl::ReadCallback, base::Unretained(this), recv_buf));
775       if (len == net::ERR_IO_PENDING)
776         break;
777       ProcessPacket(recv_buf, len);
778     }
779   }
780
781   void AllowWrite(scoped_refptr<net::IOBuffer> buf,
782                   scoped_ptr<Packet> packet,
783                   int unused_len) {
784     DCHECK(blocked_);
785     blocked_ = false;
786   }
787
788   // Input
789   net::IPEndPoint local_port_;
790
791   net::IPEndPoint destination_;
792   bool destination_is_mutable_;
793
794   net::IPEndPoint return_address_;
795   bool set_destination_next_;
796
797   base::DefaultTickClock tick_clock_;
798   base::Thread proxy_thread_;
799   scoped_ptr<net::UDPSocket> socket_;
800   scoped_ptr<PacketPipe> to_dest_pipe_;
801   scoped_ptr<PacketPipe> from_dest_pipe_;
802
803   // For receiving.
804   net::IPEndPoint recv_address_;
805   scoped_ptr<Packet> packet_;
806
807   // For sending.
808   bool blocked_;
809
810   base::WeakPtrFactory<UDPProxyImpl> weak_factory_;
811 };
812
813 void PacketSender::Send(scoped_ptr<Packet> packet) {
814   udp_proxy_->Send(packet.Pass(), *destination_);
815 }
816
817 scoped_ptr<UDPProxy> UDPProxy::Create(
818     const net::IPEndPoint& local_port,
819     const net::IPEndPoint& destination,
820     scoped_ptr<PacketPipe> to_dest_pipe,
821     scoped_ptr<PacketPipe> from_dest_pipe,
822     net::NetLog* net_log) {
823   scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port,
824                                             destination,
825                                             to_dest_pipe.Pass(),
826                                             from_dest_pipe.Pass(),
827                                             net_log));
828   return ret.Pass();
829 }
830
831 }  // namespace test
832 }  // namespace cast
833 }  // namespace media