} // namespace
UdpTransport::UdpTransport(
+ net::NetLog* net_log,
const scoped_refptr<base::SingleThreadTaskRunner>& io_thread_proxy,
const net::IPEndPoint& local_end_point,
const net::IPEndPoint& remote_end_point,
remote_addr_(remote_end_point),
udp_socket_(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
net::RandIntCallback(),
- NULL,
+ net_log,
net::NetLog::Source())),
send_pending_(false),
- recv_buf_(new net::IOBuffer(kMaxPacketSize)),
+ receive_pending_(false),
+ client_connected_(false),
status_callback_(status_callback),
weak_factory_(this) {
DCHECK(!IsEmpty(local_end_point) || !IsEmpty(remote_end_point));
LOG(ERROR) << "Failed to connect to remote address.";
return;
}
+ client_connected_ = true;
} else {
NOTREACHED() << "Either local or remote address has to be defined.";
}
- ReceiveOnePacket();
+
+ ScheduleReceiveNextPacket();
}
-void UdpTransport::ReceiveOnePacket() {
+void UdpTransport::ScheduleReceiveNextPacket() {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
-
- int result = udp_socket_->RecvFrom(
- recv_buf_,
- kMaxPacketSize,
- &recv_addr_,
- base::Bind(&UdpTransport::OnReceived, weak_factory_.GetWeakPtr()));
- if (result > 0) {
- OnReceived(result);
- } else if (result != net::ERR_IO_PENDING) {
- LOG(ERROR) << "Failed to receive packet: " << result << "."
- << " Stop receiving packets.";
- status_callback_.Run(TRANSPORT_SOCKET_ERROR);
+ if (!packet_receiver_.is_null() && !receive_pending_) {
+ receive_pending_ = true;
+ io_thread_proxy_->PostTask(FROM_HERE,
+ base::Bind(&UdpTransport::ReceiveNextPacket,
+ weak_factory_.GetWeakPtr(),
+ net::ERR_IO_PENDING));
}
}
-void UdpTransport::OnReceived(int result) {
+void UdpTransport::ReceiveNextPacket(int length_or_status) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
- if (result < 0) {
- LOG(ERROR) << "Failed to receive packet: " << result << "."
- << " Stop receiving packets.";
- status_callback_.Run(TRANSPORT_SOCKET_ERROR);
- return;
- }
- if (IsEmpty(remote_addr_)) {
- remote_addr_ = recv_addr_;
- VLOG(1) << "First packet received from: " << remote_addr_.ToString() << ".";
- } else if (!IsEqual(remote_addr_, recv_addr_)) {
- LOG(ERROR) << "Received from an unrecognized address: "
- << recv_addr_.ToString() << ".";
- return;
+ // Loop while UdpSocket is delivering data synchronously. When it responds
+ // with a "pending" status, break and expect this method to be called back in
+ // the future when a packet is ready.
+ while (true) {
+ if (length_or_status == net::ERR_IO_PENDING) {
+ next_packet_.reset(new Packet(kMaxPacketSize));
+ recv_buf_ = new net::WrappedIOBuffer(
+ reinterpret_cast<char*>(&next_packet_->front()));
+ length_or_status = udp_socket_->RecvFrom(
+ recv_buf_,
+ kMaxPacketSize,
+ &recv_addr_,
+ base::Bind(&UdpTransport::ReceiveNextPacket,
+ weak_factory_.GetWeakPtr()));
+ if (length_or_status == net::ERR_IO_PENDING) {
+ receive_pending_ = true;
+ return;
+ }
+ }
+
+ // Note: At this point, either a packet is ready or an error has occurred.
+ if (length_or_status < 0) {
+ VLOG(1) << "Failed to receive packet: Status code is "
+ << length_or_status;
+ status_callback_.Run(TRANSPORT_SOCKET_ERROR);
+ receive_pending_ = false;
+ return;
+ }
+
+ // Confirm the packet has come from the expected remote address; otherwise,
+ // ignore it. If this is the first packet being received and no remote
+ // address has been set, set the remote address and expect all future
+ // packets to come from the same one.
+ // TODO(hubbe): We should only do this if the caller used a valid ssrc.
+ if (IsEmpty(remote_addr_)) {
+ remote_addr_ = recv_addr_;
+ VLOG(1) << "Setting remote address from first received packet: "
+ << remote_addr_.ToString();
+ } else if (!IsEqual(remote_addr_, recv_addr_)) {
+ VLOG(1) << "Ignoring packet received from an unrecognized address: "
+ << recv_addr_.ToString() << ".";
+ length_or_status = net::ERR_IO_PENDING;
+ continue;
+ }
+
+ next_packet_->resize(length_or_status);
+ packet_receiver_.Run(next_packet_.Pass());
+ length_or_status = net::ERR_IO_PENDING;
}
- // TODO(hclam): The interfaces should use net::IOBuffer to eliminate memcpy.
- scoped_ptr<Packet> packet(
- new Packet(recv_buf_->data(), recv_buf_->data() + result));
- packet_receiver_.Run(packet.Pass());
- ReceiveOnePacket();
}
-bool UdpTransport::SendPacket(const Packet& packet) {
+bool UdpTransport::SendPacket(PacketRef packet, const base::Closure& cb) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
+ DCHECK(!send_pending_);
if (send_pending_) {
VLOG(1) << "Cannot send because of pending IO.";
- return false;
+ return true;
}
- // TODO(hclam): This interface should take a net::IOBuffer to minimize
- // memcpy.
scoped_refptr<net::IOBuffer> buf =
- new net::IOBuffer(static_cast<int>(packet.size()));
- memcpy(buf->data(), &packet[0], packet.size());
- int ret = udp_socket_->SendTo(
- buf,
- static_cast<int>(packet.size()),
- remote_addr_,
- base::Bind(&UdpTransport::OnSent, weak_factory_.GetWeakPtr(), buf));
- if (ret == net::ERR_IO_PENDING)
+ new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->data.front()));
+
+ int result;
+ base::Callback<void(int)> callback = base::Bind(&UdpTransport::OnSent,
+ weak_factory_.GetWeakPtr(),
+ buf,
+ packet,
+ cb);
+ if (client_connected_) {
+ // If we called Connect() before we must call Write() instead of
+ // SendTo(). Otherwise on some platforms we might get
+ // ERR_SOCKET_IS_CONNECTED.
+ result = udp_socket_->Write(buf,
+ static_cast<int>(packet->data.size()),
+ callback);
+ } else if (!IsEmpty(remote_addr_)) {
+ result = udp_socket_->SendTo(buf,
+ static_cast<int>(packet->data.size()),
+ remote_addr_,
+ callback);
+ } else {
+ return true;
+ }
+
+ if (result == net::ERR_IO_PENDING) {
send_pending_ = true;
- // When ok, will return a positive value equal the number of bytes sent.
- return ret >= net::OK;
+ return false;
+ } else if (result < 0) {
+ LOG(ERROR) << "Failed to send packet: " << result << ".";
+ status_callback_.Run(TRANSPORT_SOCKET_ERROR);
+ return true;
+ } else {
+ // Successful send, re-start reading if needed.
+ ScheduleReceiveNextPacket();
+ return true;
+ }
}
-void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf, int result) {
+void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf,
+ PacketRef packet,
+ const base::Closure& cb,
+ int result) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
send_pending_ = false;
if (result < 0) {
LOG(ERROR) << "Failed to send packet: " << result << ".";
status_callback_.Run(TRANSPORT_SOCKET_ERROR);
+ } else {
+ // Successful send, re-start reading if needed.
+ ScheduleReceiveNextPacket();
+ }
+
+ if (!cb.is_null()) {
+ cb.Run();
}
}