1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/cast/transport/transport/udp_transport.h"
10 #include "base/bind.h"
11 #include "base/logging.h"
12 #include "base/memory/ref_counted.h"
13 #include "base/memory/scoped_ptr.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/rand_util.h"
16 #include "net/base/io_buffer.h"
17 #include "net/base/net_errors.h"
18 #include "net/base/rand_callback.h"
// Size used when allocating the buffer for each incoming packet:
// ReceiveNextPacket() constructs every new Packet with this value.
// NOTE(review): presumably a byte count chosen to match a typical Ethernet
// MTU; confirm against the Packet type and the RecvFrom() call.
25 const int kMaxPacketSize = 1500;
// Helper that reports whether |addr| is "empty" (unspecified).
// The visible code builds |empty_addr| with the same length as
// addr.address() and passes its full range, together with
// addr.address().begin(), to a comparison whose call line is not visible in
// this view.
// NOTE(review): presumably |empty_addr| is all zeros and the comparison is
// std::equal, combined (via the trailing &&) with a check that the port is
// zero; confirm against the complete function.
27 bool IsEmpty(const net::IPEndPoint& addr) {
28 net::IPAddressNumber empty_addr(addr.address().size());
30 empty_addr.begin(), empty_addr.end(), addr.address().begin()) &&
// Returns true when |addr1| and |addr2| have the same port and the bytes of
// addr1.address() match the bytes starting at addr2.address().begin().
// The port comparison comes first, so the address comparison is skipped when
// the ports differ.
// NOTE(review): std::equal is given only the start of addr2's address, so it
// reads addr1.address().size() elements from it. If the two addresses can
// have different lengths (e.g. IPv4 vs. IPv6) this would read past the end of
// addr2's address; confirm that callers guarantee equal lengths or add a
// size check.
34 bool IsEqual(const net::IPEndPoint& addr1, const net::IPEndPoint& addr2) {
35 return addr1.port() == addr2.port() && std::equal(addr1.address().begin(),
36 addr1.address().end(),
37 addr2.address().begin());
// Constructs the transport.
//   io_thread_proxy:  task runner stored in io_thread_proxy_; the other
//                     methods DCHECK that they run on it.
//   local_end_point:  stored in local_addr_; bound to in StartReceiving()
//                     when non-empty.
//   remote_end_point: stored in remote_addr_; connected to in
//                     StartReceiving() when the local end point is empty.
//   status_callback:  stored in status_callback_; run with
//                     TRANSPORT_SOCKET_ERROR when a socket operation fails.
// The UDP socket is only created here (with DEFAULT_BIND and a default
// RandIntCallback); Bind()/Connect() are called later, in StartReceiving().
// NOTE(review): some parameters and member initializers are not visible in
// this view.
41 UdpTransport::UdpTransport(
43 const scoped_refptr<base::SingleThreadTaskRunner>& io_thread_proxy,
44 const net::IPEndPoint& local_end_point,
45 const net::IPEndPoint& remote_end_point,
46 const CastTransportStatusCallback& status_callback)
47 : io_thread_proxy_(io_thread_proxy),
48 local_addr_(local_end_point),
49 remote_addr_(remote_end_point),
50 udp_socket_(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
51 net::RandIntCallback(),
53 net::NetLog::Source())),
55 receive_pending_(false),
56 client_connected_(false),
57 status_callback_(status_callback),
// At least one of the two end points must be specified.
59 DCHECK(!IsEmpty(local_end_point) || !IsEmpty(remote_end_point));
// The destructor body is empty; no explicit cleanup is performed here.
62 UdpTransport::~UdpTransport() {}
// Prepares the socket and starts delivering incoming packets to
// |packet_receiver|. Must be called on the thread of io_thread_proxy_.
//
// Steps visible in this view:
//  1. Store |packet_receiver| in packet_receiver_.
//  2. Enable address reuse and multicast loopback on the socket.
//  3. If local_addr_ is non-empty, Bind() to it; otherwise, if remote_addr_
//     is non-empty, Connect() to it and set client_connected_. A negative
//     result from either call is reported through status_callback_ with
//     TRANSPORT_SOCKET_ERROR and logged.
//  4. Schedule the first receive.
// NOTE(review): the lines following each error log are not visible here;
// presumably they return early so step 4 is skipped on failure; confirm.
64 void UdpTransport::StartReceiving(
65 const PacketReceiverCallback& packet_receiver) {
66 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
68 packet_receiver_ = packet_receiver;
69 udp_socket_->AllowAddressReuse();
70 udp_socket_->SetMulticastLoopbackMode(true);
71 if (!IsEmpty(local_addr_)) {
72 if (udp_socket_->Bind(local_addr_) < 0) {
73 status_callback_.Run(TRANSPORT_SOCKET_ERROR);
74 LOG(ERROR) << "Failed to bind local address.";
77 } else if (!IsEmpty(remote_addr_)) {
78 if (udp_socket_->Connect(remote_addr_) < 0) {
79 status_callback_.Run(TRANSPORT_SOCKET_ERROR);
80 LOG(ERROR) << "Failed to connect to remote address.";
// Remember that Connect() was used; SendPacket() checks this flag to choose
// Write() over SendTo().
83 client_connected_ = true;
// Both addresses empty: the constructor's DCHECK is meant to rule this out.
85 NOTREACHED() << "Either local or remote address has to be defined.";
88 ScheduleReceiveNextPacket();
// Posts a task to io_thread_proxy_ that calls
// ReceiveNextPacket(net::ERR_IO_PENDING), i.e. asks it to start a new read.
// Does nothing if no packet receiver has been set or if a receive is already
// pending; otherwise marks the receive as pending before posting.
// The task is bound to a WeakPtr obtained from weak_factory_.
// NOTE(review): presumably this lets the task be dropped if this object is
// destroyed before it runs; confirm.
91 void UdpTransport::ScheduleReceiveNextPacket() {
92 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
93 if (!packet_receiver_.is_null() && !receive_pending_) {
94 receive_pending_ = true;
95 io_thread_proxy_->PostTask(FROM_HERE,
96 base::Bind(&UdpTransport::ReceiveNextPacket,
97 weak_factory_.GetWeakPtr(),
98 net::ERR_IO_PENDING));
// Drives packet reception. |length_or_status| is interpreted by the visible
// code as follows:
//   - net::ERR_IO_PENDING: no packet is available yet, so start a new read;
//   - any other negative value: the read failed;
//   - otherwise: the value is used as the length of the packet now held in
//     next_packet_.
// This method is invoked both by the task posted from
// ScheduleReceiveNextPacket() and as the completion callback passed to
// udp_socket_->RecvFrom().
// NOTE(review): the loop statement, the RecvFrom() buffer/size/address
// arguments and several break/continue/return lines are not visible in this
// view; the comments below describe only the statements that are.
102 void UdpTransport::ReceiveNextPacket(int length_or_status) {
103 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
105 // Loop while UdpSocket is delivering data synchronously. When it responds
106 // with a "pending" status, break and expect this method to be called back in
107 // the future when a packet is ready.
109 if (length_or_status == net::ERR_IO_PENDING) {
// Allocate a fresh packet of kMaxPacketSize and wrap a pointer to its first
// element in recv_buf_.
110 next_packet_.reset(new Packet(kMaxPacketSize));
111 recv_buf_ = new net::WrappedIOBuffer(
112 reinterpret_cast<char*>(&next_packet_->front()));
113 length_or_status = udp_socket_->RecvFrom(
117 base::Bind(&UdpTransport::ReceiveNextPacket,
118 weak_factory_.GetWeakPtr()));
// The socket will call back later; record that a receive is outstanding.
119 if (length_or_status == net::ERR_IO_PENDING) {
120 receive_pending_ = true;
125 // Note: At this point, either a packet is ready or an error has occurred.
126 if (length_or_status < 0) {
127 VLOG(1) << "Failed to receive packet: Status code is "
129 status_callback_.Run(TRANSPORT_SOCKET_ERROR);
130 receive_pending_ = false;
134 // Confirm the packet has come from the expected remote address; otherwise,
135 // ignore it. If this is the first packet being received and no remote
136 // address has been set, set the remote address and expect all future
137 // packets to come from the same one.
138 // TODO(hubbe): We should only do this if the caller used a valid ssrc.
139 if (IsEmpty(remote_addr_)) {
140 remote_addr_ = recv_addr_;
141 VLOG(1) << "Setting remote address from first received packet: "
142 << remote_addr_.ToString();
143 } else if (!IsEqual(remote_addr_, recv_addr_)) {
144 VLOG(1) << "Ignoring packet received from an unrecognized address: "
145 << recv_addr_.ToString() << ".";
// Resetting the status to "pending" requests another read.
// NOTE(review): presumably followed by a `continue` so the packet below is
// not delivered; confirm.
146 length_or_status = net::ERR_IO_PENDING;
// Trim the packet to the received length, hand ownership of it to the
// receiver callback, then request another read.
150 next_packet_->resize(length_or_status);
151 packet_receiver_.Run(next_packet_.Pass());
152 length_or_status = net::ERR_IO_PENDING;
// Sends |packet| over the UDP socket. Must be called on the thread of
// io_thread_proxy_, and not while a previous send is still pending (see the
// DCHECK below).
//
// Visible behavior:
//  - The packet's data is wrapped in a net::WrappedIOBuffer built from a
//    pointer to its first byte.
//  - If StartReceiving() connected the socket (client_connected_), Write()
//    is used; otherwise, if remote_addr_ is non-empty, SendTo() is used.
//  - net::ERR_IO_PENDING marks the send as pending; any other negative
//    result is logged and reported through status_callback_ with
//    TRANSPORT_SOCKET_ERROR; on success, receiving is (re)scheduled.
// NOTE(review): the return statements, the declaration of |result|, the
// remaining Write()/SendTo() arguments and the handling of |cb| are not
// visible in this view, so the meaning of the bool result cannot be
// confirmed from here.
// NOTE(review): WrappedIOBuffer presumably does not own the bytes it wraps,
// so |packet| must outlive the send; confirm how the completion callback
// keeps it alive.
156 bool UdpTransport::SendPacket(PacketRef packet, const base::Closure& cb) {
157 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
159 DCHECK(!send_pending_);
161 VLOG(1) << "Cannot send because of pending IO.";
165 scoped_refptr<net::IOBuffer> buf =
166 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->data.front()));
// Completion callback for an asynchronous send, bound through a WeakPtr.
169 base::Callback<void(int)> callback = base::Bind(&UdpTransport::OnSent,
170 weak_factory_.GetWeakPtr(),
174 if (client_connected_) {
175 // If we called Connect() before we must call Write() instead of
176 // SendTo(). Otherwise on some platforms we might get
177 // ERR_SOCKET_IS_CONNECTED.
178 result = udp_socket_->Write(buf,
179 static_cast<int>(packet->data.size()),
181 } else if (!IsEmpty(remote_addr_)) {
182 result = udp_socket_->SendTo(buf,
183 static_cast<int>(packet->data.size()),
// The socket will invoke the completion callback later; remember that a send
// is outstanding.
190 if (result == net::ERR_IO_PENDING) {
191 send_pending_ = true;
193 } else if (result < 0) {
194 LOG(ERROR) << "Failed to send packet: " << result << ".";
195 status_callback_.Run(TRANSPORT_SOCKET_ERROR);
198 // Successful send, re-start reading if needed.
199 ScheduleReceiveNextPacket();
// Handler bound in SendPacket() as the socket's send-completion callback.
// Clears send_pending_. The visible code also contains a failure path that
// logs and reports TRANSPORT_SOCKET_ERROR through status_callback_, and a
// call that (re)schedules receiving.
// NOTE(review): some parameters, the condition guarding the failure path
// (presumably `result < 0`) and the use of |cb| are not visible in this
// view; confirm against the complete function.
204 void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf,
206 const base::Closure& cb,
208 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
// The outstanding send has finished, so a new SendPacket() call is allowed.
210 send_pending_ = false;
212 LOG(ERROR) << "Failed to send packet: " << result << ".";
213 status_callback_.Run(TRANSPORT_SOCKET_ERROR);
215 // Successful send, re-start reading if needed.
216 ScheduleReceiveNextPacket();
224 } // namespace transport