net_log,
net::NetLog::Source())),
send_pending_(false),
+ receive_pending_(false),
client_connected_(false),
status_callback_(status_callback),
weak_factory_(this) {
NOTREACHED() << "Either local or remote address has to be defined.";
}
- ReceiveNextPacket(net::ERR_IO_PENDING);
+ ScheduleReceiveNextPacket();
+}
+
+void UdpTransport::ScheduleReceiveNextPacket() {
+ DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
+ if (!packet_receiver_.is_null() && !receive_pending_) {
+ receive_pending_ = true;
+ io_thread_proxy_->PostTask(FROM_HERE,
+ base::Bind(&UdpTransport::ReceiveNextPacket,
+ weak_factory_.GetWeakPtr(),
+ net::ERR_IO_PENDING));
+ }
}
void UdpTransport::ReceiveNextPacket(int length_or_status) {
&recv_addr_,
base::Bind(&UdpTransport::ReceiveNextPacket,
weak_factory_.GetWeakPtr()));
- if (length_or_status == net::ERR_IO_PENDING)
+ if (length_or_status == net::ERR_IO_PENDING) {
+ receive_pending_ = true;
return;
+ }
}
// Note: At this point, either a packet is ready or an error has occurred.
-
if (length_or_status < 0) {
VLOG(1) << "Failed to receive packet: Status code is "
- << length_or_status << ". Stop receiving packets.";
+ << length_or_status;
status_callback_.Run(TRANSPORT_SOCKET_ERROR);
+ receive_pending_ = false;
return;
}
}
}
-bool UdpTransport::SendPacket(const Packet& packet) {
+bool UdpTransport::SendPacket(PacketRef packet, const base::Closure& cb) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
+ DCHECK(!send_pending_);
if (send_pending_) {
VLOG(1) << "Cannot send because of pending IO.";
- return false;
+ return true;
}
- // TODO(hclam): This interface should take a net::IOBuffer to minimize
- // memcpy.
scoped_refptr<net::IOBuffer> buf =
- new net::IOBuffer(static_cast<int>(packet.size()));
- memcpy(buf->data(), &packet[0], packet.size());
-
- int ret;
+ new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->data.front()));
+
+ int result;
+ base::Callback<void(int)> callback = base::Bind(&UdpTransport::OnSent,
+ weak_factory_.GetWeakPtr(),
+ buf,
+ packet,
+ cb);
if (client_connected_) {
// If we called Connect() before we must call Write() instead of
// SendTo(). Otherwise on some platforms we might get
// ERR_SOCKET_IS_CONNECTED.
- ret = udp_socket_->Write(
- buf,
- static_cast<int>(packet.size()),
- base::Bind(&UdpTransport::OnSent, weak_factory_.GetWeakPtr(), buf));
+ result = udp_socket_->Write(buf,
+ static_cast<int>(packet->data.size()),
+ callback);
} else if (!IsEmpty(remote_addr_)) {
- ret = udp_socket_->SendTo(
- buf,
- static_cast<int>(packet.size()),
- remote_addr_,
- base::Bind(&UdpTransport::OnSent, weak_factory_.GetWeakPtr(), buf));
+ result = udp_socket_->SendTo(buf,
+ static_cast<int>(packet->data.size()),
+ remote_addr_,
+ callback);
} else {
- return false;
+ return true;
}
- if (ret == net::ERR_IO_PENDING)
+
+ if (result == net::ERR_IO_PENDING) {
send_pending_ = true;
- // When ok, will return a positive value equal the number of bytes sent.
- return ret >= net::OK;
+ return false;
+ } else if (result < 0) {
+ LOG(ERROR) << "Failed to send packet: " << result << ".";
+ status_callback_.Run(TRANSPORT_SOCKET_ERROR);
+ return true;
+ } else {
+ // Successful send, re-start reading if needed.
+ ScheduleReceiveNextPacket();
+ return true;
+ }
}
-void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf, int result) {
+void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf,
+ PacketRef packet,
+ const base::Closure& cb,
+ int result) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
send_pending_ = false;
if (result < 0) {
LOG(ERROR) << "Failed to send packet: " << result << ".";
status_callback_.Run(TRANSPORT_SOCKET_ERROR);
+ } else {
+ // Successful send, re-start reading if needed.
+ ScheduleReceiveNextPacket();
+ }
+
+ if (!cb.is_null()) {
+ cb.Run();
}
}