Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / media / cast / test / utility / udp_proxy.cc
index 1132815..95640a3 100644 (file)
@@ -2,13 +2,17 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+#include <math.h>
+#include <stdlib.h>
+#include <vector>
+
 #include "media/cast/test/utility/udp_proxy.h"
 
 #include "base/logging.h"
-#include "base/memory/linked_ptr.h"
 #include "base/rand_util.h"
 #include "base/synchronization/waitable_event.h"
 #include "base/threading/thread.h"
+#include "base/time/default_tick_clock.h"
 #include "net/base/io_buffer.h"
 #include "net/base/net_errors.h"
 #include "net/udp/udp_socket.h"
@@ -22,10 +26,12 @@ const size_t kMaxPacketSize = 65536;
 PacketPipe::PacketPipe() {}
 PacketPipe::~PacketPipe() {}
 void PacketPipe::InitOnIOThread(
-    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner) {
+    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
+    base::TickClock* clock) {
   task_runner_ = task_runner;
+  clock_ = clock;
   if (pipe_) {
-    pipe_->InitOnIOThread(task_runner);
+    pipe_->InitOnIOThread(task_runner, clock);
   }
 }
 void PacketPipe::AppendToPipe(scoped_ptr<PacketPipe> pipe) {
@@ -45,12 +51,15 @@ class Buffer : public PacketPipe {
       : buffer_size_(0),
         max_buffer_size_(buffer_size),
         max_megabits_per_second_(max_megabits_per_second),
-        weak_factory_(this) {}
+        weak_factory_(this) {
+    CHECK_GT(max_buffer_size_, 0UL);
+    CHECK_GT(max_megabits_per_second, 0);
+  }
 
-  virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
+  virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
     if (packet->size() + buffer_size_ <= max_buffer_size_) {
       buffer_size_ += packet->size();
-      buffer_.push_back(linked_ptr<transport::Packet>(packet.release()));
+      buffer_.push_back(linked_ptr<Packet>(packet.release()));
       if (buffer_.size() == 1) {
         Schedule();
       }
@@ -59,6 +68,7 @@ class Buffer : public PacketPipe {
 
  private:
   void Schedule() {
+    last_schedule_ = clock_->NowTicks();
     double megabits = buffer_.front()->size() * 8 / 1000000.0;
     double seconds = megabits / max_megabits_per_second_;
     int64 microseconds = static_cast<int64>(seconds * 1E6);
@@ -69,17 +79,28 @@ class Buffer : public PacketPipe {
   }
 
   void ProcessBuffer() {
-    CHECK(!buffer_.empty());
-    scoped_ptr<transport::Packet> packet(buffer_.front().release());
-    buffer_size_ -= packet->size();
-    buffer_.pop_front();
-    pipe_->Send(packet.Pass());
+    int64 bytes_to_send = static_cast<int64>(
+        (clock_->NowTicks() - last_schedule_).InSecondsF() *
+        max_megabits_per_second_ * 1E6 / 8);
+    if (bytes_to_send < static_cast<int64>(buffer_.front()->size())) {
+      bytes_to_send = buffer_.front()->size();
+    }
+    while (!buffer_.empty() &&
+           static_cast<int64>(buffer_.front()->size()) <= bytes_to_send) {
+      CHECK(!buffer_.empty());
+      scoped_ptr<Packet> packet(buffer_.front().release());
+      bytes_to_send -= packet->size();
+      buffer_size_ -= packet->size();
+      buffer_.pop_front();
+      pipe_->Send(packet.Pass());
+    }
     if (!buffer_.empty()) {
       Schedule();
     }
   }
 
-  std::deque<linked_ptr<transport::Packet> > buffer_;
+  std::deque<linked_ptr<Packet> > buffer_;
+  base::TimeTicks last_schedule_;
   size_t buffer_size_;
   size_t max_buffer_size_;
   double max_megabits_per_second_;  // megabits per second
@@ -92,17 +113,17 @@ scoped_ptr<PacketPipe> NewBuffer(size_t buffer_size, double bandwidth) {
 
 class RandomDrop : public PacketPipe {
  public:
-  RandomDrop(double drop_fraction) : drop_fraction_(drop_fraction) {
-  }
+  RandomDrop(double drop_fraction)
+      : drop_fraction_(static_cast<int>(drop_fraction * RAND_MAX)) {}
 
-  virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
-    if (base::RandDouble() >= drop_fraction_) {
+  virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
+    if (rand() > drop_fraction_) {
       pipe_->Send(packet.Pass());
     }
   }
 
  private:
-  double drop_fraction_;
+  int drop_fraction_;
 };
 
 scoped_ptr<PacketPipe> NewRandomDrop(double drop_fraction) {
@@ -114,7 +135,7 @@ class SimpleDelayBase : public PacketPipe {
   SimpleDelayBase() : weak_factory_(this) {}
   virtual ~SimpleDelayBase() {}
 
-  virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
+  virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
     double seconds = GetDelay();
     task_runner_->PostDelayedTask(
         FROM_HERE,
@@ -127,7 +148,7 @@ class SimpleDelayBase : public PacketPipe {
   virtual double GetDelay() = 0;
 
  private:
-  virtual void SendInternal(scoped_ptr<transport::Packet> packet) {
+  virtual void SendInternal(scoped_ptr<Packet> packet) {
     pipe_->Send(packet.Pass());
   }
 
@@ -165,6 +186,29 @@ scoped_ptr<PacketPipe> NewRandomUnsortedDelay(double random_delay) {
   return scoped_ptr<PacketPipe>(new RandomUnsortedDelay(random_delay)).Pass();
 }
 
+class DuplicateAndDelay : public RandomUnsortedDelay {
+ public:
+  DuplicateAndDelay(double delay_min,
+                    double random_delay) :
+      RandomUnsortedDelay(random_delay),
+      delay_min_(delay_min) {
+  }
+  virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
+    pipe_->Send(scoped_ptr<Packet>(new Packet(*packet.get())));
+    RandomUnsortedDelay::Send(packet.Pass());
+  }
+  virtual double GetDelay() OVERRIDE {
+    return RandomUnsortedDelay::GetDelay() + delay_min_;
+  }
+ private:
+  double delay_min_;
+};
+
+scoped_ptr<PacketPipe> NewDuplicateAndDelay(double delay_min,
+                                            double random_delay) {
+  return scoped_ptr<PacketPipe>(
+      new DuplicateAndDelay(delay_min, random_delay)).Pass();
+}
 
 class RandomSortedDelay : public PacketPipe {
  public:
@@ -176,15 +220,20 @@ class RandomSortedDelay : public PacketPipe {
         seconds_between_extra_delay_(seconds_between_extra_delay),
         weak_factory_(this) {}
 
-  virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
-    buffer_.push_back(linked_ptr<transport::Packet>(packet.release()));
+  virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
+    buffer_.push_back(linked_ptr<Packet>(packet.release()));
     if (buffer_.size() == 1) {
-      Schedule();
+      next_send_ = std::max(
+          clock_->NowTicks() +
+          base::TimeDelta::FromSecondsD(base::RandDouble() * random_delay_),
+          next_send_);
+      ProcessBuffer();
     }
   }
   virtual void InitOnIOThread(
-      const scoped_refptr<base::SingleThreadTaskRunner>& task_runner) OVERRIDE {
-    PacketPipe::InitOnIOThread(task_runner);
+      const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
+      base::TickClock* clock) OVERRIDE {
+    PacketPipe::InitOnIOThread(task_runner, clock);
     // As we start the stream, assume that we are in a random
     // place between two extra delays, thus multiplier = 1.0;
     ScheduleExtraDelay(1.0);
@@ -202,47 +251,44 @@ class RandomSortedDelay : public PacketPipe {
   }
 
   void CauseExtraDelay() {
-    block_until_ = base::TimeTicks::Now() +
+    next_send_ = std::max<base::TimeTicks>(
+        clock_->NowTicks() +
         base::TimeDelta::FromMicroseconds(
-            static_cast<int64>(extra_delay_ * 1E6));
+            static_cast<int64>(extra_delay_ * 1E6)),
+        next_send_);
     // An extra delay just happened, wait up to seconds_between_extra_delay_*2
     // before scheduling another one to make the average equal to
     // seconds_between_extra_delay_.
     ScheduleExtraDelay(2.0);
   }
 
-  void Schedule() {
-    double seconds = base::RandDouble() * random_delay_;
-    base::TimeDelta block_time = block_until_ - base::TimeTicks::Now();
-    base::TimeDelta delay_time =
-        base::TimeDelta::FromMicroseconds(
-            static_cast<int64>(seconds * 1E6));
-    if (block_time > delay_time) {
-      block_time = delay_time;
-    }
+  void ProcessBuffer() {
+    base::TimeTicks now = clock_->NowTicks();
+    while (!buffer_.empty() && next_send_ <= now) {
+      scoped_ptr<Packet> packet(buffer_.front().release());
+      pipe_->Send(packet.Pass());
+      buffer_.pop_front();
 
-    task_runner_->PostDelayedTask(FROM_HERE,
-                                  base::Bind(&RandomSortedDelay::ProcessBuffer,
-                                             weak_factory_.GetWeakPtr()),
-                                  delay_time);
-  }
+      next_send_ += base::TimeDelta::FromSecondsD(
+          base::RandDouble() * random_delay_);
+    }
 
-  void ProcessBuffer() {
-    CHECK(!buffer_.empty());
-    scoped_ptr<transport::Packet> packet(buffer_.front().release());
-    pipe_->Send(packet.Pass());
-    buffer_.pop_front();
     if (!buffer_.empty()) {
-      Schedule();
+      task_runner_->PostDelayedTask(
+          FROM_HERE,
+          base::Bind(&RandomSortedDelay::ProcessBuffer,
+                     weak_factory_.GetWeakPtr()),
+          next_send_ - now);
     }
   }
 
   base::TimeTicks block_until_;
-  std::deque<linked_ptr<transport::Packet> > buffer_;
+  std::deque<linked_ptr<Packet> > buffer_;
   double random_delay_;
   double extra_delay_;
   double seconds_between_extra_delay_;
   base::WeakPtrFactory<RandomSortedDelay> weak_factory_;
+  base::TimeTicks next_send_;
 };
 
 scoped_ptr<PacketPipe> NewRandomSortedDelay(
@@ -264,12 +310,13 @@ class NetworkGlitchPipe : public PacketPipe {
         weak_factory_(this) {}
 
   virtual void InitOnIOThread(
-      const scoped_refptr<base::SingleThreadTaskRunner>& task_runner) OVERRIDE {
-    PacketPipe::InitOnIOThread(task_runner);
+      const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
+      base::TickClock* clock) OVERRIDE {
+    PacketPipe::InitOnIOThread(task_runner, clock);
     Flip();
   }
 
-  virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
+  virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
     if (works_) {
       pipe_->Send(packet.Pass());
     }
@@ -300,62 +347,219 @@ scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time,
       .Pass();
 }
 
-class PacketSender : public PacketPipe {
+
+// Internal buffer object for a client of the IPP model.
+class InterruptedPoissonProcess::InternalBuffer : public PacketPipe {
  public:
-  PacketSender(net::UDPSocket* udp_socket,
-               const net::IPEndPoint* destination) :
-      blocked_(false),
-      udp_socket_(udp_socket),
-      destination_(destination),
-      weak_factory_(this) {
-  }
-  virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
-    if (blocked_) {
-      LOG(ERROR) << "Cannot write packet right now: blocked";
+  InternalBuffer(base::WeakPtr<InterruptedPoissonProcess> ipp,
+                 size_t size)
+      : ipp_(ipp),
+        stored_size_(0),
+        stored_limit_(size),
+        clock_(NULL),
+        weak_factory_(this) {
+  }
+
+  virtual void Send(scoped_ptr<Packet> packet) OVERRIDE {
+    // Drop if buffer is full.
+    if (stored_size_ >= stored_limit_)
       return;
-    }
+    stored_size_ += packet->size();
+    buffer_.push_back(linked_ptr<Packet>(packet.release()));
+    buffer_time_.push_back(clock_->NowTicks());
+    DCHECK(buffer_.size() == buffer_time_.size());
+  }
 
-    VLOG(1) << "Sending packet, len = " << packet->size();
-    // We ignore all problems, callbacks and errors.
-    // If it didn't work we just drop the packet at and call it a day.
-    scoped_refptr<net::IOBuffer> buf =
-        new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front()));
-    size_t buf_size = packet->size();
-    int result;
-    if (destination_->address().empty()) {
-      VLOG(1) << "Destination has not been set yet.";
-      result = net::ERR_INVALID_ARGUMENT;
-    } else {
-      VLOG(1) << "Destination:" << destination_->ToString();
-      result = udp_socket_->SendTo(buf,
-                                   static_cast<int>(buf_size),
-                                   *destination_,
-                                   base::Bind(&PacketSender::AllowWrite,
-                                              weak_factory_.GetWeakPtr(),
-                                              buf,
-                                              base::Passed(&packet)));
-    }
-    if (result == net::ERR_IO_PENDING) {
-      blocked_ = true;
-    } else if (result < 0) {
-      LOG(ERROR) << "Failed to write packet.";
-    }
+  virtual void InitOnIOThread(
+      const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
+      base::TickClock* clock) OVERRIDE {
+    clock_ = clock;
+    if (ipp_)
+      ipp_->InitOnIOThread(task_runner, clock);
+    PacketPipe::InitOnIOThread(task_runner, clock);
+  }
+
+  void SendOnePacket() {
+    scoped_ptr<Packet> packet(buffer_.front().release());
+    stored_size_ -= packet->size();
+    buffer_.pop_front();
+    buffer_time_.pop_front();
+    pipe_->Send(packet.Pass());
+    DCHECK(buffer_.size() == buffer_time_.size());
+  }
+
+  bool Empty() const {
+    return buffer_.empty();
+  }
+
+  base::TimeTicks FirstPacketTime() const {
+    DCHECK(!buffer_time_.empty());
+    return buffer_time_.front();
+  }
+
+  base::WeakPtr<InternalBuffer> GetWeakPtr() {
+    return weak_factory_.GetWeakPtr();
+
+  }
+
+ private:
+  const base::WeakPtr<InterruptedPoissonProcess> ipp_;
+  size_t stored_size_;
+  const size_t stored_limit_;
+  std::deque<linked_ptr<Packet> > buffer_;
+  std::deque<base::TimeTicks> buffer_time_;
+  base::TickClock* clock_;
+  base::WeakPtrFactory<InternalBuffer> weak_factory_;
+
+  DISALLOW_COPY_AND_ASSIGN(InternalBuffer);
+};
+
+InterruptedPoissonProcess::InterruptedPoissonProcess(
+    const std::vector<double>& average_rates,
+    double coef_burstiness,
+    double coef_variance,
+    uint32 rand_seed)
+    : clock_(NULL),
+      average_rates_(average_rates),
+      coef_burstiness_(coef_burstiness),
+      coef_variance_(coef_variance),
+      rate_index_(0),
+      on_state_(true),
+      weak_factory_(this) {
+  mt_rand_.init_genrand(rand_seed);
+  DCHECK(!average_rates.empty());
+  ComputeRates();
+}
+
+InterruptedPoissonProcess::~InterruptedPoissonProcess() {
+}
+
+void InterruptedPoissonProcess::InitOnIOThread(
+    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
+    base::TickClock* clock) {
+  // Already initialized and started.
+  if (task_runner_.get() && clock_)
+    return;
+  task_runner_ = task_runner;
+  clock_ = clock;
+  UpdateRates();
+  SwitchOn();
+  SendPacket();
+}
+
+scoped_ptr<PacketPipe> InterruptedPoissonProcess::NewBuffer(size_t size) {
+  scoped_ptr<InternalBuffer> buffer(
+      new InternalBuffer(weak_factory_.GetWeakPtr(), size));
+  send_buffers_.push_back(buffer->GetWeakPtr());
+  return buffer.PassAs<PacketPipe>();
+}
+
+base::TimeDelta InterruptedPoissonProcess::NextEvent(double rate) {
+  // Rate is per milliseconds.
+  // The time until next event is exponentially distributed to the
+  // inverse of |rate|.
+  return base::TimeDelta::FromMillisecondsD(
+      fabs(-log(1.0 - RandDouble()) / rate));
+}
+
+double InterruptedPoissonProcess::RandDouble() {
+  // Generate a 64-bits random number from MT19937 and then convert
+  // it to double.
+  uint64 rand = mt_rand_.genrand_int32();
+  rand <<= 32;
+  rand |= mt_rand_.genrand_int32();
+  return base::BitsToOpenEndedUnitInterval(rand);
+}
+
+void InterruptedPoissonProcess::ComputeRates() {
+  double avg_rate = average_rates_[rate_index_];
+
+  send_rate_ = avg_rate / coef_burstiness_;
+  switch_off_rate_ =
+      2 * avg_rate * (1 - coef_burstiness_) * (1 - coef_burstiness_) /
+      coef_burstiness_ / (coef_variance_ - 1);
+  switch_on_rate_ =
+      2 * avg_rate * (1 - coef_burstiness_) / (coef_variance_ - 1);
+}
+
+void InterruptedPoissonProcess::UpdateRates() {
+  ComputeRates();
+
+  // Rates are updated once per second.
+  rate_index_ = (rate_index_ + 1) % average_rates_.size();
+  task_runner_->PostDelayedTask(
+      FROM_HERE,
+      base::Bind(&InterruptedPoissonProcess::UpdateRates,
+                 weak_factory_.GetWeakPtr()),
+      base::TimeDelta::FromSeconds(1));
+}
+
+void InterruptedPoissonProcess::SwitchOff() {
+  on_state_ = false;
+  task_runner_->PostDelayedTask(
+      FROM_HERE,
+      base::Bind(&InterruptedPoissonProcess::SwitchOn,
+                 weak_factory_.GetWeakPtr()),
+      NextEvent(switch_on_rate_));
+}
+
+void InterruptedPoissonProcess::SwitchOn() {
+  on_state_ = true;
+  task_runner_->PostDelayedTask(
+      FROM_HERE,
+      base::Bind(&InterruptedPoissonProcess::SwitchOff,
+                 weak_factory_.GetWeakPtr()),
+      NextEvent(switch_off_rate_));
+}
+
+void InterruptedPoissonProcess::SendPacket() {
+  task_runner_->PostDelayedTask(
+      FROM_HERE,
+      base::Bind(&InterruptedPoissonProcess::SendPacket,
+                 weak_factory_.GetWeakPtr()),
+      NextEvent(send_rate_));
+
+  // If OFF then don't send.
+  if (!on_state_)
+    return;
+
+  // Find the earliest packet to send.
+  base::TimeTicks earliest_time;
+  for (size_t i = 0; i < send_buffers_.size(); ++i) {
+    if (!send_buffers_[i])
+      continue;
+    if (send_buffers_[i]->Empty())
+      continue;
+    if (earliest_time.is_null() ||
+        send_buffers_[i]->FirstPacketTime() < earliest_time)
+      earliest_time = send_buffers_[i]->FirstPacketTime();
+  }
+  for (size_t i = 0; i < send_buffers_.size(); ++i) {
+    if (!send_buffers_[i])
+      continue;
+    if (send_buffers_[i]->Empty())
+      continue;
+    if (send_buffers_[i]->FirstPacketTime() != earliest_time)
+      continue;
+    send_buffers_[i]->SendOnePacket();
+    break;
   }
+}
+
+class UDPProxyImpl;
+
+class PacketSender : public PacketPipe {
+ public:
+  PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination)
+      : udp_proxy_(udp_proxy), destination_(destination) {}
+  virtual void Send(scoped_ptr<Packet> packet) OVERRIDE;
   virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE {
     NOTREACHED();
   }
 
  private:
-  void AllowWrite(scoped_refptr<net::IOBuffer> buf,
-                  scoped_ptr<transport::Packet> packet,
-                  int unused_len) {
-    DCHECK(blocked_);
-    blocked_ = false;
-  }
-  bool blocked_;
-  net::UDPSocket* udp_socket_;
+  UDPProxyImpl* udp_proxy_;
   const net::IPEndPoint* destination_;  // not owned
-  base::WeakPtrFactory<PacketSender> weak_factory_;
 };
 
 namespace {
@@ -368,37 +572,48 @@ void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) {
 }
 }  // namespace
 
+scoped_ptr<PacketPipe> GoodNetwork() {
+  // This represents the buffer on the sender.
+  scoped_ptr<PacketPipe> pipe;
+  BuildPipe(&pipe, new Buffer(2 << 20, 50));
+  BuildPipe(&pipe, new ConstantDelay(1E-3));
+  BuildPipe(&pipe, new RandomSortedDelay(1E-3, 2E-3, 3));
+  // This represents the buffer on the receiving device.
+  BuildPipe(&pipe, new Buffer(2 << 20, 50));
+  return pipe.Pass();
+}
+
 scoped_ptr<PacketPipe> WifiNetwork() {
   // This represents the buffer on the sender.
   scoped_ptr<PacketPipe> pipe;
-  BuildPipe(&pipe, new Buffer(256 << 10, 5000000));
+  BuildPipe(&pipe, new Buffer(256 << 10, 20));
   BuildPipe(&pipe, new RandomDrop(0.005));
   // This represents the buffer on the router.
   BuildPipe(&pipe, new ConstantDelay(1E-3));
   BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
-  BuildPipe(&pipe, new Buffer(256 << 10, 5000000));
+  BuildPipe(&pipe, new Buffer(256 << 10, 20));
   BuildPipe(&pipe, new ConstantDelay(1E-3));
   BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
   BuildPipe(&pipe, new RandomDrop(0.005));
   // This represents the buffer on the receiving device.
-  BuildPipe(&pipe, new Buffer(256 << 10, 5000000));
+  BuildPipe(&pipe, new Buffer(256 << 10, 20));
   return pipe.Pass();
 }
 
 scoped_ptr<PacketPipe> BadNetwork() {
   scoped_ptr<PacketPipe> pipe;
   // This represents the buffer on the sender.
-  BuildPipe(&pipe, new Buffer(64 << 10, 5000000)); // 64 kb buf, 5mbit/s
+  BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
   BuildPipe(&pipe, new RandomDrop(0.05));  // 5% packet drop
   BuildPipe(&pipe, new RandomSortedDelay(2E-3, 20E-3, 1));
   // This represents the buffer on the router.
-  BuildPipe(&pipe, new Buffer(64 << 10, 2000000));  // 64 kb buf, 2mbit/s
+  BuildPipe(&pipe, new Buffer(64 << 10, 5));  // 64 kb buf, 4mbit/s
   BuildPipe(&pipe, new ConstantDelay(1E-3));
   // Random 40ms every other second
   //  BuildPipe(&pipe, new NetworkGlitchPipe(2, 40E-1));
   BuildPipe(&pipe, new RandomUnsortedDelay(5E-3));
   // This represents the buffer on the receiving device.
-  BuildPipe(&pipe, new Buffer(64 << 10, 4000000));  // 64 kb buf, 4mbit/s
+  BuildPipe(&pipe, new Buffer(64 << 10, 5));  // 64 kb buf, 5mbit/s
   return pipe.Pass();
 }
 
@@ -406,17 +621,17 @@ scoped_ptr<PacketPipe> BadNetwork() {
 scoped_ptr<PacketPipe> EvilNetwork() {
   // This represents the buffer on the sender.
   scoped_ptr<PacketPipe> pipe;
-  BuildPipe(&pipe, new Buffer(4 << 10, 2000000));
+  BuildPipe(&pipe, new Buffer(4 << 10, 5));  // 4 kb buf, 2mbit/s
   // This represents the buffer on the router.
   BuildPipe(&pipe, new RandomDrop(0.1));  // 10% packet drop
   BuildPipe(&pipe, new RandomSortedDelay(20E-3, 60E-3, 1));
-  BuildPipe(&pipe, new Buffer(4 << 10, 1000000));  // 4 kb buf, 1mbit/s
+  BuildPipe(&pipe, new Buffer(4 << 10, 2));  // 4 kb buf, 2mbit/s
   BuildPipe(&pipe, new RandomDrop(0.1));  // 10% packet drop
   BuildPipe(&pipe, new ConstantDelay(1E-3));
   BuildPipe(&pipe, new NetworkGlitchPipe(2.0, 0.3));
   BuildPipe(&pipe, new RandomUnsortedDelay(20E-3));
   // This represents the buffer on the receiving device.
-  BuildPipe(&pipe, new Buffer(4 << 10, 2000000));  // 4 kb buf, 2mbit/s
+  BuildPipe(&pipe, new Buffer(4 << 10, 2));  // 4 kb buf, 2mbit/s
   return pipe.Pass();
 }
 
@@ -426,12 +641,15 @@ class UDPProxyImpl : public UDPProxy {
                const net::IPEndPoint& destination,
                scoped_ptr<PacketPipe> to_dest_pipe,
                scoped_ptr<PacketPipe> from_dest_pipe,
-               net::NetLog* net_log) :
-      local_port_(local_port),
-      destination_(destination),
-      proxy_thread_("media::cast::test::UdpProxy Thread"),
-      to_dest_pipe_(to_dest_pipe.Pass()),
-      from_dest_pipe_(from_dest_pipe.Pass()) {
+               net::NetLog* net_log)
+      : local_port_(local_port),
+        destination_(destination),
+        destination_is_mutable_(destination.address().empty()),
+        proxy_thread_("media::cast::test::UdpProxy Thread"),
+        to_dest_pipe_(to_dest_pipe.Pass()),
+        from_dest_pipe_(from_dest_pipe.Pass()),
+        blocked_(false),
+        weak_factory_(this) {
     proxy_thread_.StartWithOptions(
         base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
     base::WaitableEvent start_event(false, false);
@@ -455,6 +673,40 @@ class UDPProxyImpl : public UDPProxy {
     proxy_thread_.Stop();
   }
 
+  void Send(scoped_ptr<Packet> packet,
+            const net::IPEndPoint& destination) {
+    if (blocked_) {
+      LOG(ERROR) << "Cannot write packet right now: blocked";
+      return;
+    }
+
+    VLOG(1) << "Sending packet, len = " << packet->size();
+    // We ignore all problems, callbacks and errors.
+    // If it didn't work we just drop the packet at and call it a day.
+    scoped_refptr<net::IOBuffer> buf =
+        new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front()));
+    size_t buf_size = packet->size();
+    int result;
+    if (destination.address().empty()) {
+      VLOG(1) << "Destination has not been set yet.";
+      result = net::ERR_INVALID_ARGUMENT;
+    } else {
+      VLOG(1) << "Destination:" << destination.ToString();
+      result = socket_->SendTo(buf.get(),
+                               static_cast<int>(buf_size),
+                               destination,
+                               base::Bind(&UDPProxyImpl::AllowWrite,
+                                          weak_factory_.GetWeakPtr(),
+                                          buf,
+                                          base::Passed(&packet)));
+    }
+    if (result == net::ERR_IO_PENDING) {
+      blocked_ = true;
+    } else if (result < 0) {
+      LOG(ERROR) << "Failed to write packet.";
+    }
+  }
+
  private:
   void Start(base::WaitableEvent* start_event,
              net::NetLog* net_log) {
@@ -462,14 +714,16 @@ class UDPProxyImpl : public UDPProxy {
                                      net::RandIntCallback(),
                                      net_log,
                                      net::NetLog::Source()));
-    BuildPipe(&to_dest_pipe_, new PacketSender(socket_.get(), &destination_));
-    BuildPipe(&from_dest_pipe_,
-              new PacketSender(socket_.get(), &return_address_));
-    to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current());
-    from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current());
+    BuildPipe(&to_dest_pipe_, new PacketSender(this, &destination_));
+    BuildPipe(&from_dest_pipe_, new PacketSender(this, &return_address_));
+    to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
+                                  &tick_clock_);
+    from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
+                                    &tick_clock_);
 
     VLOG(0) << "From:" << local_port_.ToString();
-    VLOG(0) << "To:" << destination_.ToString();
+    if (!destination_is_mutable_)
+      VLOG(0) << "To:" << destination_.ToString();
 
     CHECK_GE(socket_->Bind(local_port_), 0);
 
@@ -492,9 +746,16 @@ class UDPProxyImpl : public UDPProxy {
       return;
     }
     packet_->resize(len);
+    if (destination_is_mutable_ && set_destination_next_ &&
+        !(recv_address_ == return_address_) &&
+        !(recv_address_ == destination_)) {
+      destination_ = recv_address_;
+    }
     if (recv_address_ == destination_) {
+      set_destination_next_ = false;
       from_dest_pipe_->Send(packet_.Pass());
     } else {
+      set_destination_next_ = true;
       VLOG(1) << "Return address = " << recv_address_.ToString();
       return_address_ = recv_address_;
       to_dest_pipe_->Send(packet_.Pass());
@@ -508,34 +769,57 @@ class UDPProxyImpl : public UDPProxy {
 
   void PollRead() {
     while (true) {
-      packet_.reset(new transport::Packet(kMaxPacketSize));
+      packet_.reset(new Packet(kMaxPacketSize));
       scoped_refptr<net::IOBuffer> recv_buf =
           new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front()));
       int len = socket_->RecvFrom(
-          recv_buf,
+          recv_buf.get(),
           kMaxPacketSize,
           &recv_address_,
-          base::Bind(&UDPProxyImpl::ReadCallback,
-                     base::Unretained(this),
-                     recv_buf));
+          base::Bind(
+              &UDPProxyImpl::ReadCallback, base::Unretained(this), recv_buf));
       if (len == net::ERR_IO_PENDING)
         break;
       ProcessPacket(recv_buf, len);
     }
   }
 
+  void AllowWrite(scoped_refptr<net::IOBuffer> buf,
+                  scoped_ptr<Packet> packet,
+                  int unused_len) {
+    DCHECK(blocked_);
+    blocked_ = false;
+  }
 
+  // Input
   net::IPEndPoint local_port_;
+
   net::IPEndPoint destination_;
-  net::IPEndPoint recv_address_;
+  bool destination_is_mutable_;
+
   net::IPEndPoint return_address_;
+  bool set_destination_next_;
+
+  base::DefaultTickClock tick_clock_;
   base::Thread proxy_thread_;
   scoped_ptr<net::UDPSocket> socket_;
   scoped_ptr<PacketPipe> to_dest_pipe_;
   scoped_ptr<PacketPipe> from_dest_pipe_;
-  scoped_ptr<transport::Packet> packet_;
+
+  // For receiving.
+  net::IPEndPoint recv_address_;
+  scoped_ptr<Packet> packet_;
+
+  // For sending.
+  bool blocked_;
+
+  base::WeakPtrFactory<UDPProxyImpl> weak_factory_;
 };
 
+void PacketSender::Send(scoped_ptr<Packet> packet) {
+  udp_proxy_->Send(packet.Pass(), *destination_);
+}
+
 scoped_ptr<UDPProxy> UDPProxy::Create(
     const net::IPEndPoint& local_port,
     const net::IPEndPoint& destination,