Upstream version 7.36.149.0
[platform/framework/web/crosswalk.git] / src / media / cast / transport / transport / udp_transport.cc
index d16a117..bcce4f7 100644 (file)
@@ -39,6 +39,7 @@ bool IsEqual(const net::IPEndPoint& addr1, const net::IPEndPoint& addr2) {
 }  // namespace
 
 UdpTransport::UdpTransport(
+    net::NetLog* net_log,
     const scoped_refptr<base::SingleThreadTaskRunner>& io_thread_proxy,
     const net::IPEndPoint& local_end_point,
     const net::IPEndPoint& remote_end_point,
@@ -48,10 +49,11 @@ UdpTransport::UdpTransport(
       remote_addr_(remote_end_point),
       udp_socket_(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
                                      net::RandIntCallback(),
-                                     NULL,
+                                     net_log,
                                      net::NetLog::Source())),
       send_pending_(false),
-      recv_buf_(new net::IOBuffer(kMaxPacketSize)),
+      receive_pending_(false),
+      client_connected_(false),
       status_callback_(status_callback),
       weak_factory_(this) {
   DCHECK(!IsEmpty(local_end_point) || !IsEmpty(remote_end_point));
@@ -78,84 +80,144 @@ void UdpTransport::StartReceiving(
       LOG(ERROR) << "Failed to connect to remote address.";
       return;
     }
+    client_connected_ = true;
   } else {
     NOTREACHED() << "Either local or remote address has to be defined.";
   }
-  ReceiveOnePacket();
+
+  ScheduleReceiveNextPacket();
 }
 
