Upstream version 7.36.149.0
[platform/framework/web/crosswalk.git] / src / media / cast / transport / transport / udp_transport.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "media/cast/transport/transport/udp_transport.h"
6
7 #include <algorithm>
8 #include <string>
9
10 #include "base/bind.h"
11 #include "base/logging.h"
12 #include "base/memory/ref_counted.h"
13 #include "base/memory/scoped_ptr.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/rand_util.h"
16 #include "net/base/io_buffer.h"
17 #include "net/base/net_errors.h"
18 #include "net/base/rand_callback.h"
19
20 namespace media {
21 namespace cast {
22 namespace transport {
23
24 namespace {
25 const int kMaxPacketSize = 1500;
26
27 bool IsEmpty(const net::IPEndPoint& addr) {
28   net::IPAddressNumber empty_addr(addr.address().size());
29   return std::equal(
30              empty_addr.begin(), empty_addr.end(), addr.address().begin()) &&
31          !addr.port();
32 }
33
34 bool IsEqual(const net::IPEndPoint& addr1, const net::IPEndPoint& addr2) {
35   return addr1.port() == addr2.port() && std::equal(addr1.address().begin(),
36                                                     addr1.address().end(),
37                                                     addr2.address().begin());
38 }
39 }  // namespace
40
41 UdpTransport::UdpTransport(
42     net::NetLog* net_log,
43     const scoped_refptr<base::SingleThreadTaskRunner>& io_thread_proxy,
44     const net::IPEndPoint& local_end_point,
45     const net::IPEndPoint& remote_end_point,
46     const CastTransportStatusCallback& status_callback)
47     : io_thread_proxy_(io_thread_proxy),
48       local_addr_(local_end_point),
49       remote_addr_(remote_end_point),
50       udp_socket_(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
51                                      net::RandIntCallback(),
52                                      net_log,
53                                      net::NetLog::Source())),
54       send_pending_(false),
55       receive_pending_(false),
56       client_connected_(false),
57       status_callback_(status_callback),
58       weak_factory_(this) {
59   DCHECK(!IsEmpty(local_end_point) || !IsEmpty(remote_end_point));
60 }
61
62 UdpTransport::~UdpTransport() {}
63
64 void UdpTransport::StartReceiving(
65     const PacketReceiverCallback& packet_receiver) {
66   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
67
68   packet_receiver_ = packet_receiver;
69   udp_socket_->AllowAddressReuse();
70   udp_socket_->SetMulticastLoopbackMode(true);
71   if (!IsEmpty(local_addr_)) {
72     if (udp_socket_->Bind(local_addr_) < 0) {
73       status_callback_.Run(TRANSPORT_SOCKET_ERROR);
74       LOG(ERROR) << "Failed to bind local address.";
75       return;
76     }
77   } else if (!IsEmpty(remote_addr_)) {
78     if (udp_socket_->Connect(remote_addr_) < 0) {
79       status_callback_.Run(TRANSPORT_SOCKET_ERROR);
80       LOG(ERROR) << "Failed to connect to remote address.";
81       return;
82     }
83     client_connected_ = true;
84   } else {
85     NOTREACHED() << "Either local or remote address has to be defined.";
86   }
87
88   ScheduleReceiveNextPacket();
89 }
90
91 void UdpTransport::ScheduleReceiveNextPacket() {
92   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
93   if (!packet_receiver_.is_null() && !receive_pending_) {
94     receive_pending_ = true;
95     io_thread_proxy_->PostTask(FROM_HERE,
96                                base::Bind(&UdpTransport::ReceiveNextPacket,
97                                           weak_factory_.GetWeakPtr(),
98                                           net::ERR_IO_PENDING));
99   }
100 }
101
102 void UdpTransport::ReceiveNextPacket(int length_or_status) {
103   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
104
105   // Loop while UdpSocket is delivering data synchronously.  When it responds
106   // with a "pending" status, break and expect this method to be called back in
107   // the future when a packet is ready.
108   while (true) {
109     if (length_or_status == net::ERR_IO_PENDING) {
110       next_packet_.reset(new Packet(kMaxPacketSize));
111       recv_buf_ = new net::WrappedIOBuffer(
112           reinterpret_cast<char*>(&next_packet_->front()));
113       length_or_status = udp_socket_->RecvFrom(
114           recv_buf_,
115           kMaxPacketSize,
116           &recv_addr_,
117           base::Bind(&UdpTransport::ReceiveNextPacket,
118                      weak_factory_.GetWeakPtr()));
119       if (length_or_status == net::ERR_IO_PENDING) {
120         receive_pending_ = true;
121         return;
122       }
123     }
124
125     // Note: At this point, either a packet is ready or an error has occurred.
126     if (length_or_status < 0) {
127       VLOG(1) << "Failed to receive packet: Status code is "
128               << length_or_status;
129       status_callback_.Run(TRANSPORT_SOCKET_ERROR);
130       receive_pending_ = false;
131       return;
132     }
133
134     // Confirm the packet has come from the expected remote address; otherwise,
135     // ignore it.  If this is the first packet being received and no remote
136     // address has been set, set the remote address and expect all future
137     // packets to come from the same one.
138     // TODO(hubbe): We should only do this if the caller used a valid ssrc.
139     if (IsEmpty(remote_addr_)) {
140       remote_addr_ = recv_addr_;
141       VLOG(1) << "Setting remote address from first received packet: "
142               << remote_addr_.ToString();
143     } else if (!IsEqual(remote_addr_, recv_addr_)) {
144       VLOG(1) << "Ignoring packet received from an unrecognized address: "
145               << recv_addr_.ToString() << ".";
146       length_or_status = net::ERR_IO_PENDING;
147       continue;
148     }
149
150     next_packet_->resize(length_or_status);
151     packet_receiver_.Run(next_packet_.Pass());
152     length_or_status = net::ERR_IO_PENDING;
153   }
154 }
155
156 bool UdpTransport::SendPacket(PacketRef packet, const base::Closure& cb) {
157   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
158
159   DCHECK(!send_pending_);
160   if (send_pending_) {
161     VLOG(1) << "Cannot send because of pending IO.";
162     return true;
163   }
164
165   scoped_refptr<net::IOBuffer> buf =
166       new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->data.front()));
167
168   int result;
169   base::Callback<void(int)> callback = base::Bind(&UdpTransport::OnSent,
170                                                   weak_factory_.GetWeakPtr(),
171                                                   buf,
172                                                   packet,
173                                                   cb);
174   if (client_connected_) {
175     // If we called Connect() before we must call Write() instead of
176     // SendTo(). Otherwise on some platforms we might get
177     // ERR_SOCKET_IS_CONNECTED.
178     result = udp_socket_->Write(buf,
179                                 static_cast<int>(packet->data.size()),
180                                 callback);
181   } else if (!IsEmpty(remote_addr_)) {
182     result = udp_socket_->SendTo(buf,
183                                  static_cast<int>(packet->data.size()),
184                                  remote_addr_,
185                                  callback);
186   } else {
187     return true;
188   }
189
190   if (result == net::ERR_IO_PENDING) {
191     send_pending_ = true;
192     return false;
193   } else if (result < 0) {
194     LOG(ERROR) << "Failed to send packet: " << result << ".";
195     status_callback_.Run(TRANSPORT_SOCKET_ERROR);
196     return true;
197   } else {
198     // Successful send, re-start reading if needed.
199     ScheduleReceiveNextPacket();
200     return true;
201   }
202 }
203
204 void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf,
205                           PacketRef packet,
206                           const base::Closure& cb,
207                           int result) {
208   DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
209
210   send_pending_ = false;
211   if (result < 0) {
212     LOG(ERROR) << "Failed to send packet: " << result << ".";
213     status_callback_.Run(TRANSPORT_SOCKET_ERROR);
214   } else {
215     // Successful send, re-start reading if needed.
216     ScheduleReceiveNextPacket();
217   }
218
219   if (!cb.is_null()) {
220     cb.Run();
221   }
222 }
223
224 }  // namespace transport
225 }  // namespace cast
226 }  // namespace media