1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
9 #include "media/cast/test/utility/udp_proxy.h"
11 #include "base/logging.h"
12 #include "base/rand_util.h"
13 #include "base/synchronization/waitable_event.h"
14 #include "base/threading/thread.h"
15 #include "base/time/default_tick_clock.h"
16 #include "net/base/io_buffer.h"
17 #include "net/base/net_errors.h"
18 #include "net/udp/udp_socket.h"
24 const size_t kMaxPacketSize = 65536;
26 PacketPipe::PacketPipe() {}
27 PacketPipe::~PacketPipe() {}
28 void PacketPipe::InitOnIOThread(
29 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
30 base::TickClock* clock) {
31 task_runner_ = task_runner;
34 pipe_->InitOnIOThread(task_runner, clock);
37 void PacketPipe::AppendToPipe(scoped_ptr<PacketPipe> pipe) {
39 pipe_->AppendToPipe(pipe.Pass());
45 // Roughly emulates a buffer inside a device.
46 // If the buffer is full, packets are dropped.
47 // Packets are output at a maximum bandwidth.
48 class Buffer : public PacketPipe {
50 Buffer(size_t buffer_size, double max_megabits_per_second)
52 max_buffer_size_(buffer_size),
53 max_megabits_per_second_(max_megabits_per_second),
55 CHECK_GT(max_buffer_size_, 0UL);
56 CHECK_GT(max_megabits_per_second, 0);
59 void Send(scoped_ptr<Packet> packet) override {
60 if (packet->size() + buffer_size_ <= max_buffer_size_) {
61 buffer_size_ += packet->size();
62 buffer_.push_back(linked_ptr<Packet>(packet.release()));
63 if (buffer_.size() == 1) {
71 last_schedule_ = clock_->NowTicks();
72 double megabits = buffer_.front()->size() * 8 / 1000000.0;
73 double seconds = megabits / max_megabits_per_second_;
74 int64 microseconds = static_cast<int64>(seconds * 1E6);
75 task_runner_->PostDelayedTask(
77 base::Bind(&Buffer::ProcessBuffer, weak_factory_.GetWeakPtr()),
78 base::TimeDelta::FromMicroseconds(microseconds));
81 void ProcessBuffer() {
82 int64 bytes_to_send = static_cast<int64>(
83 (clock_->NowTicks() - last_schedule_).InSecondsF() *
84 max_megabits_per_second_ * 1E6 / 8);
85 if (bytes_to_send < static_cast<int64>(buffer_.front()->size())) {
86 bytes_to_send = buffer_.front()->size();
88 while (!buffer_.empty() &&
89 static_cast<int64>(buffer_.front()->size()) <= bytes_to_send) {
90 CHECK(!buffer_.empty());
91 scoped_ptr<Packet> packet(buffer_.front().release());
92 bytes_to_send -= packet->size();
93 buffer_size_ -= packet->size();
95 pipe_->Send(packet.Pass());
97 if (!buffer_.empty()) {
102 std::deque<linked_ptr<Packet> > buffer_;
103 base::TimeTicks last_schedule_;
105 size_t max_buffer_size_;
106 double max_megabits_per_second_; // megabits per second
107 base::WeakPtrFactory<Buffer> weak_factory_;
110 scoped_ptr<PacketPipe> NewBuffer(size_t buffer_size, double bandwidth) {
111 return scoped_ptr<PacketPipe>(new Buffer(buffer_size, bandwidth)).Pass();
114 class RandomDrop : public PacketPipe {
116 RandomDrop(double drop_fraction)
117 : drop_fraction_(static_cast<int>(drop_fraction * RAND_MAX)) {}
119 void Send(scoped_ptr<Packet> packet) override {
120 if (rand() > drop_fraction_) {
121 pipe_->Send(packet.Pass());
129 scoped_ptr<PacketPipe> NewRandomDrop(double drop_fraction) {
130 return scoped_ptr<PacketPipe>(new RandomDrop(drop_fraction)).Pass();
133 class SimpleDelayBase : public PacketPipe {
135 SimpleDelayBase() : weak_factory_(this) {}
136 ~SimpleDelayBase() override {}
138 void Send(scoped_ptr<Packet> packet) override {
139 double seconds = GetDelay();
140 task_runner_->PostDelayedTask(
142 base::Bind(&SimpleDelayBase::SendInternal,
143 weak_factory_.GetWeakPtr(),
144 base::Passed(&packet)),
145 base::TimeDelta::FromMicroseconds(static_cast<int64>(seconds * 1E6)));
148 virtual double GetDelay() = 0;
151 virtual void SendInternal(scoped_ptr<Packet> packet) {
152 pipe_->Send(packet.Pass());
155 base::WeakPtrFactory<SimpleDelayBase> weak_factory_;
158 class ConstantDelay : public SimpleDelayBase {
160 ConstantDelay(double delay_seconds) : delay_seconds_(delay_seconds) {}
161 double GetDelay() override { return delay_seconds_; }
164 double delay_seconds_;
167 scoped_ptr<PacketPipe> NewConstantDelay(double delay_seconds) {
168 return scoped_ptr<PacketPipe>(new ConstantDelay(delay_seconds)).Pass();
171 class RandomUnsortedDelay : public SimpleDelayBase {
173 RandomUnsortedDelay(double random_delay) : random_delay_(random_delay) {}
175 double GetDelay() override { return random_delay_ * base::RandDouble(); }
178 double random_delay_;
181 scoped_ptr<PacketPipe> NewRandomUnsortedDelay(double random_delay) {
182 return scoped_ptr<PacketPipe>(new RandomUnsortedDelay(random_delay)).Pass();
185 class DuplicateAndDelay : public RandomUnsortedDelay {
187 DuplicateAndDelay(double delay_min,
188 double random_delay) :
189 RandomUnsortedDelay(random_delay),
190 delay_min_(delay_min) {
192 void Send(scoped_ptr<Packet> packet) override {
193 pipe_->Send(scoped_ptr<Packet>(new Packet(*packet.get())));
194 RandomUnsortedDelay::Send(packet.Pass());
196 double GetDelay() override {
197 return RandomUnsortedDelay::GetDelay() + delay_min_;
203 scoped_ptr<PacketPipe> NewDuplicateAndDelay(double delay_min,
204 double random_delay) {
205 return scoped_ptr<PacketPipe>(
206 new DuplicateAndDelay(delay_min, random_delay)).Pass();
209 class RandomSortedDelay : public PacketPipe {
211 RandomSortedDelay(double random_delay,
213 double seconds_between_extra_delay)
214 : random_delay_(random_delay),
215 extra_delay_(extra_delay),
216 seconds_between_extra_delay_(seconds_between_extra_delay),
217 weak_factory_(this) {}
219 void Send(scoped_ptr<Packet> packet) override {
220 buffer_.push_back(linked_ptr<Packet>(packet.release()));
221 if (buffer_.size() == 1) {
222 next_send_ = std::max(
224 base::TimeDelta::FromSecondsD(base::RandDouble() * random_delay_),
230 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
231 base::TickClock* clock) override {
232 PacketPipe::InitOnIOThread(task_runner, clock);
233 // As we start the stream, assume that we are in a random
234 // place between two extra delays, thus multiplier = 1.0;
235 ScheduleExtraDelay(1.0);
239 void ScheduleExtraDelay(double mult) {
240 double seconds = seconds_between_extra_delay_ * mult * base::RandDouble();
241 int64 microseconds = static_cast<int64>(seconds * 1E6);
242 task_runner_->PostDelayedTask(
244 base::Bind(&RandomSortedDelay::CauseExtraDelay,
245 weak_factory_.GetWeakPtr()),
246 base::TimeDelta::FromMicroseconds(microseconds));
249 void CauseExtraDelay() {
250 next_send_ = std::max<base::TimeTicks>(
252 base::TimeDelta::FromMicroseconds(
253 static_cast<int64>(extra_delay_ * 1E6)),
255 // An extra delay just happened, wait up to seconds_between_extra_delay_*2
256 // before scheduling another one to make the average equal to
257 // seconds_between_extra_delay_.
258 ScheduleExtraDelay(2.0);
261 void ProcessBuffer() {
262 base::TimeTicks now = clock_->NowTicks();
263 while (!buffer_.empty() && next_send_ <= now) {
264 scoped_ptr<Packet> packet(buffer_.front().release());
265 pipe_->Send(packet.Pass());
268 next_send_ += base::TimeDelta::FromSecondsD(
269 base::RandDouble() * random_delay_);
272 if (!buffer_.empty()) {
273 task_runner_->PostDelayedTask(
275 base::Bind(&RandomSortedDelay::ProcessBuffer,
276 weak_factory_.GetWeakPtr()),
281 base::TimeTicks block_until_;
282 std::deque<linked_ptr<Packet> > buffer_;
283 double random_delay_;
285 double seconds_between_extra_delay_;
286 base::WeakPtrFactory<RandomSortedDelay> weak_factory_;
287 base::TimeTicks next_send_;
290 scoped_ptr<PacketPipe> NewRandomSortedDelay(
293 double seconds_between_extra_delay) {
294 return scoped_ptr<PacketPipe>(
295 new RandomSortedDelay(
296 random_delay, extra_delay, seconds_between_extra_delay))
300 class NetworkGlitchPipe : public PacketPipe {
302 NetworkGlitchPipe(double average_work_time, double average_outage_time)
304 max_work_time_(average_work_time * 2),
305 max_outage_time_(average_outage_time * 2),
306 weak_factory_(this) {}
309 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
310 base::TickClock* clock) override {
311 PacketPipe::InitOnIOThread(task_runner, clock);
315 void Send(scoped_ptr<Packet> packet) override {
317 pipe_->Send(packet.Pass());
324 double seconds = base::RandDouble() *
325 (works_ ? max_work_time_ : max_outage_time_);
326 int64 microseconds = static_cast<int64>(seconds * 1E6);
327 task_runner_->PostDelayedTask(
329 base::Bind(&NetworkGlitchPipe::Flip, weak_factory_.GetWeakPtr()),
330 base::TimeDelta::FromMicroseconds(microseconds));
334 double max_work_time_;
335 double max_outage_time_;
336 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_;
339 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time,
340 double average_outage_time) {
341 return scoped_ptr<PacketPipe>(
342 new NetworkGlitchPipe(average_work_time, average_outage_time))
347 // Internal buffer object for a client of the IPP model.
348 class InterruptedPoissonProcess::InternalBuffer : public PacketPipe {
350 InternalBuffer(base::WeakPtr<InterruptedPoissonProcess> ipp,
356 weak_factory_(this) {
359 void Send(scoped_ptr<Packet> packet) override {
360 // Drop if buffer is full.
361 if (stored_size_ >= stored_limit_)
363 stored_size_ += packet->size();
364 buffer_.push_back(linked_ptr<Packet>(packet.release()));
365 buffer_time_.push_back(clock_->NowTicks());
366 DCHECK(buffer_.size() == buffer_time_.size());
370 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
371 base::TickClock* clock) override {
374 ipp_->InitOnIOThread(task_runner, clock);
375 PacketPipe::InitOnIOThread(task_runner, clock);
378 void SendOnePacket() {
379 scoped_ptr<Packet> packet(buffer_.front().release());
380 stored_size_ -= packet->size();
382 buffer_time_.pop_front();
383 pipe_->Send(packet.Pass());
384 DCHECK(buffer_.size() == buffer_time_.size());
388 return buffer_.empty();
391 base::TimeTicks FirstPacketTime() const {
392 DCHECK(!buffer_time_.empty());
393 return buffer_time_.front();
396 base::WeakPtr<InternalBuffer> GetWeakPtr() {
397 return weak_factory_.GetWeakPtr();
402 const base::WeakPtr<InterruptedPoissonProcess> ipp_;
404 const size_t stored_limit_;
405 std::deque<linked_ptr<Packet> > buffer_;
406 std::deque<base::TimeTicks> buffer_time_;
407 base::TickClock* clock_;
408 base::WeakPtrFactory<InternalBuffer> weak_factory_;
410 DISALLOW_COPY_AND_ASSIGN(InternalBuffer);
413 InterruptedPoissonProcess::InterruptedPoissonProcess(
414 const std::vector<double>& average_rates,
415 double coef_burstiness,
416 double coef_variance,
419 average_rates_(average_rates),
420 coef_burstiness_(coef_burstiness),
421 coef_variance_(coef_variance),
424 weak_factory_(this) {
425 mt_rand_.init_genrand(rand_seed);
426 DCHECK(!average_rates.empty());
430 InterruptedPoissonProcess::~InterruptedPoissonProcess() {
433 void InterruptedPoissonProcess::InitOnIOThread(
434 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
435 base::TickClock* clock) {
436 // Already initialized and started.
437 if (task_runner_.get() && clock_)
439 task_runner_ = task_runner;
446 scoped_ptr<PacketPipe> InterruptedPoissonProcess::NewBuffer(size_t size) {
447 scoped_ptr<InternalBuffer> buffer(
448 new InternalBuffer(weak_factory_.GetWeakPtr(), size));
449 send_buffers_.push_back(buffer->GetWeakPtr());
450 return buffer.Pass();
453 base::TimeDelta InterruptedPoissonProcess::NextEvent(double rate) {
454 // Rate is per milliseconds.
455 // The time until next event is exponentially distributed to the
456 // inverse of |rate|.
457 return base::TimeDelta::FromMillisecondsD(
458 fabs(-log(1.0 - RandDouble()) / rate));
461 double InterruptedPoissonProcess::RandDouble() {
462 // Generate a 64-bits random number from MT19937 and then convert
464 uint64 rand = mt_rand_.genrand_int32();
466 rand |= mt_rand_.genrand_int32();
467 return base::BitsToOpenEndedUnitInterval(rand);
470 void InterruptedPoissonProcess::ComputeRates() {
471 double avg_rate = average_rates_[rate_index_];
473 send_rate_ = avg_rate / coef_burstiness_;
475 2 * avg_rate * (1 - coef_burstiness_) * (1 - coef_burstiness_) /
476 coef_burstiness_ / (coef_variance_ - 1);
478 2 * avg_rate * (1 - coef_burstiness_) / (coef_variance_ - 1);
481 void InterruptedPoissonProcess::UpdateRates() {
484 // Rates are updated once per second.
485 rate_index_ = (rate_index_ + 1) % average_rates_.size();
486 task_runner_->PostDelayedTask(
488 base::Bind(&InterruptedPoissonProcess::UpdateRates,
489 weak_factory_.GetWeakPtr()),
490 base::TimeDelta::FromSeconds(1));
493 void InterruptedPoissonProcess::SwitchOff() {
495 task_runner_->PostDelayedTask(
497 base::Bind(&InterruptedPoissonProcess::SwitchOn,
498 weak_factory_.GetWeakPtr()),
499 NextEvent(switch_on_rate_));
502 void InterruptedPoissonProcess::SwitchOn() {
504 task_runner_->PostDelayedTask(
506 base::Bind(&InterruptedPoissonProcess::SwitchOff,
507 weak_factory_.GetWeakPtr()),
508 NextEvent(switch_off_rate_));
511 void InterruptedPoissonProcess::SendPacket() {
512 task_runner_->PostDelayedTask(
514 base::Bind(&InterruptedPoissonProcess::SendPacket,
515 weak_factory_.GetWeakPtr()),
516 NextEvent(send_rate_));
518 // If OFF then don't send.
522 // Find the earliest packet to send.
523 base::TimeTicks earliest_time;
524 for (size_t i = 0; i < send_buffers_.size(); ++i) {
525 if (!send_buffers_[i])
527 if (send_buffers_[i]->Empty())
529 if (earliest_time.is_null() ||
530 send_buffers_[i]->FirstPacketTime() < earliest_time)
531 earliest_time = send_buffers_[i]->FirstPacketTime();
533 for (size_t i = 0; i < send_buffers_.size(); ++i) {
534 if (!send_buffers_[i])
536 if (send_buffers_[i]->Empty())
538 if (send_buffers_[i]->FirstPacketTime() != earliest_time)
540 send_buffers_[i]->SendOnePacket();
547 class PacketSender : public PacketPipe {
549 PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination)
550 : udp_proxy_(udp_proxy), destination_(destination) {}
551 void Send(scoped_ptr<Packet> packet) override;
552 void AppendToPipe(scoped_ptr<PacketPipe> pipe) override { NOTREACHED(); }
555 UDPProxyImpl* udp_proxy_;
556 const net::IPEndPoint* destination_; // not owned
560 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) {
562 (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass());
569 scoped_ptr<PacketPipe> GoodNetwork() {
570 // This represents the buffer on the sender.
571 scoped_ptr<PacketPipe> pipe;
572 BuildPipe(&pipe, new Buffer(2 << 20, 50));
573 BuildPipe(&pipe, new ConstantDelay(1E-3));
574 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 2E-3, 3));
575 // This represents the buffer on the receiving device.
576 BuildPipe(&pipe, new Buffer(2 << 20, 50));
580 scoped_ptr<PacketPipe> WifiNetwork() {
581 // This represents the buffer on the sender.
582 scoped_ptr<PacketPipe> pipe;
583 BuildPipe(&pipe, new Buffer(256 << 10, 20));
584 BuildPipe(&pipe, new RandomDrop(0.005));
585 // This represents the buffer on the router.
586 BuildPipe(&pipe, new ConstantDelay(1E-3));
587 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
588 BuildPipe(&pipe, new Buffer(256 << 10, 20));
589 BuildPipe(&pipe, new ConstantDelay(1E-3));
590 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
591 BuildPipe(&pipe, new RandomDrop(0.005));
592 // This represents the buffer on the receiving device.
593 BuildPipe(&pipe, new Buffer(256 << 10, 20));
597 scoped_ptr<PacketPipe> BadNetwork() {
598 scoped_ptr<PacketPipe> pipe;
599 // This represents the buffer on the sender.
600 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
601 BuildPipe(&pipe, new RandomDrop(0.05)); // 5% packet drop
602 BuildPipe(&pipe, new RandomSortedDelay(2E-3, 20E-3, 1));
603 // This represents the buffer on the router.
604 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 4mbit/s
605 BuildPipe(&pipe, new ConstantDelay(1E-3));
606 // Random 40ms every other second
607 // BuildPipe(&pipe, new NetworkGlitchPipe(2, 40E-1));
608 BuildPipe(&pipe, new RandomUnsortedDelay(5E-3));
609 // This represents the buffer on the receiving device.
610 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
615 scoped_ptr<PacketPipe> EvilNetwork() {
616 // This represents the buffer on the sender.
617 scoped_ptr<PacketPipe> pipe;
618 BuildPipe(&pipe, new Buffer(4 << 10, 5)); // 4 kb buf, 2mbit/s
619 // This represents the buffer on the router.
620 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop
621 BuildPipe(&pipe, new RandomSortedDelay(20E-3, 60E-3, 1));
622 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
623 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop
624 BuildPipe(&pipe, new ConstantDelay(1E-3));
625 BuildPipe(&pipe, new NetworkGlitchPipe(2.0, 0.3));
626 BuildPipe(&pipe, new RandomUnsortedDelay(20E-3));
627 // This represents the buffer on the receiving device.
628 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
632 class UDPProxyImpl : public UDPProxy {
634 UDPProxyImpl(const net::IPEndPoint& local_port,
635 const net::IPEndPoint& destination,
636 scoped_ptr<PacketPipe> to_dest_pipe,
637 scoped_ptr<PacketPipe> from_dest_pipe,
638 net::NetLog* net_log)
639 : local_port_(local_port),
640 destination_(destination),
641 destination_is_mutable_(destination.address().empty()),
642 proxy_thread_("media::cast::test::UdpProxy Thread"),
643 to_dest_pipe_(to_dest_pipe.Pass()),
644 from_dest_pipe_(from_dest_pipe.Pass()),
646 weak_factory_(this) {
647 proxy_thread_.StartWithOptions(
648 base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
649 base::WaitableEvent start_event(false, false);
650 proxy_thread_.message_loop_proxy()->PostTask(
652 base::Bind(&UDPProxyImpl::Start,
653 base::Unretained(this),
654 base::Unretained(&start_event),
659 ~UDPProxyImpl() override {
660 base::WaitableEvent stop_event(false, false);
661 proxy_thread_.message_loop_proxy()->PostTask(
663 base::Bind(&UDPProxyImpl::Stop,
664 base::Unretained(this),
665 base::Unretained(&stop_event)));
667 proxy_thread_.Stop();
670 void Send(scoped_ptr<Packet> packet,
671 const net::IPEndPoint& destination) {
673 LOG(ERROR) << "Cannot write packet right now: blocked";
677 VLOG(1) << "Sending packet, len = " << packet->size();
678 // We ignore all problems, callbacks and errors.
679 // If it didn't work we just drop the packet at and call it a day.
680 scoped_refptr<net::IOBuffer> buf =
681 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front()));
682 size_t buf_size = packet->size();
684 if (destination.address().empty()) {
685 VLOG(1) << "Destination has not been set yet.";
686 result = net::ERR_INVALID_ARGUMENT;
688 VLOG(1) << "Destination:" << destination.ToString();
689 result = socket_->SendTo(buf.get(),
690 static_cast<int>(buf_size),
692 base::Bind(&UDPProxyImpl::AllowWrite,
693 weak_factory_.GetWeakPtr(),
695 base::Passed(&packet)));
697 if (result == net::ERR_IO_PENDING) {
699 } else if (result < 0) {
700 LOG(ERROR) << "Failed to write packet.";
705 void Start(base::WaitableEvent* start_event,
706 net::NetLog* net_log) {
707 socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
708 net::RandIntCallback(),
710 net::NetLog::Source()));
711 BuildPipe(&to_dest_pipe_, new PacketSender(this, &destination_));
712 BuildPipe(&from_dest_pipe_, new PacketSender(this, &return_address_));
713 to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
715 from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
718 VLOG(0) << "From:" << local_port_.ToString();
719 if (!destination_is_mutable_)
720 VLOG(0) << "To:" << destination_.ToString();
722 CHECK_GE(socket_->Bind(local_port_), 0);
724 start_event->Signal();
728 void Stop(base::WaitableEvent* stop_event) {
729 to_dest_pipe_.reset(NULL);
730 from_dest_pipe_.reset(NULL);
732 stop_event->Signal();
735 void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) {
736 DCHECK_NE(len, net::ERR_IO_PENDING);
737 VLOG(1) << "Got packet, len = " << len;
739 LOG(WARNING) << "Socket read error: " << len;
742 packet_->resize(len);
743 if (destination_is_mutable_ && set_destination_next_ &&
744 !(recv_address_ == return_address_) &&
745 !(recv_address_ == destination_)) {
746 destination_ = recv_address_;
748 if (recv_address_ == destination_) {
749 set_destination_next_ = false;
750 from_dest_pipe_->Send(packet_.Pass());
752 set_destination_next_ = true;
753 VLOG(1) << "Return address = " << recv_address_.ToString();
754 return_address_ = recv_address_;
755 to_dest_pipe_->Send(packet_.Pass());
759 void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) {
760 ProcessPacket(recv_buf, len);
766 packet_.reset(new Packet(kMaxPacketSize));
767 scoped_refptr<net::IOBuffer> recv_buf =
768 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front()));
769 int len = socket_->RecvFrom(
774 &UDPProxyImpl::ReadCallback, base::Unretained(this), recv_buf));
775 if (len == net::ERR_IO_PENDING)
777 ProcessPacket(recv_buf, len);
781 void AllowWrite(scoped_refptr<net::IOBuffer> buf,
782 scoped_ptr<Packet> packet,
789 net::IPEndPoint local_port_;
791 net::IPEndPoint destination_;
792 bool destination_is_mutable_;
794 net::IPEndPoint return_address_;
795 bool set_destination_next_;
797 base::DefaultTickClock tick_clock_;
798 base::Thread proxy_thread_;
799 scoped_ptr<net::UDPSocket> socket_;
800 scoped_ptr<PacketPipe> to_dest_pipe_;
801 scoped_ptr<PacketPipe> from_dest_pipe_;
804 net::IPEndPoint recv_address_;
805 scoped_ptr<Packet> packet_;
810 base::WeakPtrFactory<UDPProxyImpl> weak_factory_;
813 void PacketSender::Send(scoped_ptr<Packet> packet) {
814 udp_proxy_->Send(packet.Pass(), *destination_);
817 scoped_ptr<UDPProxy> UDPProxy::Create(
818 const net::IPEndPoint& local_port,
819 const net::IPEndPoint& destination,
820 scoped_ptr<PacketPipe> to_dest_pipe,
821 scoped_ptr<PacketPipe> from_dest_pipe,
822 net::NetLog* net_log) {
823 scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port,
826 from_dest_pipe.Pass(),