1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/renderer/p2p/ipc_socket_factory.h"
10 #include "base/compiler_specific.h"
11 #include "base/debug/trace_event.h"
12 #include "base/message_loop/message_loop.h"
13 #include "base/message_loop/message_loop_proxy.h"
14 #include "content/public/renderer/p2p_socket_client_delegate.h"
15 #include "content/renderer/p2p/host_address_request.h"
16 #include "content/renderer/p2p/socket_client_impl.h"
17 #include "content/renderer/p2p/socket_dispatcher.h"
18 #include "jingle/glue/utils.h"
19 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h"
// Sentinel stored in every |options_| slot by the IpcPacketSocket constructor.
// OnOpen() skips slots that still hold this value when applying options.
25 const int kDefaultNonSetOptionValue = -1;
// Returns true if |type| is one of the *_CLIENT socket types listed below
// (TCP, SSLTCP or TLS, including their STUN_* variants); false for any other
// type.
27 bool IsTcpClientSocket(P2PSocketType type) {
28 return (type == P2P_SOCKET_STUN_TCP_CLIENT) ||
29 (type == P2P_SOCKET_TCP_CLIENT) ||
30 (type == P2P_SOCKET_STUN_SSLTCP_CLIENT) ||
31 (type == P2P_SOCKET_SSLTCP_CLIENT) ||
32 (type == P2P_SOCKET_TLS_CLIENT) ||
33 (type == P2P_SOCKET_STUN_TLS_CLIENT);
// Translates a talk_base::Socket option into the corresponding
// P2P_SOCKET_OPT_* value, written to |*ipc_option|. Only OPT_RCVBUF,
// OPT_SNDBUF and OPT_DSCP have a mapping; the remaining options listed at the
// end return false. Callers in this file treat a false result as "option not
// supported".
// NOTE(review): the return statements for the mapped cases are not visible in
// this excerpt; presumably they return true - confirm.
36 bool JingleSocketOptionToP2PSocketOption(talk_base::Socket::Option option,
37 P2PSocketOption* ipc_option) {
39 case talk_base::Socket::OPT_RCVBUF:
40 *ipc_option = P2P_SOCKET_OPT_RCVBUF;
42 case talk_base::Socket::OPT_SNDBUF:
43 *ipc_option = P2P_SOCKET_OPT_SNDBUF;
45 case talk_base::Socket::OPT_DSCP:
46 *ipc_option = P2P_SOCKET_OPT_DSCP;
48 case talk_base::Socket::OPT_DONTFRAGMENT:
49 case talk_base::Socket::OPT_NODELAY:
50 case talk_base::Socket::OPT_IPV6_V6ONLY:
51 case talk_base::Socket::OPT_RTP_SENDTIME_EXTN_ID:
52 return false; // Not supported by the chrome sockets.
// Send budget of an IpcPacketSocket: the initial value of
// |send_bytes_available_|. SendTo() refuses a packet larger than the
// remaining budget, and OnSendComplete() returns each packet's size to it.
60 // TODO(miu): This needs tuning. http://crbug.com/237960
61 const size_t kMaximumInFlightBytes = 64 * 1024; // 64 KB
63 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface
64 // using P2PSocketClient that works over IPC-channel. It must be used
65 // on the thread on which it was created.
66 class IpcPacketSocket : public talk_base::AsyncPacketSocket,
67 public P2PSocketClientDelegate {
70 virtual ~IpcPacketSocket();
72 // Always takes ownership of client even if initialization fails.
// |local_address| and |remote_address| are stored and converted to
// net::IPEndPoint before being passed to the client; |remote_address| may be
// nil, in which case it is not converted.
73 bool Init(P2PSocketType type, P2PSocketClientImpl* client,
74 const talk_base::SocketAddress& local_address,
75 const talk_base::SocketAddress& remote_address);
77 // talk_base::AsyncPacketSocket interface.
78 virtual talk_base::SocketAddress GetLocalAddress() const OVERRIDE;
79 virtual talk_base::SocketAddress GetRemoteAddress() const OVERRIDE;
80 virtual int Send(const void *pv, size_t cb,
81 const talk_base::PacketOptions& options) OVERRIDE;
82 virtual int SendTo(const void *pv, size_t cb,
83 const talk_base::SocketAddress& addr,
84 const talk_base::PacketOptions& options) OVERRIDE;
85 virtual int Close() OVERRIDE;
86 virtual State GetState() const OVERRIDE;
87 virtual int GetOption(talk_base::Socket::Option option, int* value) OVERRIDE;
88 virtual int SetOption(talk_base::Socket::Option option, int value) OVERRIDE;
89 virtual int GetError() const OVERRIDE;
90 virtual void SetError(int error) OVERRIDE;
92 // P2PSocketClientDelegate implementation.
93 virtual void OnOpen(const net::IPEndPoint& address) OVERRIDE;
94 virtual void OnIncomingTcpConnection(
95 const net::IPEndPoint& address,
96 P2PSocketClient* client) OVERRIDE;
97 virtual void OnSendComplete() OVERRIDE;
98 virtual void OnError() OVERRIDE;
99 virtual void OnDataReceived(const net::IPEndPoint& address,
100 const std::vector<char>& data,
101 const base::TimeTicks& timestamp) OVERRIDE;
112 // Update trace of send throttling internal state. This should be called
113 // immediately after any changes to |send_bytes_available_| and/or
114 // |in_flight_packet_sizes_|.
115 void TraceSendThrottlingState() const;
// Initializes a socket created by OnIncomingTcpConnection() for an accepted
// connection, using the listening socket's local address and the peer's
// address.
117 void InitAcceptedTcp(P2PSocketClient* client,
118 const talk_base::SocketAddress& local_address,
119 const talk_base::SocketAddress& remote_address);
// Forwards |option| to |client_|. Must only be called while the socket is in
// the IS_OPEN state (DCHECKed in the implementation).
121 int DoSetOption(P2PSocketOption option, int value);
125 // Message loop on which this socket was created and being used.
126 base::MessageLoop* message_loop_;
128 // Corresponding P2P socket client.
129 scoped_refptr<P2PSocketClient> client_;
131 // Local address is allocated by the browser process, and the
132 // renderer side doesn't know the address until it receives OnOpen()
133 // event from the browser.
134 talk_base::SocketAddress local_address_;
136 // Remote address for client TCP connections.
137 talk_base::SocketAddress remote_address_;
139 // Current state of the object.
140 InternalState state_;
142 // Track the number of bytes allowed to be sent non-blocking. This is used to
143 // throttle the sending of packets to the browser process. For each packet
144 // sent, the value is decreased. As callbacks to OnSendComplete() (as IPCs
145 // from the browser process) are made, the value is increased back. This
146 // allows short bursts of high-rate sending without dropping packets, but
147 // quickly restricts the client to a sustainable steady-state rate.
148 size_t send_bytes_available_;
// Sizes of the packets passed to the client by SendTo() for which
// OnSendComplete() has not yet been received, oldest first.
149 std::deque<size_t> in_flight_packet_sizes_;
151 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the
152 // caller expects SignalWritable notification.
153 bool writable_signal_expected_;
155 // Current error code. Valid when state_ == IS_ERROR.
// NOTE(review): the member the comment above describes is not visible in this
// excerpt.
// Last value requested through SetOption() for each P2P socket option,
// indexed by P2PSocketOption. Slots hold kDefaultNonSetOptionValue until set.
157 int options_[P2P_SOCKET_OPT_MAX];
159 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket);
// Constructs an uninitialized socket: type defaults to UDP, the current
// message loop is remembered for the thread checks, the full send budget
// (kMaximumInFlightBytes) is available, and every option slot is marked
// unset.
162 IpcPacketSocket::IpcPacketSocket()
163 : type_(P2P_SOCKET_UDP),
164 message_loop_(base::MessageLoop::current()),
165 state_(IS_UNINITIALIZED),
166 send_bytes_available_(kMaximumInFlightBytes),
167 writable_signal_expected_(false),
// A zero budget would make SendTo() reject every non-empty packet.
169 COMPILE_ASSERT(kMaximumInFlightBytes > 0, would_send_at_zero_rate);
170 std::fill_n(options_, static_cast<int> (P2P_SOCKET_OPT_MAX),
171 kDefaultNonSetOptionValue);
// Destructor. Takes the branch below only when the socket is opening, open or
// in the error state.
// NOTE(review): the body of the branch is not visible in this excerpt;
// presumably it closes the socket - confirm.
174 IpcPacketSocket::~IpcPacketSocket() {
175 if (state_ == IS_OPENING || state_ == IS_OPEN ||
176 state_ == IS_ERROR) {
// Emits two "p2p" trace counters, both keyed by the local port: the remaining
// send budget in bytes and the number of packets currently in flight.
181 void IpcPacketSocket::TraceSendThrottlingState() const {
182 TRACE_COUNTER_ID1("p2p", "P2PSendBytesAvailable", local_address_.port(),
183 send_bytes_available_);
184 TRACE_COUNTER_ID1("p2p", "P2PSendPacketsInFlight", local_address_.port(),
185 in_flight_packet_sizes_.size());
// Starts opening the socket through |client|. Must be called on the creating
// thread while the socket is still uninitialized. Stores both addresses,
// converts them to net::IPEndPoint and calls client->Init() with |this| as
// the delegate. Callers in this file treat a false return as failure.
// NOTE(review): the assignments of the type, client and state members and the
// failure-path statements are not visible in this excerpt.
188 bool IpcPacketSocket::Init(P2PSocketType type,
189 P2PSocketClientImpl* client,
190 const talk_base::SocketAddress& local_address,
191 const talk_base::SocketAddress& remote_address) {
192 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
193 DCHECK_EQ(state_, IS_UNINITIALIZED);
197 local_address_ = local_address;
198 remote_address_ = remote_address;
201 net::IPEndPoint local_endpoint;
202 if (!jingle_glue::SocketAddressToIPEndPoint(
203 local_address, &local_endpoint)) {
// A nil remote address is allowed; |remote_endpoint| is then left
// default-constructed and no conversion is attempted.
207 net::IPEndPoint remote_endpoint;
208 if (!remote_address.IsNil() &&
209 !jingle_glue::SocketAddressToIPEndPoint(
210 remote_address, &remote_endpoint)) {
214 client->Init(type, local_endpoint, remote_endpoint, this);
// Sets up a socket for a TCP connection that has already been accepted.
// Records the two addresses, emits the throttling trace counters and
// registers |this| as the delegate of |client_|.
// NOTE(review): the statements that store |client| and update |state_| are
// not visible in this excerpt.
219 void IpcPacketSocket::InitAcceptedTcp(
220 P2PSocketClient* client,
221 const talk_base::SocketAddress& local_address,
222 const talk_base::SocketAddress& remote_address) {
223 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
224 DCHECK_EQ(state_, IS_UNINITIALIZED);
227 local_address_ = local_address;
228 remote_address_ = remote_address;
230 TraceSendThrottlingState();
231 client_->SetDelegate(this);
234 // talk_base::AsyncPacketSocket interface.
// Returns |local_address_|. The value is the one passed at initialization
// until OnOpen() overwrites it with the address it receives.
235 talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const {
236 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
237 return local_address_;
// Returns the remote address recorded by Init() or InitAcceptedTcp().
240 talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const {
241 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
242 return remote_address_;
// Sends |data| to the stored remote address by delegating to SendTo(); the
// return value is whatever SendTo() returns.
245 int IpcPacketSocket::Send(const void *data, size_t data_size,
246 const talk_base::PacketOptions& options) {
247 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
248 return SendTo(data, data_size, remote_address_, options);
// Queues |data| for sending to |address| through |client_|, subject to the
// in-flight byte budget. Visible steps: dispatch on |state_|, special-case an
// empty packet, refuse packets larger than the remaining budget (recording
// EWOULDBLOCK and remembering that the caller expects a ready-to-send
// signal), convert the address, charge the budget, then copy the payload and
// hand it to the client together with the DSCP value from |options|.
// NOTE(review): the return statements and most of the state switch are not
// visible in this excerpt, so the exact return values are not documented
// here.
251 int IpcPacketSocket::SendTo(const void *data, size_t data_size,
252 const talk_base::SocketAddress& address,
253 const talk_base::PacketOptions& options) {
254 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
257 case IS_UNINITIALIZED:
267 // Continue sending the packet.
271 if (data_size == 0) {
// Throttling: the packet does not fit into the remaining budget.
276 if (data_size > send_bytes_available_) {
277 TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock",
278 TRACE_EVENT_SCOPE_THREAD,
280 client_->GetSocketID());
// OnSendComplete() emits SignalReadyToSend once budget is available again.
281 writable_signal_expected_ = true;
282 error_ = EWOULDBLOCK;
286 net::IPEndPoint address_chrome;
287 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) {
// Charge the budget and remember the size so that OnSendComplete() can
// return exactly this amount.
293 send_bytes_available_ -= data_size;
294 in_flight_packet_sizes_.push_back(data_size);
295 TraceSendThrottlingState();
// The payload is copied into a vector before being passed to the client.
297 const char* data_char = reinterpret_cast<const char*>(data);
298 std::vector<char> data_vector(data_char, data_char + data_size);
299 client_->SendWithDscp(address_chrome, data_vector,
300 static_cast<net::DiffServCodePoint>(options.dscp));
302 // Fake successful send. The caller ignores result anyway.
// talk_base::AsyncPacketSocket::Close() override. Must be called on the
// thread that created the socket.
// NOTE(review): the rest of the body is not visible in this excerpt.
306 int IpcPacketSocket::Close() {
307 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
// Maps the internal |state_| onto a talk_base::AsyncPacketSocket::State. Of
// the visible branches, one returns STATE_BINDING and another returns
// STATE_CONNECTED for TCP client socket types.
// NOTE(review): most case labels and return statements are not visible in
// this excerpt, so the full mapping cannot be documented here.
315 talk_base::AsyncPacketSocket::State IpcPacketSocket::GetState() const {
316 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
319 case IS_UNINITIALIZED:
324 return STATE_BINDING;
327 if (IsTcpClientSocket(type_)) {
328 return STATE_CONNECTED;
// Writes the value cached in |options_| for |option| to |*value|. The slot
// holds kDefaultNonSetOptionValue if SetOption() was never called for it.
// Options without a P2P equivalent are rejected by the conversion check.
// NOTE(review): unlike the sibling methods, no calling-thread DCHECK is
// visible at the top of this function - confirm whether that is intentional.
342 int IpcPacketSocket::GetOption(talk_base::Socket::Option option, int* value) {
343 P2PSocketOption p2p_socket_option = P2P_SOCKET_OPT_MAX;
344 if (!JingleSocketOptionToP2PSocketOption(option, &p2p_socket_option)) {
345 // unsupported option.
349 *value = options_[p2p_socket_option];
// Caches |value| for |option| in |options_| and, if the socket is already
// open, applies it immediately via DoSetOption(). Values cached before the
// socket opens are applied by OnOpen(). Options without a P2P equivalent are
// rejected by the conversion check.
// NOTE(review): the return statements for the unsupported and not-yet-open
// paths are not visible in this excerpt.
353 int IpcPacketSocket::SetOption(talk_base::Socket::Option option, int value) {
354 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
356 P2PSocketOption p2p_socket_option = P2P_SOCKET_OPT_MAX;
357 if (!JingleSocketOptionToP2PSocketOption(option, &p2p_socket_option)) {
358 // Option is not supported.
362 options_[p2p_socket_option] = value;
364 if (state_ == IS_OPEN) {
365 // Already open, so apply now; values set earlier are applied from OnOpen().
366 return DoSetOption(p2p_socket_option, value);
// Forwards |option| and |value| to |client_|. Callers must ensure the socket
// is in the IS_OPEN state and that this runs on the creating thread.
// NOTE(review): the return statement is not visible in this excerpt.
371 int IpcPacketSocket::DoSetOption(P2PSocketOption option, int value) {
372 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
373 DCHECK_EQ(state_, IS_OPEN);
375 client_->SetOption(option, value);
// talk_base::AsyncPacketSocket::GetError() override; must be called on the
// creating thread.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably it returns the stored error code - confirm.
379 int IpcPacketSocket::GetError() const {
380 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
// talk_base::AsyncPacketSocket::SetError() override; must be called on the
// creating thread.
// NOTE(review): the statement that stores |error| is not visible in this
// excerpt.
384 void IpcPacketSocket::SetError(int error) {
385 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
// P2PSocketClientDelegate callback for a successfully opened socket. Converts
// |address| into |local_address_|, emits the throttling trace counters,
// applies every option set before the socket was open, and fires
// SignalAddressReady. TCP client sockets get additional handling in the final
// branch.
// NOTE(review): the state transition, the conversion-failure handling and the
// body of the TCP-client branch are not visible in this excerpt.
389 void IpcPacketSocket::OnOpen(const net::IPEndPoint& address) {
390 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
392 if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) {
393 // Always expect correct IPv4 address to be allocated.
400 TraceSendThrottlingState();
402 // Set all pending options if any.
403 for (int i = 0; i < P2P_SOCKET_OPT_MAX; ++i) {
// Slots still holding the sentinel were never set and are skipped.
404 if (options_[i] != kDefaultNonSetOptionValue)
405 DoSetOption(static_cast<P2PSocketOption> (i), options_[i]);
408 SignalAddressReady(this, local_address_);
409 if (IsTcpClientSocket(type_))
// P2PSocketClientDelegate callback for a newly accepted TCP connection.
// Creates a new IpcPacketSocket around |client|, initialized with this
// socket's local address and the peer's |address|, and passes it to
// SignalNewConnection listeners. The scoped_ptr gives up ownership via
// release() at that point.
// NOTE(review): the handling of a failed address conversion is not visible in
// this excerpt.
413 void IpcPacketSocket::OnIncomingTcpConnection(
414 const net::IPEndPoint& address,
415 P2PSocketClient* client) {
416 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
418 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
420 talk_base::SocketAddress remote_address;
421 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) {
422 // Always expect correct IPv4 address to be allocated.
425 socket->InitAcceptedTcp(client, local_address_, remote_address);
426 SignalNewConnection(this, socket.release());
// P2PSocketClientDelegate callback for a completed send. Returns the oldest
// in-flight packet's size to the send budget, drops it from the queue,
// updates the trace counters and, if an earlier send was refused with
// EWOULDBLOCK, fires SignalReadyToSend.
429 void IpcPacketSocket::OnSendComplete() {
430 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
// A completion with nothing in flight is treated as fatal.
432 CHECK(!in_flight_packet_sizes_.empty());
433 send_bytes_available_ += in_flight_packet_sizes_.front();
// The budget must never grow beyond its initial value.
434 DCHECK_LE(send_bytes_available_, kMaximumInFlightBytes);
435 in_flight_packet_sizes_.pop_front();
436 TraceSendThrottlingState();
438 if (writable_signal_expected_ && send_bytes_available_ > 0) {
439 SignalReadyToSend(this);
440 writable_signal_expected_ = false;
// P2PSocketClientDelegate callback for a socket error. Records ECONNABORTED
// as the current error code.
// NOTE(review): any state change or signal emitted here is not visible in
// this excerpt.
444 void IpcPacketSocket::OnError() {
445 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
447 error_ = ECONNABORTED;
// P2PSocketClientDelegate callback for an incoming packet. Converts the
// sender |address| to a talk_base::SocketAddress, wraps |timestamp| in a
// talk_base::PacketTime and forwards the payload through SignalReadPacket.
// NOTE(review): the handling of a failed address conversion and the final
// argument of SignalReadPacket are not visible in this excerpt.
450 void IpcPacketSocket::OnDataReceived(const net::IPEndPoint& address,
451 const std::vector<char>& data,
452 const base::TimeTicks& timestamp) {
453 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
455 talk_base::SocketAddress address_lj;
456 if (!jingle_glue::IPEndPointToSocketAddress(address, &address_lj)) {
457 // We should always be able to convert address here because we
458 // don't expect IPv6 address on IPv4 connections.
463 talk_base::PacketTime packet_time(timestamp.ToInternalValue(), 0);
// NOTE(review): &data[0] is undefined behavior if |data| is empty - confirm
// that empty packets are never delivered here.
464 SignalReadPacket(this, &data[0], data.size(), address_lj,
// Stores |socket_dispatcher|, which is later passed to every
// P2PSocketClientImpl and P2PAsyncAddressResolver this factory creates.
470 IpcPacketSocketFactory::IpcPacketSocketFactory(
471 P2PSocketDispatcher* socket_dispatcher)
472 : socket_dispatcher_(socket_dispatcher) {
// Destructor; no statements are visible in its body in this excerpt.
475 IpcPacketSocketFactory::~IpcPacketSocketFactory() {
// Creates a UDP IpcPacketSocket backed by a new P2PSocketClientImpl and
// returns it with ownership released to the caller. |min_port| and
// |max_port| are not used in the visible code (see the TODO below).
// NOTE(review): the statement executed when Init() fails is not visible in
// this excerpt; presumably it returns NULL - confirm.
478 talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateUdpSocket(
479 const talk_base::SocketAddress& local_address, int min_port, int max_port) {
// NOTE(review): this local looks unused in the visible code and its name
// looks like a typo of "chrome_address" - candidate for removal.
480 talk_base::SocketAddress crome_address;
481 P2PSocketClientImpl* socket_client =
482 new P2PSocketClientImpl(socket_dispatcher_);
483 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
484 // TODO(sergeyu): Respect local_address and port limits here (need
485 // to pass them over IPC channel to the browser).
486 if (!socket->Init(P2P_SOCKET_UDP, socket_client,
487 local_address, talk_base::SocketAddress())) {
490 return socket.release();
// Creates a listening TCP IpcPacketSocket. OPT_STUN in |opts| selects
// P2P_SOCKET_STUN_TCP_SERVER instead of P2P_SOCKET_TCP_SERVER. OPT_SSLTCP is
// special-cased first (see the TODO). On success the socket is returned with
// ownership released to the caller.
// NOTE(review): the end of the parameter list, the statement taken for
// OPT_SSLTCP and the statement executed when Init() fails are not visible in
// this excerpt.
493 talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateServerTcpSocket(
494 const talk_base::SocketAddress& local_address, int min_port, int max_port,
496 // TODO(sergeyu): Implement SSL support.
497 if (opts & talk_base::PacketSocketFactory::OPT_SSLTCP)
500 P2PSocketType type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ?
501 P2P_SOCKET_STUN_TCP_SERVER : P2P_SOCKET_TCP_SERVER;
502 P2PSocketClientImpl* socket_client =
503 new P2PSocketClientImpl(socket_dispatcher_);
504 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
// A server socket has no remote peer, so a default-constructed address is
// passed.
505 if (!socket->Init(type, socket_client, local_address,
506 talk_base::SocketAddress())) {
509 return socket.release();
// Creates a client TCP IpcPacketSocket connecting to |remote_address|. The
// socket type is chosen from |opts|: OPT_SSLTCP takes precedence over
// OPT_TLS, plain TCP is used otherwise, and OPT_STUN selects the STUN_*
// variant in each case. |proxy_info| and |user_agent| are not used in the
// visible code.
// NOTE(review): the declaration of |type| and the statement executed when
// Init() fails are not visible in this excerpt.
512 talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateClientTcpSocket(
513 const talk_base::SocketAddress& local_address,
514 const talk_base::SocketAddress& remote_address,
515 const talk_base::ProxyInfo& proxy_info,
516 const std::string& user_agent, int opts) {
518 if (opts & talk_base::PacketSocketFactory::OPT_SSLTCP) {
519 type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ?
520 P2P_SOCKET_STUN_SSLTCP_CLIENT : P2P_SOCKET_SSLTCP_CLIENT;
521 } else if (opts & talk_base::PacketSocketFactory::OPT_TLS) {
522 type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ?
523 P2P_SOCKET_STUN_TLS_CLIENT : P2P_SOCKET_TLS_CLIENT;
525 type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ?
526 P2P_SOCKET_STUN_TCP_CLIENT : P2P_SOCKET_TCP_CLIENT;
528 P2PSocketClientImpl* socket_client =
529 new P2PSocketClientImpl(socket_dispatcher_);
530 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
531 if (!socket->Init(type, socket_client, local_address, remote_address))
533 return socket.release();
// Returns a newly allocated P2PAsyncAddressResolver bound to this factory's
// socket dispatcher. The object is returned as a raw pointer.
536 talk_base::AsyncResolverInterface*
537 IpcPacketSocketFactory::CreateAsyncResolver() {
538 return new P2PAsyncAddressResolver(socket_dispatcher_);
541 } // namespace content