-void UdpTransport::ReceiveOnePacket() {
+void UdpTransport::ScheduleReceiveNextPacket() {
   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
-
-  int result = udp_socket_->RecvFrom(
-      recv_buf_,
-      kMaxPacketSize,
-      &recv_addr_,
-      base::Bind(&UdpTransport::OnReceived, weak_factory_.GetWeakPtr()));
-  if (result > 0) {
-    OnReceived(result);
-  } else if (result != net::ERR_IO_PENDING) {
-    LOG(ERROR) << "Failed to receive packet: " << result << "."
-               << " Stop receiving packets.";
-    status_callback_.Run(TRANSPORT_SOCKET_ERROR);
+  if (!packet_receiver_.is_null() && !receive_pending_) {
+    receive_pending_ = true;
+    io_thread_proxy_->PostTask(FROM_HERE,
+                               base::Bind(&UdpTransport::ReceiveNextPacket,
+                                          weak_factory_.GetWeakPtr(),
+                                          net::ERR_IO_PENDING));
   }
 }
 
-void UdpTransport::OnReceived(int result) {
+void UdpTransport::ReceiveNextPacket(int length_or_status) {
   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
-  if (result < 0) {
-    LOG(ERROR) << "Failed to receive packet: " << result << "."
-               << " Stop receiving packets.";
-    status_callback_.Run(TRANSPORT_SOCKET_ERROR);
-    return;
-  }
 
-  if (IsEmpty(remote_addr_)) {
-    remote_addr_ = recv_addr_;
-    VLOG(1) << "First packet received from: " << remote_addr_.ToString() << ".";
-  } else if (!IsEqual(remote_addr_, recv_addr_)) {
-    LOG(ERROR) << "Received from an unrecognized address: "
-               << recv_addr_.ToString() << ".";
-    return;
+  // Loop while UdpSocket is delivering data synchronously.  When it responds
+  // with a "pending" status, break and expect this method to be called back in
+  // the future when a packet is ready.
+  while (true) {
+    if (length_or_status == net::ERR_IO_PENDING) {
+      next_packet_.reset(new Packet(kMaxPacketSize));
+      recv_buf_ = new net::WrappedIOBuffer(
+          reinterpret_cast<char*>(&next_packet_->front()));
+      length_or_status = udp_socket_->RecvFrom(
+          recv_buf_,
+          kMaxPacketSize,
+          &recv_addr_,
+          base::Bind(&UdpTransport::ReceiveNextPacket,
+                     weak_factory_.GetWeakPtr()));
+      if (length_or_status == net::ERR_IO_PENDING) {
+        receive_pending_ = true;
+        return;
+      }
+    }
+
+    // Note: At this point, either a packet is ready or an error has occurred.
+    if (length_or_status < 0) {
+      VLOG(1) << "Failed to receive packet: Status code is "
+              << length_or_status;
+      status_callback_.Run(TRANSPORT_SOCKET_ERROR);
+      receive_pending_ = false;
+      return;
+    }
+
+    // Confirm the packet has come from the expected remote address; otherwise,
+    // ignore it.  If this is the first packet being received and no remote
+    // address has been set, set the remote address and expect all future
+    // packets to come from the same one.
+    // TODO(hubbe): We should only do this if the caller used a valid ssrc.
+    if (IsEmpty(remote_addr_)) {
+      remote_addr_ = recv_addr_;
+      VLOG(1) << "Setting remote address from first received packet: "
+              << remote_addr_.ToString();
+    } else if (!IsEqual(remote_addr_, recv_addr_)) {
+      VLOG(1) << "Ignoring packet received from an unrecognized address: "
+              << recv_addr_.ToString() << ".";
+      length_or_status = net::ERR_IO_PENDING;
+      continue;
+    }
+
+    next_packet_->resize(length_or_status);
+    packet_receiver_.Run(next_packet_.Pass());
+    length_or_status = net::ERR_IO_PENDING;
   }
-  // TODO(hclam): The interfaces should use net::IOBuffer to eliminate memcpy.
-  scoped_ptr<Packet> packet(
-      new Packet(recv_buf_->data(), recv_buf_->data() + result));
-  packet_receiver_.Run(packet.Pass());
-  ReceiveOnePacket();
 }
 
-bool UdpTransport::SendPacket(const Packet& packet) {
+bool UdpTransport::SendPacket(PacketRef packet, const base::Closure& cb) {
   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
 
+  DCHECK(!send_pending_);
   if (send_pending_) {
     VLOG(1) << "Cannot send because of pending IO.";
-    return false;
+    return true;
   }
 
-  // TODO(hclam): This interface should take a net::IOBuffer to minimize
-  // memcpy.
   scoped_refptr<net::IOBuffer> buf =
-      new net::IOBuffer(static_cast<int>(packet.size()));
-  memcpy(buf->data(), &packet[0], packet.size());
-  int ret = udp_socket_->SendTo(
-      buf,
-      static_cast<int>(packet.size()),
-      remote_addr_,
-      base::Bind(&UdpTransport::OnSent, weak_factory_.GetWeakPtr(), buf));
-  if (ret == net::ERR_IO_PENDING)
+      new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->data.front()));
+
+  int result;
+  base::Callback<void(int)> callback = base::Bind(&UdpTransport::OnSent,
+                                                  weak_factory_.GetWeakPtr(),
+                                                  buf,
+                                                  packet,
+                                                  cb);
+  if (client_connected_) {
+    // If we called Connect() before we must call Write() instead of
+    // SendTo(). Otherwise on some platforms we might get
+    // ERR_SOCKET_IS_CONNECTED.
+    result = udp_socket_->Write(buf,
+                                static_cast<int>(packet->data.size()),
+                                callback);
+  } else if (!IsEmpty(remote_addr_)) {
+    result = udp_socket_->SendTo(buf,
+                                 static_cast<int>(packet->data.size()),
+                                 remote_addr_,
+                                 callback);
+  } else {
+    return true;
+  }
+
+  if (result == net::ERR_IO_PENDING) {
     send_pending_ = true;
-  // When ok, will return a positive value equal the number of bytes sent.
-  return ret >= net::OK;
+    return false;
+  } else if (result < 0) {
+    LOG(ERROR) << "Failed to send packet: " << result << ".";
+    status_callback_.Run(TRANSPORT_SOCKET_ERROR);
+    return true;
+  } else {
+    // Successful send, re-start reading if needed.
+    ScheduleReceiveNextPacket();
+    return true;
+  }
 }
 
-void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf, int result) {
+void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf,
+                          PacketRef packet,
+                          const base::Closure& cb,
+                          int result) {
   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
 
   send_pending_ = false;
   if (result < 0) {
     LOG(ERROR) << "Failed to send packet: " << result << ".";
     status_callback_.Run(TRANSPORT_SOCKET_ERROR);
+  } else {
+    // Successful send, re-start reading if needed.
+    ScheduleReceiveNextPacket();
+  }
+
+  if (!cb.is_null()) {
+    cb.Run();
   }
 }