1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/renderer/p2p/ipc_socket_factory.h"
10 #include "base/compiler_specific.h"
11 #include "base/debug/trace_event.h"
12 #include "base/message_loop/message_loop.h"
13 #include "base/message_loop/message_loop_proxy.h"
14 #include "base/metrics/histogram.h"
15 #include "base/strings/stringprintf.h"
16 #include "base/threading/non_thread_safe.h"
17 #include "content/renderer/media/webrtc_logging.h"
18 #include "content/renderer/p2p/host_address_request.h"
19 #include "content/renderer/p2p/socket_client_delegate.h"
20 #include "content/renderer/p2p/socket_client_impl.h"
21 #include "content/renderer/p2p/socket_dispatcher.h"
22 #include "jingle/glue/utils.h"
23 #include "third_party/webrtc/base/asyncpacketsocket.h"
// Sentinel stored in IpcPacketSocket::options_ for an option that has not been
// set; OnOpen() skips entries that still hold this value.
29 const int kDefaultNonSetOptionValue = -1;
// Returns true if |type| is one of the client-side TCP socket types: plain
// TCP, SSLTCP or TLS, each in its STUN and non-STUN variant.
31 bool IsTcpClientSocket(P2PSocketType type) {
32 return (type == P2P_SOCKET_STUN_TCP_CLIENT) ||
33 (type == P2P_SOCKET_TCP_CLIENT) ||
34 (type == P2P_SOCKET_STUN_SSLTCP_CLIENT) ||
35 (type == P2P_SOCKET_SSLTCP_CLIENT) ||
36 (type == P2P_SOCKET_TLS_CLIENT) ||
37 (type == P2P_SOCKET_STUN_TLS_CLIENT);
// Translates a libjingle socket option into the matching P2PSocketOption,
// which is written to |*ipc_option|. RCVBUF, SNDBUF and DSCP have a mapping;
// the remaining options listed below return false, which callers in this file
// treat as "option not supported".
40 bool JingleSocketOptionToP2PSocketOption(rtc::Socket::Option option,
41 P2PSocketOption* ipc_option) {
43 case rtc::Socket::OPT_RCVBUF:
44 *ipc_option = P2P_SOCKET_OPT_RCVBUF;
46 case rtc::Socket::OPT_SNDBUF:
47 *ipc_option = P2P_SOCKET_OPT_SNDBUF;
49 case rtc::Socket::OPT_DSCP:
50 *ipc_option = P2P_SOCKET_OPT_DSCP;
52 case rtc::Socket::OPT_DONTFRAGMENT:
53 case rtc::Socket::OPT_NODELAY:
54 case rtc::Socket::OPT_IPV6_V6ONLY:
55 case rtc::Socket::OPT_RTP_SENDTIME_EXTN_ID:
56 return false; // Not supported by the chrome sockets.
// Send budget in bytes: the initial value of
// IpcPacketSocket::send_bytes_available_ and the upper bound that
// OnSendComplete() checks after refunding a completed packet.
64 // TODO(miu): This needs tuning. http://crbug.com/237960
65 // http://crbug.com/427555
66 const size_t kMaximumInFlightBytes = 256 * 1024; // 256 KB
68 // IpcPacketSocket implements rtc::AsyncPacketSocket interface
69 // using P2PSocketClient that works over IPC-channel. It must be used
70 // on the thread it was created.
71 class IpcPacketSocket : public rtc::AsyncPacketSocket,
72 public P2PSocketClientDelegate {
75 ~IpcPacketSocket() override;
77 // Always takes ownership of client even if initialization fails.
// |remote_address| is expected to be nil unless |type| is a TCP client
// socket type (Init() DCHECKs this).
78 bool Init(P2PSocketType type, P2PSocketClientImpl* client,
79 const rtc::SocketAddress& local_address,
80 const rtc::SocketAddress& remote_address);
82 // rtc::AsyncPacketSocket interface.
83 rtc::SocketAddress GetLocalAddress() const override;
84 rtc::SocketAddress GetRemoteAddress() const override;
85 int Send(const void* pv,
87 const rtc::PacketOptions& options) override;
88 int SendTo(const void* pv,
90 const rtc::SocketAddress& addr,
91 const rtc::PacketOptions& options) override;
93 State GetState() const override;
94 int GetOption(rtc::Socket::Option option, int* value) override;
95 int SetOption(rtc::Socket::Option option, int value) override;
96 int GetError() const override;
97 void SetError(int error) override;
99 // P2PSocketClientDelegate implementation.
100 void OnOpen(const net::IPEndPoint& local_address,
101 const net::IPEndPoint& remote_address) override;
102 void OnIncomingTcpConnection(const net::IPEndPoint& address,
103 P2PSocketClient* client) override;
104 void OnSendComplete() override;
105 void OnError() override;
106 void OnDataReceived(const net::IPEndPoint& address,
107 const std::vector<char>& data,
108 const base::TimeTicks& timestamp) override;
119 // Increment the counter for consecutive bytes discarded as socket is running
// out of send budget (|send_bytes_available_|). Called from SendTo() when a
// packet is larger than the remaining budget.
121 void IncrementDiscardCounters(size_t bytes_discarded);
123 // Update trace of send throttling internal state. This should be called
124 // immediately after any changes to |send_bytes_available_| and/or
125 // |in_flight_packet_sizes_|.
126 void TraceSendThrottlingState() const;
// Sets up a socket that wraps |client| for a TCP connection reported through
// OnIncomingTcpConnection() on another IpcPacketSocket.
128 void InitAcceptedTcp(P2PSocketClient* client,
129 const rtc::SocketAddress& local_address,
130 const rtc::SocketAddress& remote_address);
// Forwards |option| to |client_|. Requires state_ == IS_OPEN.
132 int DoSetOption(P2PSocketOption option, int value);
136 // Message loop on which this socket was created and being used.
137 base::MessageLoop* message_loop_;
139 // Corresponding P2P socket client.
140 scoped_refptr<P2PSocketClient> client_;
142 // Local address is allocated by the browser process, and the
143 // renderer side doesn't know the address until it receives OnOpen()
144 // event from the browser.
145 rtc::SocketAddress local_address_;
147 // Remote address for client TCP connections.
148 rtc::SocketAddress remote_address_;
150 // Current state of the object.
151 InternalState state_;
153 // Track the number of bytes allowed to be sent non-blocking. This is used to
154 // throttle the sending of packets to the browser process. For each packet
155 // sent, the value is decreased. As callbacks to OnSendComplete() (as IPCs
156 // from the browser process) are made, the value is increased back. This
157 // allows short bursts of high-rate sending without dropping packets, but
158 // quickly restricts the client to a sustainable steady-state rate.
159 size_t send_bytes_available_;
// Sizes of the packets passed to |client_| that have not yet been matched by
// an OnSendComplete() call, oldest first.
160 std::deque<size_t> in_flight_packet_sizes_;
162 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the
163 // caller expects SignalWritable notification.
164 bool writable_signal_expected_;
166 // Current error code. Valid when state_ == IS_ERROR.
168 int options_[P2P_SOCKET_OPT_MAX];
170 // Track the maximum and current consecutive bytes discarded due to not enough
171 // send_bytes_available_.
172 size_t max_discard_bytes_sequence_;
173 size_t current_discard_bytes_sequence_;
175 // Track the total number of packets and the number of packets discarded.
176 size_t packets_discarded_;
177 size_t total_packets_;
179 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket);
182 // Simple wrapper around P2PAsyncAddressResolver. The main purpose of this
183 // class is to send SignalDone, after OnDone callback from
184 // P2PAsyncAddressResolver. Libjingle sig slots are not thread safe. In case
185 // of MT sig slots clients must call disconnect. This class is to make sure
186 // we destruct on the same thread on which the object was created.
187 class AsyncAddressResolverImpl : public base::NonThreadSafe,
188 public rtc::AsyncResolverInterface {
// NOTE(review): single-argument constructor is not marked explicit.
190 AsyncAddressResolverImpl(P2PSocketDispatcher* dispatcher);
191 ~AsyncAddressResolverImpl() override;
193 // rtc::AsyncResolverInterface interface.
194 void Start(const rtc::SocketAddress& addr) override;
195 bool GetResolvedAddress(int family, rtc::SocketAddress* addr) const override;
196 int GetError() const override;
197 void Destroy(bool wait) override;
// Callback bound in Start() and handed to |resolver_|; converts |addresses|
// and appends them to |addresses_|.
200 virtual void OnAddressResolved(const net::IPAddressList& addresses);
202 scoped_refptr<P2PAsyncAddressResolver> resolver_;
203 int port_; // Port number in |addr| from Start() method.
204 std::vector<rtc::IPAddress> addresses_; // Resolved addresses.
// Creates a socket in the IS_UNINITIALIZED state, tied to the message loop of
// the calling thread, with the full send budget (kMaximumInFlightBytes)
// available and zeroed discard statistics.
207 IpcPacketSocket::IpcPacketSocket()
208 : type_(P2P_SOCKET_UDP),
209 message_loop_(base::MessageLoop::current()),
210 state_(IS_UNINITIALIZED),
211 send_bytes_available_(kMaximumInFlightBytes),
212 writable_signal_expected_(false),
214 max_discard_bytes_sequence_(0),
215 current_discard_bytes_sequence_(0),
216 packets_discarded_(0),
218 COMPILE_ASSERT(kMaximumInFlightBytes > 0, would_send_at_zero_rate);
// Mark every option as "not set" so OnOpen() only forwards options that
// SetOption() actually stored.
219 std::fill_n(options_, static_cast<int> (P2P_SOCKET_OPT_MAX),
220 kDefaultNonSetOptionValue);
// Reports discard statistics to UMA: the longest run of consecutively
// discarded bytes and, when at least one packet was counted, the percentage
// of packets discarded.
223 IpcPacketSocket::~IpcPacketSocket() {
224 if (state_ == IS_OPENING || state_ == IS_OPEN ||
225 state_ == IS_ERROR) {
229 UMA_HISTOGRAM_COUNTS_10000("WebRTC.ApplicationMaxConsecutiveBytesDiscard",
230 max_discard_bytes_sequence_);
// Guard against division by zero when no packet was ever counted.
232 if (total_packets_ > 0) {
233 UMA_HISTOGRAM_PERCENTAGE("WebRTC.ApplicationPercentPacketsDiscarded",
234 (packets_discarded_ * 100) / total_packets_);
// Emits two "p2p" trace counters, both keyed by the local port: the remaining
// send budget in bytes and the number of packets currently in flight.
238 void IpcPacketSocket::TraceSendThrottlingState() const {
239 TRACE_COUNTER_ID1("p2p", "P2PSendBytesAvailable", local_address_.port(),
240 send_bytes_available_);
241 TRACE_COUNTER_ID1("p2p", "P2PSendPacketsInFlight", local_address_.port(),
242 in_flight_packet_sizes_.size());
// Accounts for one discarded packet of |bytes_discarded| bytes: extends the
// current run of consecutively discarded bytes, bumps the discarded-packet
// count, and raises the recorded maximum run if the current run exceeds it.
245 void IpcPacketSocket::IncrementDiscardCounters(size_t bytes_discarded) {
246 current_discard_bytes_sequence_ += bytes_discarded;
247 packets_discarded_++;
249 if (current_discard_bytes_sequence_ > max_discard_bytes_sequence_) {
250 max_discard_bytes_sequence_ = current_discard_bytes_sequence_;
// Starts opening a socket of |type| through |client|, with this object as the
// client's delegate. Must be called on the creating thread while the socket
// is still IS_UNINITIALIZED. Both addresses are remembered and converted to
// net::IPEndPoint before being passed to the client.
254 bool IpcPacketSocket::Init(P2PSocketType type,
255 P2PSocketClientImpl* client,
256 const rtc::SocketAddress& local_address,
257 const rtc::SocketAddress& remote_address) {
258 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
259 DCHECK_EQ(state_, IS_UNINITIALIZED);
263 local_address_ = local_address;
264 remote_address_ = remote_address;
267 net::IPEndPoint local_endpoint;
268 if (!jingle_glue::SocketAddressToIPEndPoint(
269 local_address, &local_endpoint)) {
273 net::IPEndPoint remote_endpoint;
// A remote address is only meaningful for TCP client sockets.
274 if (!remote_address.IsNil()) {
275 DCHECK(IsTcpClientSocket(type_));
// An unresolved remote address is represented by an empty IP plus the port;
// the hostname travels separately in |remote_info| below.
277 if (remote_address.IsUnresolvedIP()) {
279 net::IPEndPoint(net::IPAddressNumber(), remote_address.port());
281 if (!jingle_glue::SocketAddressToIPEndPoint(remote_address,
288 // We need to send both resolved and unresolved address in Init. Unresolved
289 // address will be used in case of TLS for certificate hostname matching.
290 // Certificate will be tied to domain name not to IP address.
291 P2PHostAndIPEndPoint remote_info(remote_address.hostname(), remote_endpoint);
293 client->Init(type, local_endpoint, remote_info, this);
// Initializes this socket around an existing |client| for an accepted TCP
// connection: stores both addresses, traces the throttling state and installs
// this object as the client's delegate. Must be called on the creating thread
// while the socket is still IS_UNINITIALIZED.
298 void IpcPacketSocket::InitAcceptedTcp(
299 P2PSocketClient* client,
300 const rtc::SocketAddress& local_address,
301 const rtc::SocketAddress& remote_address) {
302 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
303 DCHECK_EQ(state_, IS_UNINITIALIZED);
306 local_address_ = local_address;
307 remote_address_ = remote_address;
309 TraceSendThrottlingState();
310 client_->SetDelegate(this);
313 // rtc::AsyncPacketSocket interface.
// Returns a copy of |local_address_|, which OnOpen() overwrites with the
// address it receives.
314 rtc::SocketAddress IpcPacketSocket::GetLocalAddress() const {
315 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
316 return local_address_;
// Returns a copy of |remote_address_| as stored by Init()/InitAcceptedTcp()
// and, for an unresolved TCP client address, updated in OnOpen().
319 rtc::SocketAddress IpcPacketSocket::GetRemoteAddress() const {
320 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
321 return remote_address_;
// Sends |data| to |remote_address_| by delegating to SendTo(); returns
// whatever SendTo() returns.
324 int IpcPacketSocket::Send(const void *data, size_t data_size,
325 const rtc::PacketOptions& options) {
326 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
327 return SendTo(data, data_size, remote_address_, options);
// Queues |data_size| bytes at |data| for sending to |address| through
// |client_|, subject to the send budget. Must be called on the creating
// thread. Empty packets are special-cased; packets larger than the remaining
// budget are discarded with EWOULDBLOCK recorded in |error_|.
330 int IpcPacketSocket::SendTo(const void *data, size_t data_size,
331 const rtc::SocketAddress& address,
332 const rtc::PacketOptions& options) {
333 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
336 case IS_UNINITIALIZED:
346 // Continue sending the packet.
350 if (data_size == 0) {
// Throttling: the packet does not fit into the remaining budget, so it is
// discarded. |writable_signal_expected_| makes OnSendComplete() fire
// SignalReadyToSend() once budget is available again.
357 if (data_size > send_bytes_available_) {
358 TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock",
359 TRACE_EVENT_SCOPE_THREAD,
361 client_->GetSocketID());
// Log only on the transition into the blocked state, not for every
// discarded packet.
362 if (!writable_signal_expected_) {
363 WebRtcLogMessage(base::StringPrintf(
364 "IpcPacketSocket: sending is blocked. %d packets_in_flight.",
365 static_cast<int>(in_flight_packet_sizes_.size())));
367 writable_signal_expected_ = true;
370 error_ = EWOULDBLOCK;
371 IncrementDiscardCounters(data_size);
// The packet fits, so the current run of consecutive discards ends here.
374 current_discard_bytes_sequence_ = 0;
377 net::IPEndPoint address_chrome;
378 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) {
379 VLOG(1) << "Failed to convert remote address to IPEndPoint: address = "
380 << address.ToSensitiveString() << ", remote_address_ = "
381 << remote_address_.ToSensitiveString();
// Charge the packet against the budget; OnSendComplete() refunds it.
387 send_bytes_available_ -= data_size;
388 in_flight_packet_sizes_.push_back(data_size);
389 TraceSendThrottlingState();
// Copy the payload into a vector, which is what SendWithDscp() takes.
391 const char* data_char = reinterpret_cast<const char*>(data);
392 std::vector<char> data_vector(data_char, data_char + data_size);
393 client_->SendWithDscp(address_chrome, data_vector, options);
395 // Fake successful send. The caller ignores result anyway.
// Must be called on the thread that created the socket.
399 int IpcPacketSocket::Close() {
400 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
// Translates the internal |state_| into an rtc::AsyncPacketSocket::State.
// TCP client sockets can report STATE_CONNECTED.
408 rtc::AsyncPacketSocket::State IpcPacketSocket::GetState() const {
409 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
412 case IS_UNINITIALIZED:
417 return STATE_BINDING;
420 if (IsTcpClientSocket(type_)) {
421 return STATE_CONNECTED;
// Writes the locally cached value of |option| to |*value|. The cache is read
// from |options_|, so an option that was never set yields
// kDefaultNonSetOptionValue. Options without a P2PSocketOption mapping take
// the "unsupported" branch.
435 int IpcPacketSocket::GetOption(rtc::Socket::Option option, int* value) {
436 P2PSocketOption p2p_socket_option = P2P_SOCKET_OPT_MAX;
437 if (!JingleSocketOptionToP2PSocketOption(option, &p2p_socket_option)) {
438 // unsupported option.
442 *value = options_[p2p_socket_option];
// Caches |value| for |option| in |options_| and, if the socket is already
// open, forwards it to the client right away. Options without a
// P2PSocketOption mapping take the "not supported" branch.
446 int IpcPacketSocket::SetOption(rtc::Socket::Option option, int value) {
447 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
449 P2PSocketOption p2p_socket_option = P2P_SOCKET_OPT_MAX;
450 if (!JingleSocketOptionToP2PSocketOption(option, &p2p_socket_option)) {
451 // Option is not supported.
455 options_[p2p_socket_option] = value;
457 if (state_ == IS_OPEN) {
458 // Already open: apply the option now. When the socket is not open yet, the
// cached value is applied once the state becomes IS_OPEN in OnOpen().
459 return DoSetOption(p2p_socket_option, value);
// Forwards |option| with |value| to |client_|. Only valid on the creating
// thread and while the socket is IS_OPEN.
464 int IpcPacketSocket::DoSetOption(P2PSocketOption option, int value) {
465 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
466 DCHECK_EQ(state_, IS_OPEN);
468 client_->SetOption(option, value);
// Must be called on the thread that created the socket.
472 int IpcPacketSocket::GetError() const {
473 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
// Must be called on the thread that created the socket.
477 void IpcPacketSocket::SetError(int error) {
478 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
// P2PSocketClientDelegate callback for a successfully opened socket. Stores
// the local address that was reported, applies every option cached by
// SetOption(), fires SignalAddressReady, and for TCP client sockets fills in
// the resolved remote IP when the stored remote address was unresolved.
482 void IpcPacketSocket::OnOpen(const net::IPEndPoint& local_address,
483 const net::IPEndPoint& remote_address) {
484 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
486 if (!jingle_glue::IPEndPointToSocketAddress(local_address, &local_address_)) {
487 // Always expect correct IPv4 address to be allocated.
494 TraceSendThrottlingState();
496 // Set all pending options if any.
497 for (int i = 0; i < P2P_SOCKET_OPT_MAX; ++i) {
498 if (options_[i] != kDefaultNonSetOptionValue)
499 DoSetOption(static_cast<P2PSocketOption> (i), options_[i]);
502 SignalAddressReady(this, local_address_);
503 if (IsTcpClientSocket(type_)) {
504 // If remote address is unresolved, set resolved remote IP address received
505 // in the callback. This address will be used while sending the packets
507 if (remote_address_.IsUnresolvedIP()) {
508 rtc::SocketAddress jingle_socket_address;
509 if (!jingle_glue::IPEndPointToSocketAddress(
510 remote_address, &jingle_socket_address)) {
513 // Set only the IP address.
514 remote_address_.SetResolvedIP(jingle_socket_address.ipaddr());
517 // SignalConnect after updating the |remote_address_| so that the listener
518 // can get the resolved remote address.
// P2PSocketClientDelegate callback for a newly accepted TCP connection.
// Wraps |client| in a fresh IpcPacketSocket that shares this socket's local
// address and uses |address| as its remote address, then announces it through
// SignalNewConnection.
523 void IpcPacketSocket::OnIncomingTcpConnection(
524 const net::IPEndPoint& address,
525 P2PSocketClient* client) {
526 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
528 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
530 rtc::SocketAddress remote_address;
531 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) {
532 // Always expect correct IPv4 address to be allocated.
535 socket->InitAcceptedTcp(client, local_address_, remote_address);
// release() hands the raw pointer to the signal; presumably the listener
// takes ownership of the new socket - TODO confirm.
536 SignalNewConnection(this, socket.release());
// P2PSocketClientDelegate callback for one completed send. Refunds the size
// of the oldest in-flight packet to the send budget and, if SendTo() had to
// discard a packet earlier, fires SignalReadyToSend so the caller resumes.
539 void IpcPacketSocket::OnSendComplete() {
540 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
// A completion without a matching in-flight packet is treated as fatal.
542 CHECK(!in_flight_packet_sizes_.empty());
543 send_bytes_available_ += in_flight_packet_sizes_.front();
545 DCHECK_LE(send_bytes_available_, kMaximumInFlightBytes);
547 in_flight_packet_sizes_.pop_front();
548 TraceSendThrottlingState();
550 if (writable_signal_expected_ && send_bytes_available_ > 0) {
551 WebRtcLogMessage(base::StringPrintf(
552 "IpcPacketSocket: sending is unblocked. %d packets in flight.",
553 static_cast<int>(in_flight_packet_sizes_.size())));
555 SignalReadyToSend(this);
556 writable_signal_expected_ = false;
// P2PSocketClientDelegate callback for a socket error. Records ECONNABORTED
// in |error_| and fires SignalClose. |was_closed| captures whether the socket
// was already in IS_ERROR or IS_CLOSED before this call.
560 void IpcPacketSocket::OnError() {
561 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
562 bool was_closed = (state_ == IS_ERROR || state_ == IS_CLOSED);
564 error_ = ECONNABORTED;
566 SignalClose(this, 0);
// P2PSocketClientDelegate callback for an incoming packet. Converts |address|
// to an rtc::SocketAddress, wraps |timestamp| in an rtc::PacketTime and
// forwards the payload through SignalReadPacket.
570 void IpcPacketSocket::OnDataReceived(const net::IPEndPoint& address,
571 const std::vector<char>& data,
572 const base::TimeTicks& timestamp) {
573 DCHECK_EQ(base::MessageLoop::current(), message_loop_);
575 rtc::SocketAddress address_lj;
576 if (!jingle_glue::IPEndPointToSocketAddress(address, &address_lj)) {
577 // We should always be able to convert address here because we
578 // don't expect IPv6 address on IPv4 connections.
583 rtc::PacketTime packet_time(timestamp.ToInternalValue(), 0);
// NOTE(review): &data[0] is only well-defined when |data| is non-empty.
// Confirm that an empty packet can never reach this point; data.data() would
// be safe in both cases.
584 SignalReadPacket(this, &data[0], data.size(), address_lj,
// Creates the underlying P2PAsyncAddressResolver bound to |dispatcher|.
588 AsyncAddressResolverImpl::AsyncAddressResolverImpl(
589 P2PSocketDispatcher* dispatcher)
590 : resolver_(new P2PAsyncAddressResolver(dispatcher)) {
593 AsyncAddressResolverImpl::~AsyncAddressResolverImpl() {
// Starts resolving |addr| through |resolver_|; OnAddressResolved() receives
// the result.
596 void AsyncAddressResolverImpl::Start(const rtc::SocketAddress& addr) {
597 DCHECK(CalledOnValidThread());
598 // Copy port number from |addr|. |port_| must be copied
599 // when resolved address is returned in GetResolvedAddress.
// NOTE(review): base::Unretained(this) gives the callback a raw pointer to
// this object, so the callback must not run after destruction. Confirm that
// Destroy() cancels the pending request.
602 resolver_->Start(addr, base::Bind(
603 &AsyncAddressResolverImpl::OnAddressResolved,
604 base::Unretained(this)));
// Looks through the resolved addresses for one whose family equals |family|
// and, on a match, stores that IP in |addr| together with |port_|. An empty
// result list is handled up front.
607 bool AsyncAddressResolverImpl::GetResolvedAddress(
608 int family, rtc::SocketAddress* addr) const {
609 DCHECK(CalledOnValidThread());
611 if (addresses_.empty())
614 for (size_t i = 0; i < addresses_.size(); ++i) {
615 if (family == addresses_[i].family()) {
616 addr->SetResolvedIP(addresses_[i]);
617 addr->SetPort(port_);
// Returns 0 when at least one address has been stored in |addresses_| and -1
// otherwise, which includes the time before OnAddressResolved() has run.
624 int AsyncAddressResolverImpl::GetError() const {
625 DCHECK(CalledOnValidThread());
626 return addresses_.empty() ? -1 : 0;
// rtc::AsyncResolverInterface override; must be called on the thread that
// created this object.
629 void AsyncAddressResolverImpl::Destroy(bool wait) {
630 DCHECK(CalledOnValidThread());
632 // Libjingle doesn't need this object any more and it's not going to delete
// Callback from |resolver_| carrying the resolved |addresses|. Each entry is
// converted to an rtc::SocketAddress (with port 0, since only the IP is kept)
// and its IP is appended to |addresses_|.
637 void AsyncAddressResolverImpl::OnAddressResolved(
638 const net::IPAddressList& addresses) {
639 DCHECK(CalledOnValidThread());
640 for (size_t i = 0; i < addresses.size(); ++i) {
641 rtc::SocketAddress socket_address;
642 if (!jingle_glue::IPEndPointToSocketAddress(
643 net::IPEndPoint(addresses[i], 0), &socket_address)) {
646 addresses_.push_back(socket_address.ipaddr());
// Remembers |socket_dispatcher|, which is passed to every socket client and
// address resolver this factory creates.
653 IpcPacketSocketFactory::IpcPacketSocketFactory(
654 P2PSocketDispatcher* socket_dispatcher)
655 : socket_dispatcher_(socket_dispatcher) {
658 IpcPacketSocketFactory::~IpcPacketSocketFactory() {
// Creates a UDP IpcPacketSocket for |local_address| with no remote address
// and returns it as a raw pointer released from the scoped_ptr. |min_port|
// and |max_port| are not used yet (see the TODO below).
661 rtc::AsyncPacketSocket* IpcPacketSocketFactory::CreateUdpSocket(
662 const rtc::SocketAddress& local_address, uint16 min_port, uint16 max_port) {
// NOTE(review): |crome_address| (sic) is not referenced anywhere else in the
// visible code of this function; it looks like an unused local - confirm and
// remove.
663 rtc::SocketAddress crome_address;
664 P2PSocketClientImpl* socket_client =
665 new P2PSocketClientImpl(socket_dispatcher_);
666 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
667 // TODO(sergeyu): Respect local_address and port limits here (need
668 // to pass them over IPC channel to the browser).
669 if (!socket->Init(P2P_SOCKET_UDP, socket_client,
670 local_address, rtc::SocketAddress())) {
673 return socket.release();
// Creates a listening TCP IpcPacketSocket for |local_address|. OPT_STUN in
// |opts| selects P2P_SOCKET_STUN_TCP_SERVER over P2P_SOCKET_TCP_SERVER;
// OPT_SSLTCP is special-cased because SSL is not implemented for server
// sockets (see the TODO below).
676 rtc::AsyncPacketSocket* IpcPacketSocketFactory::CreateServerTcpSocket(
677 const rtc::SocketAddress& local_address, uint16 min_port, uint16 max_port,
679 // TODO(sergeyu): Implement SSL support.
680 if (opts & rtc::PacketSocketFactory::OPT_SSLTCP)
683 P2PSocketType type = (opts & rtc::PacketSocketFactory::OPT_STUN) ?
684 P2P_SOCKET_STUN_TCP_SERVER : P2P_SOCKET_TCP_SERVER;
685 P2PSocketClientImpl* socket_client =
686 new P2PSocketClientImpl(socket_dispatcher_);
687 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
688 if (!socket->Init(type, socket_client, local_address,
689 rtc::SocketAddress())) {
692 return socket.release();
// Creates a client TCP IpcPacketSocket connecting from |local_address| to
// |remote_address|. The socket type is chosen from |opts|: OPT_SSLTCP is
// checked first, then OPT_TLS, otherwise plain TCP; within each, OPT_STUN
// selects the STUN variant. |proxy_info| and |user_agent| are not referenced
// in the visible code.
695 rtc::AsyncPacketSocket* IpcPacketSocketFactory::CreateClientTcpSocket(
696 const rtc::SocketAddress& local_address,
697 const rtc::SocketAddress& remote_address,
698 const rtc::ProxyInfo& proxy_info,
699 const std::string& user_agent, int opts) {
701 if (opts & rtc::PacketSocketFactory::OPT_SSLTCP) {
702 type = (opts & rtc::PacketSocketFactory::OPT_STUN) ?
703 P2P_SOCKET_STUN_SSLTCP_CLIENT : P2P_SOCKET_SSLTCP_CLIENT;
704 } else if (opts & rtc::PacketSocketFactory::OPT_TLS) {
705 type = (opts & rtc::PacketSocketFactory::OPT_STUN) ?
706 P2P_SOCKET_STUN_TLS_CLIENT : P2P_SOCKET_TLS_CLIENT;
708 type = (opts & rtc::PacketSocketFactory::OPT_STUN) ?
709 P2P_SOCKET_STUN_TCP_CLIENT : P2P_SOCKET_TCP_CLIENT;
711 P2PSocketClientImpl* socket_client =
712 new P2PSocketClientImpl(socket_dispatcher_);
713 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
714 if (!socket->Init(type, socket_client, local_address, remote_address))
716 return socket.release();
// Creates an AsyncAddressResolverImpl bound to |socket_dispatcher_| and
// returns it as a raw pointer released from the scoped_ptr.
719 rtc::AsyncResolverInterface*
720 IpcPacketSocketFactory::CreateAsyncResolver() {
721 scoped_ptr<AsyncAddressResolverImpl> resolver(
722 new AsyncAddressResolverImpl(socket_dispatcher_));
723 return resolver.release();
726 } // namespace content