3 * Copyright 2004--2005, Google Inc.
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 #include "talk/p2p/base/p2ptransportchannel.h"
31 #include "talk/base/common.h"
32 #include "talk/base/crc32.h"
33 #include "talk/base/logging.h"
34 #include "talk/base/stringencode.h"
35 #include "talk/p2p/base/common.h"
36 #include "talk/p2p/base/relayport.h" // For RELAY_PORT_TYPE.
37 #include "talk/p2p/base/stunport.h" // For STUN_PORT_TYPE.
41 // messages for queuing up work for ourselves
47 // When the socket is unwritable, we will use 10 Kbps (ignoring IP+UDP headers)
48 // for pinging. When the socket is writable, we will use only 1 Kbps because
49 // we don't want to degrade the quality on a modem. These numbers should work
50 // well on a 28.8K modem, which is the slowest connection on which the voice
51 // quality is reasonable at all.
// Each delay below is the interval, in milliseconds, between pings of
// PING_PACKET_SIZE bits that yields the target rate:
//   delay_ms = 1000 * bits / bits_per_second.
// PING_PACKET_SIZE is expressed in bits (presumably 60 bytes * 8).
52 static const uint32 PING_PACKET_SIZE = 60 * 8;
53 static const uint32 WRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 1000; // 480ms
54 static const uint32 UNWRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 10000; // 48ms (integer division)
56 // If there is a current writable connection, then we will also try hard to
57 // make sure it is pinged at this rate.
58 static const uint32 MAX_CURRENT_WRITABLE_DELAY = 900; // 2*WRITABLE_DELAY (960) minus a bit
60 // The minimum improvement in RTT that justifies a switch.
// Compared against Connection::rtt() values in ShouldSwitch(); the unit is
// whatever rtt() returns (presumably milliseconds -- TODO confirm).
61 static const double kMinImprovement = 10;
// Derives a CandidateOrigin from the relationship between |port| and
// |origin_port|. Visible outcomes: ORIGIN_THIS_PORT when both pointers refer
// to the same port, ORIGIN_OTHER_PORT as the final fallback. ORIGIN_MESSAGE is
// returned by an earlier branch whose condition is not visible in this
// excerpt -- presumably a NULL |origin_port|; TODO confirm.
63 cricket::PortInterface::CandidateOrigin GetOrigin(cricket::PortInterface* port,
64 cricket::PortInterface* origin_port) {
66 return cricket::PortInterface::ORIGIN_MESSAGE;
67 else if (port == origin_port)
68 return cricket::PortInterface::ORIGIN_THIS_PORT;
70 return cricket::PortInterface::ORIGIN_OTHER_PORT;
73 // Compares two connections based only on static information about them.
// The comparison is made first on priority() and, on a tie, on generation.
// The values returned by the two priority branches are not visible in this
// excerpt; callers below treat a result >= 0 as "|a| is at least as good as
// |b|" (see the pruning loop in SortConnections).
74 int CompareConnectionCandidates(cricket::Connection* a,
75 cricket::Connection* b) {
76 // Compare connection priority. Lower values get sorted last.
77 if (a->priority() > b->priority())
79 if (a->priority() < b->priority())
82 // If we're still tied at this point, prefer a younger generation.
// The combined generation is the remote candidate generation plus the local
// port generation; the result is positive when |a| has the larger sum.
83 return (a->remote_candidate().generation() + a->port()->generation()) -
84 (b->remote_candidate().generation() + b->port()->generation());
87 // Compare two connections based on their writability and static preferences.
// write_state() is checked first; only when both connections share the same
// write state does the result come from CompareConnectionCandidates().
88 int CompareConnections(cricket::Connection *a, cricket::Connection *b) {
89 // Sort based on write-state. Better states have lower values.
90 if (a->write_state() < b->write_state())
92 if (a->write_state() > b->write_state())
95 // Compare the candidate information.
96 return CompareConnectionCandidates(a, b);
99 // Wraps the comparison connection into a less than operator that puts higher
100 // priority writable connections first.
// Used as the ordering predicate for std::stable_sort in SortConnections().
101 class ConnectionCompare {
103 bool operator()(const cricket::Connection *ca,
104 const cricket::Connection *cb) {
// The const qualifier is cast away because CompareConnections() takes
// non-const Connection pointers.
105 cricket::Connection* a = const_cast<cricket::Connection*>(ca);
106 cricket::Connection* b = const_cast<cricket::Connection*>(cb);
// Both connections are expected to use the same ICE protocol.
108 ASSERT(a->port()->IceProtocol() == b->port()->IceProtocol());
110 // Compare first on writability and static preferences.
111 int cmp = CompareConnections(a, b);
117 // Otherwise, sort based on latency estimate.
118 return a->rtt() < b->rtt();
120 // Should we bother checking for the connection that last received
121 // data? It would help rendezvous on the connection that is also receiving
124 // TODO: Yes we should definitely do this. The TCP protocol gains
125 // efficiency by being used bidirectionally, as opposed to two separate
126 // unidirectional streams. This test should probably occur before
127 // comparison of local prefs (assuming combined prefs are the same). We
128 // need to be careful though, not to bounce back and forth with both sides
129 // trying to rendezvous with the other.
133 // Determines whether we should switch between two connections, based first on
134 // static preferences and then (if those are equal) on latency estimates.
// |a_conn| is the current choice and |b_conn| the alternative (SortConnections
// passes best_connection_ and the top sorted connection, in that order).
135 bool ShouldSwitch(cricket::Connection* a_conn, cricket::Connection* b_conn) {
136 if (a_conn == b_conn)
139 if (!a_conn || !b_conn) // don't think the latter should happen
142 int prefs_cmp = CompareConnections(a_conn, b_conn);
// Literal condition: true when the RTT of |b_conn| is at most the RTT of
// |a_conn| plus kMinImprovement.
// NOTE(review): kMinImprovement is documented as the minimum RTT improvement
// that justifies a switch, which would read as
// b_rtt + kMinImprovement <= a_rtt -- confirm the intended inequality.
148 return b_conn->rtt() <= a_conn->rtt() + kMinImprovement;
151 } // unnamed namespace
// Constructs a channel for |content_name| owned by |transport|, using
// |allocator| to create port allocator sessions later on.
// The constructing thread is recorded as worker_thread_; most other methods
// assert that they run on that same thread. The channel starts with no best
// connection, in ICEPROTO_HYBRID protocol mode, with an unknown ICE role and
// remote candidate generation 0.
155 P2PTransportChannel::P2PTransportChannel(const std::string& content_name,
157 P2PTransport* transport,
158 PortAllocator *allocator) :
159 TransportChannelImpl(content_name, component),
160 transport_(transport),
161 allocator_(allocator),
162 worker_thread_(talk_base::Thread::Current()),
163 incoming_only_(false),
164 waiting_for_signaling_(false),
166 best_connection_(NULL),
167 pending_best_connection_(NULL),
169 was_writable_(false),
170 protocol_type_(ICEPROTO_HYBRID),
171 remote_ice_mode_(ICEMODE_FULL),
172 ice_role_(ICEROLE_UNKNOWN),
174 remote_candidate_generation_(0) {
// Destroys the channel. Must run on the worker thread (asserted). The channel
// owns its allocator sessions and deletes each of them here.
177 P2PTransportChannel::~P2PTransportChannel() {
178 ASSERT(worker_thread_ == talk_base::Thread::Current());
180 for (uint32 i = 0; i < allocator_sessions_.size(); ++i)
181 delete allocator_sessions_[i];
184 // Add the allocator session to our list so that we know which sessions
// Takes |session| into allocator_sessions_ (the destructor and Reset() delete
// the stored sessions), wires its signals to this channel and starts port
// gathering on it.
186 void P2PTransportChannel::AddAllocatorSession(PortAllocatorSession* session) {
// The generation of a session is its index in allocator_sessions_, i.e. the
// number of sessions added before it.
187 session->set_generation(static_cast<uint32>(allocator_sessions_.size()));
188 allocator_sessions_.push_back(session);
190 // We now only want to apply new candidates that we receive to the ports
191 // created by this new session because these are replacing those of the
192 // previous sessions.
195 session->SignalPortReady.connect(this, &P2PTransportChannel::OnPortReady);
196 session->SignalCandidatesReady.connect(
197 this, &P2PTransportChannel::OnCandidatesReady);
198 session->SignalCandidatesAllocationDone.connect(
199 this, &P2PTransportChannel::OnCandidatesAllocationDone);
200 session->StartGettingPorts();
// Starts tracking |connection|: appends it to connections_, applies the
// current remote ICE mode to it and subscribes this channel to its
// read-packet, ready-to-send, state-change, destroyed and use-candidate
// signals.
203 void P2PTransportChannel::AddConnection(Connection* connection) {
204 connections_.push_back(connection);
205 connection->set_remote_ice_mode(remote_ice_mode_);
206 connection->SignalReadPacket.connect(
207 this, &P2PTransportChannel::OnReadPacket);
208 connection->SignalReadyToSend.connect(
209 this, &P2PTransportChannel::OnReadyToSend);
210 connection->SignalStateChange.connect(
211 this, &P2PTransportChannel::OnConnectionStateChange);
212 connection->SignalDestroyed.connect(
213 this, &P2PTransportChannel::OnConnectionDestroyed);
214 connection->SignalUseCandidate.connect(
215 this, &P2PTransportChannel::OnUseCandidate);
// Updates the ICE role of the channel. When the role actually changes, the
// new role is also pushed to every port in ports_. Worker thread only.
218 void P2PTransportChannel::SetIceRole(IceRole ice_role) {
219 ASSERT(worker_thread_ == talk_base::Thread::Current());
220 if (ice_role_ != ice_role) {
221 ice_role_ = ice_role;
222 for (std::vector<PortInterface *>::iterator it = ports_.begin();
223 it != ports_.end(); ++it) {
224 (*it)->SetIceRole(ice_role);
// Stores the ICE tiebreaker value that is later handed to each new port in
// OnPortReady(). Worker thread only.
// If ports already exist, a message is logged; the statements that follow the
// log call are not visible in this excerpt (presumably an early return --
// TODO confirm).
229 void P2PTransportChannel::SetIceTiebreaker(uint64 tiebreaker) {
230 ASSERT(worker_thread_ == talk_base::Thread::Current());
231 if (!ports_.empty()) {
233 << "Attempt to change tiebreaker after Port has been allocated.";
237 tiebreaker_ = tiebreaker;
// Writes the current ICE protocol type of the channel into |*type|.
// |type| is dereferenced unconditionally, so it must not be NULL. The return
// statement is not visible in this excerpt.
240 bool P2PTransportChannel::GetIceProtocolType(IceProtocolType* type) const {
241 *type = protocol_type_;
// Sets the ICE protocol type of the channel and propagates it to every port
// currently in ports_. Worker thread only.
245 void P2PTransportChannel::SetIceProtocolType(IceProtocolType type) {
246 ASSERT(worker_thread_ == talk_base::Thread::Current());
248 protocol_type_ = type;
249 for (std::vector<PortInterface *>::iterator it = ports_.begin();
250 it != ports_.end(); ++it) {
251 (*it)->SetIceProtocolType(protocol_type_);
// Sets the local ICE username fragment and password. Worker thread only.
// A change is treated as an ICE restart only when both previous credentials
// were already set (non-empty) and at least one of them differs from the new
// value; the very first assignment is therefore never a restart.
255 void P2PTransportChannel::SetIceCredentials(const std::string& ice_ufrag,
256 const std::string& ice_pwd) {
257 ASSERT(worker_thread_ == talk_base::Thread::Current());
258 bool ice_restart = false;
259 if (!ice_ufrag_.empty() && !ice_pwd_.empty()) {
260 // Restart candidate allocation if there is any change in either
261 // ice ufrag or password.
262 ice_restart = (ice_ufrag_ != ice_ufrag) || (ice_pwd_!= ice_pwd);
265 ice_ufrag_ = ice_ufrag;
269 // Restart candidate gathering.
// Sets the remote ICE username fragment and password. Worker thread only.
// As with the local credentials, a restart is detected only when both previous
// remote credentials were non-empty and at least one of them changed.
274 void P2PTransportChannel::SetRemoteIceCredentials(const std::string& ice_ufrag,
275 const std::string& ice_pwd) {
276 ASSERT(worker_thread_ == talk_base::Thread::Current());
277 bool ice_restart = false;
278 if (!remote_ice_ufrag_.empty() && !remote_ice_pwd_.empty()) {
279 ice_restart = (remote_ice_ufrag_ != ice_ufrag) ||
280 (remote_ice_pwd_!= ice_pwd);
283 remote_ice_ufrag_ = ice_ufrag;
284 remote_ice_pwd_ = ice_pwd;
287 // |candidate.generation()| is not signaled in ICEPROTO_RFC5245.
288 // Therefore we need to keep track of the remote ice restart so
289 // newer connections are prioritized over the older.
// The guarding condition for this increment is not visible in this excerpt
// (presumably |ice_restart| -- TODO confirm).
290 ++remote_candidate_generation_;
// Records the ICE mode of the remote side. The stored value is applied to
// connections when they are added (see AddConnection); nothing visible here
// updates connections that already exist.
294 void P2PTransportChannel::SetRemoteIceMode(IceMode mode) {
295 remote_ice_mode_ = mode;
298 // Go into the state of processing candidates, and running in general
// Worker thread only. Local ICE credentials must have been set beforehand;
// when either is empty an error is logged (what follows the log call is not
// visible in this excerpt). Otherwise a MSG_PING is posted to start pinging.
299 void P2PTransportChannel::Connect() {
300 ASSERT(worker_thread_ == talk_base::Thread::Current());
301 if (ice_ufrag_.empty() || ice_pwd_.empty()) {
303 LOG(LS_ERROR) << "P2PTransportChannel::Connect: The ice_ufrag_ and the "
304 << "ice_pwd_ are not set.";
308 // Kick off an allocator session
311 // Start pinging as the ports come in.
312 thread()->Post(this, MSG_PING);
315 // Reset the socket, clear up any previous allocations and start over
// Worker thread only. Deletes all allocator sessions, forgets all connections
// and remote candidates, clears every message queued for this handler and
// posts a fresh MSG_PING.
316 void P2PTransportChannel::Reset() {
317 ASSERT(worker_thread_ == talk_base::Thread::Current());
319 // Get rid of all the old allocators. This should clean up everything.
320 for (uint32 i = 0; i < allocator_sessions_.size(); ++i)
321 delete allocator_sessions_[i];
323 allocator_sessions_.clear();
// Only the pointers are dropped here; the Connection objects are not deleted
// by this function (presumably they go away with their ports when the
// sessions above are deleted -- TODO confirm).
325 connections_.clear();
326 best_connection_ = NULL;
328 // Forget about all of the candidates we got before.
329 remote_candidates_.clear();
331 // Revert to the initial state.
335 // Reinitialize the rest of our state.
336 waiting_for_signaling_ = false;
339 // If we allocated before, start a new one now.
340 if (transport_->connect_requested())
343 // Start pinging as the ports come in.
// Clear() drops any messages still pending for this handler before the new
// ping is queued.
344 thread()->Clear(this);
345 thread()->Post(this, MSG_PING);
348 // A new port is available, attempt to make connections for it
// Signal handler for PortAllocatorSession::SignalPortReady. Worker thread
// only. Configures |port| with the current options and ICE settings of the
// channel, starts tracking it, subscribes to its signals and pairs it with
// every remote candidate remembered so far.
349 void P2PTransportChannel::OnPortReady(PortAllocatorSession *session,
350 PortInterface* port) {
351 ASSERT(worker_thread_ == talk_base::Thread::Current());
353 // Set in-effect options on the new port
354 for (OptionMap::const_iterator it = options_.begin();
355 it != options_.end();
357 int val = port->SetOption(it->first, it->second);
// A failure to apply an option is only logged (the failure test itself is not
// visible in this excerpt).
359 LOG_J(LS_WARNING, port) << "SetOption(" << it->first
360 << ", " << it->second
361 << ") failed: " << port->GetError();
365 // Remember the ports and candidates, and signal that candidates are ready.
366 // The session will handle this, and send an initiate/accept/modify message
367 // if one is pending.
369 port->SetIceProtocolType(protocol_type_);
370 port->SetIceRole(ice_role_);
371 port->SetIceTiebreaker(tiebreaker_);
372 ports_.push_back(port);
373 port->SignalUnknownAddress.connect(
374 this, &P2PTransportChannel::OnUnknownAddress);
375 port->SignalDestroyed.connect(this, &P2PTransportChannel::OnPortDestroyed);
376 port->SignalRoleConflict.connect(
377 this, &P2PTransportChannel::OnRoleConflict);
379 // Attempt to create a connection from this new port to all of the remote
380 // candidates that we were given so far.
382 std::vector<RemoteCandidate>::iterator iter;
383 for (iter = remote_candidates_.begin(); iter != remote_candidates_.end();
// The last argument (readable) is false: these connections are not created in
// response to an incoming ping.
385 CreateConnection(port, *iter, iter->origin_port(), false);
391 // A new candidate is available, let listeners know
// Signal handler for PortAllocatorSession::SignalCandidatesReady. Worker
// thread only. Re-emits SignalCandidateReady once per entry in |candidates|,
// in order; |session| is not used in the visible code.
392 void P2PTransportChannel::OnCandidatesReady(
393 PortAllocatorSession *session, const std::vector<Candidate>& candidates) {
394 ASSERT(worker_thread_ == talk_base::Thread::Current());
395 for (size_t i = 0; i < candidates.size(); ++i) {
396 SignalCandidateReady(this, candidates[i]);
// Signal handler for PortAllocatorSession::SignalCandidatesAllocationDone.
// Worker thread only. Forwards the event to listeners of this channel through
// SignalCandidatesAllocationDone; |session| is not used.
400 void P2PTransportChannel::OnCandidatesAllocationDone(
401 PortAllocatorSession* session) {
402 ASSERT(worker_thread_ == talk_base::Thread::Current());
403 SignalCandidatesAllocationDone(this);
406 // Handle stun packets
// Signal handler for PortInterface::SignalUnknownAddress. Worker thread only.
// Visible flow: match |remote_username| against the remembered remote
// candidates; if it is unknown, answer with a stale-credentials error (or
// return silently in a case whose condition is not visible here). Otherwise
// reuse or build a remote candidate for |address|, create the connection(s)
// for it and answer |stun_msg| with a binding response. Failures along the way
// are answered with a STUN error response.
407 void P2PTransportChannel::OnUnknownAddress(
409 const talk_base::SocketAddress& address, ProtocolType proto,
410 IceMessage* stun_msg, const std::string &remote_username,
412 ASSERT(worker_thread_ == talk_base::Thread::Current());
414 // Port has received a valid stun packet from an address that no Connection
415 // is currently available for. See if we already have a candidate with the
416 // address. If it isn't we need to create new candidate for it.
418 // Determine if the remote candidates use shared ufrag.
// |ufrag_per_port| becomes true as soon as two remembered candidates carry
// different usernames.
419 bool ufrag_per_port = false;
420 std::vector<RemoteCandidate>::iterator it;
421 if (remote_candidates_.size() > 0) {
422 it = remote_candidates_.begin();
423 std::string username = it->username();
424 for (; it != remote_candidates_.end(); ++it) {
425 if (it->username() != username) {
426 ufrag_per_port = true;
432 const Candidate* candidate = NULL;
433 bool known_username = false;
434 std::string remote_password;
// Look for a remembered candidate with the same username. With a shared ufrag
// the address and protocol must match as well.
435 for (it = remote_candidates_.begin(); it != remote_candidates_.end(); ++it) {
436 if (it->username() == remote_username) {
437 remote_password = it->password();
438 known_username = true;
439 if (ufrag_per_port ||
440 (it->address() == address &&
441 it->protocol() == ProtoToString(proto))) {
445 // We don't want to break here because we may find a match of the address
450 if (!known_username) {
452 // When Ports are muxed, SignalUnknownAddress is delivered to all
453 // P2PTransportChannel belong to a session. Return from here will
454 // save us from sending stun binding error message from incorrect channel.
457 // Don't know about this username, the request is bogus
458 // This sometimes happens if a binding response comes in before the ACCEPT
459 // message. It is totally valid; the retry state machine will try again.
460 port->SendBindingErrorResponse(stun_msg, address,
461 STUN_ERROR_STALE_CREDENTIALS, STUN_ERROR_REASON_STALE_CREDENTIALS);
465 Candidate new_remote_candidate;
466 if (candidate != NULL) {
// Reuse the matched candidate; with per-port ufrags only the username
// matched, so the address is replaced with the one the request came from.
467 new_remote_candidate = *candidate;
468 if (ufrag_per_port) {
469 new_remote_candidate.set_address(address);
472 // Create a new candidate with this address.
475 if (port->IceProtocol() == ICEPROTO_RFC5245) {
476 type = PRFLX_PORT_TYPE;
478 // G-ICE doesn't support prflx candidate.
479 // We set candidate type to STUN_PORT_TYPE if the binding request comes
480 // from a relay port or the shared socket is used. Otherwise we use the
481 // port's type as the candidate type.
482 if (port->Type() == RELAY_PORT_TYPE || port->SharedSocket()) {
483 type = STUN_PORT_TYPE;
// The new candidate gets a random 8-character id; the last constructor
// argument is the CRC32 of that id rendered as a string.
489 std::string id = talk_base::CreateRandomString(8);
490 new_remote_candidate = Candidate(
491 id, component(), ProtoToString(proto), address,
492 0, remote_username, remote_password, type,
493 port->Network()->name(), 0U,
494 talk_base::ToString<uint32>(talk_base::ComputeCrc32(id)));
495 new_remote_candidate.set_priority(
496 new_remote_candidate.GetPriority(ICE_TYPE_PREFERENCE_SRFLX,
497 port->Network()->preference()));
500 if (port->IceProtocol() == ICEPROTO_RFC5245) {
502 // If the source transport address of the request does not match any
503 // existing remote candidates, it represents a new peer reflexive remote
506 // The priority of the candidate is set to the PRIORITY attribute
508 const StunUInt32Attribute* priority_attr =
509 stun_msg->GetUInt32(STUN_ATTR_PRIORITY);
// A request without a PRIORITY attribute is rejected as a bad request.
510 if (!priority_attr) {
511 LOG(LS_WARNING) << "P2PTransportChannel::OnUnknownAddress - "
512 << "No STUN_ATTR_PRIORITY found in the "
513 << "stun request message";
514 port->SendBindingErrorResponse(stun_msg, address,
515 STUN_ERROR_BAD_REQUEST,
516 STUN_ERROR_REASON_BAD_REQUEST);
519 new_remote_candidate.set_priority(priority_attr->value());
521 // RFC5245, the agent constructs a pair whose local candidate is equal to
522 // the transport address on which the STUN request was received, and a
523 // remote candidate equal to the source transport address where the
524 // request came from.
526 // There shouldn't be an existing connection with this remote address.
527 // When ports are muxed, this channel might get multiple unknown address
528 // signals. In that case, if the connection already exists, we should
529 // simply ignore the signal; otherwise send a server error.
530 if (port->GetConnection(new_remote_candidate.address())) {
532 LOG(LS_INFO) << "Connection already exists for peer reflexive "
533 << "candidate: " << new_remote_candidate.ToString();
537 port->SendBindingErrorResponse(stun_msg, address,
538 STUN_ERROR_SERVER_ERROR,
539 STUN_ERROR_REASON_SERVER_ERROR);
544 Connection* connection = port->CreateConnection(
545 new_remote_candidate, cricket::PortInterface::ORIGIN_THIS_PORT);
548 port->SendBindingErrorResponse(stun_msg, address,
549 STUN_ERROR_SERVER_ERROR,
550 STUN_ERROR_REASON_SERVER_ERROR);
// The request that triggered this handler counts as a received ping on the
// new connection.
554 AddConnection(connection);
555 connection->ReceivedPing();
557 // Send the pinger a successful stun response.
558 port->SendBindingResponse(stun_msg, address);
560 // Update the list of connections since we just added another. We do this
561 // after sending the response since it could (in principle) delete the
562 // connection in question.
565 // Check for connectivity to this address. Create connections
566 // to this address across all local ports. First, add this as a new remote
568 if (!CreateConnections(new_remote_candidate, port, true)) {
569 // Hopefully this won't occur, because changing a destination address
570 // shouldn't cause a new connection to fail
572 port->SendBindingErrorResponse(stun_msg, address, STUN_ERROR_SERVER_ERROR,
573 STUN_ERROR_REASON_SERVER_ERROR);
577 // Send the pinger a successful stun response.
578 port->SendBindingResponse(stun_msg, address);
580 // Update the list of connections since we just added another. We do this
581 // after sending the response since it could (in principle) delete the
582 // connection in question.
// Signal handler for PortInterface::SignalRoleConflict. Forwards the conflict
// to listeners of this channel; |port| is not used.
587 void P2PTransportChannel::OnRoleConflict(PortInterface* port) {
588 SignalRoleConflict(this); // STUN ping will be sent when SetRole is called
592 // When the signalling channel is ready, we can really kick off the allocator
// Worker thread only. Does nothing unless Allocate() previously set
// waiting_for_signaling_; in that case the flag is cleared and a new allocator
// session is created with the current local ICE credentials and added.
593 void P2PTransportChannel::OnSignalingReady() {
594 ASSERT(worker_thread_ == talk_base::Thread::Current());
595 if (waiting_for_signaling_) {
596 waiting_for_signaling_ = false;
597 AddAllocatorSession(allocator_->CreateSession(
598 SessionId(), content_name(), component(), ice_ufrag_, ice_pwd_));
// Signal handler for Connection::SignalUseCandidate. Worker thread only, and
// only expected while this side is the CONTROLLED agent running RFC 5245 ICE
// (both asserted).
// If |conn| is already writable and is not the current best connection, the
// channel switches to it immediately. |conn| is stored in
// pending_best_connection_ by the final visible statement; the branch that
// owns that statement is not visible in this excerpt (presumably the
// not-yet-writable case -- TODO confirm).
602 void P2PTransportChannel::OnUseCandidate(Connection* conn) {
603 ASSERT(worker_thread_ == talk_base::Thread::Current());
604 ASSERT(ice_role_ == ICEROLE_CONTROLLED);
605 ASSERT(protocol_type_ == ICEPROTO_RFC5245);
606 if (conn->write_state() == Connection::STATE_WRITABLE) {
607 if (best_connection_ != conn) {
608 pending_best_connection_ = NULL;
609 SwitchBestConnectionTo(conn);
610 // Now we have selected the best connection, time to prune other existing
611 // connections and update the read/write state of the channel.
615 pending_best_connection_ = conn;
// Handles a remote candidate delivered to the channel. Worker thread only.
// The candidate is paired with the local ports through CreateConnections()
// with no origin port and readable == false.
619 void P2PTransportChannel::OnCandidate(const Candidate& candidate) {
620 ASSERT(worker_thread_ == talk_base::Thread::Current());
622 // Create connections to this remote candidate.
623 CreateConnections(candidate, NULL, false);
625 // Resort the connections list, which may have new elements.
629 // Creates connections from all of the ports that we care about to the given
630 // remote candidate. The return value is true if we created a connection from
// Worker thread only. Works on a copy of |remote_candidate| whose generation
// is normalised through GetRemoteCandidateGeneration() and whose empty
// username/password are filled in from the remote ICE credentials. The copy is
// remembered for ports that become ready later.
632 bool P2PTransportChannel::CreateConnections(const Candidate &remote_candidate,
633 PortInterface* origin_port,
635 ASSERT(worker_thread_ == talk_base::Thread::Current());
637 Candidate new_remote_candidate(remote_candidate);
638 new_remote_candidate.set_generation(
639 GetRemoteCandidateGeneration(remote_candidate));
640 // ICE candidates don't need to have username and password set, but
641 // the code below this (specifically, ConnectionRequest::Prepare in
642 // port.cc) uses the remote candidate's username. So, we set it
644 if (remote_candidate.username().empty()) {
645 new_remote_candidate.set_username(remote_ice_ufrag_);
647 if (remote_candidate.password().empty()) {
648 new_remote_candidate.set_password(remote_ice_pwd_);
651 // Add a new connection for this candidate to every port that allows such a
652 // connection (i.e., if they have compatible protocols) and that does not
653 // already have a connection to an equivalent candidate. We must be careful
654 // to make sure that the origin port is included, even if it was pruned,
655 // since that may be the only port that can create this connection.
657 bool created = false;
// Ports are visited from the most recently added to the oldest.
659 std::vector<PortInterface *>::reverse_iterator it;
660 for (it = ports_.rbegin(); it != ports_.rend(); ++it) {
661 if (CreateConnection(*it, new_remote_candidate, origin_port, readable)) {
662 if (*it == origin_port)
// The origin port may no longer be in ports_; try it explicitly in that case.
667 if ((origin_port != NULL) &&
668 std::find(ports_.begin(), ports_.end(), origin_port) == ports_.end()) {
669 if (CreateConnection(
670 origin_port, new_remote_candidate, origin_port, readable))
674 // Remember this remote candidate so that we can add it to future ports.
675 RememberRemoteCandidate(new_remote_candidate, origin_port);
680 // Setup a connection object for the local and remote candidate combination.
681 // And then listen to connection object for changes.
// |port| supplies the local side, |remote_candidate| the remote side and
// |origin_port| is used only to classify the origin of the candidate via
// GetOrigin(). Several return statements of this function are not visible in
// this excerpt, so the exact result of each branch is not documented here.
682 bool P2PTransportChannel::CreateConnection(PortInterface* port,
683 const Candidate& remote_candidate,
684 PortInterface* origin_port,
686 // Look for an existing connection with this remote address. If one is not
687 // found, then we can create a new connection for this address.
688 Connection* connection = port->GetConnection(remote_candidate.address());
689 if (connection != NULL) {
690 // It is not legal to try to change any of the parameters of an existing
691 // connection; however, the other side can send a duplicate candidate.
692 if (!remote_candidate.IsEquivalent(connection->remote_candidate())) {
693 LOG(INFO) << "Attempt to change a remote candidate";
697 PortInterface::CandidateOrigin origin = GetOrigin(port, origin_port);
699 // Don't create connection if this is a candidate we received in a
700 // message and we are not allowed to make outgoing connections.
701 if (origin == cricket::PortInterface::ORIGIN_MESSAGE && incoming_only_)
704 connection = port->CreateConnection(remote_candidate, origin);
// From here on the channel tracks the connection and listens to its signals.
708 AddConnection(connection);
710 LOG_J(LS_INFO, this) << "Created connection with origin=" << origin << ", ("
711 << connections_.size() << " total)";
714 // If we are readable, it is because we are creating this in response to a
715 // ping from the other side. This will cause the state to become readable.
717 connection->ReceivedPing();
// Returns true when |connection| is one of the pointers currently stored in
// connections_ (linear search by pointer value), false otherwise.
722 bool P2PTransportChannel::FindConnection(
723 cricket::Connection* connection) const {
724 std::vector<Connection*>::const_iterator citer =
725 std::find(connections_.begin(), connections_.end(), connection);
726 return citer != connections_.end();
// Returns the generation to assign to a remote |candidate|.
// For ICEPROTO_GOOGLE the generation carried by the candidate is returned
// unchanged. Otherwise the locally tracked remote_candidate_generation_ is
// returned; the candidate is then expected (asserted) to carry either 0 or
// that same value.
729 uint32 P2PTransportChannel::GetRemoteCandidateGeneration(
730 const Candidate& candidate) {
731 if (protocol_type_ == ICEPROTO_GOOGLE) {
732 // The Candidate.generation() can be trusted. Nothing needs to be done.
733 return candidate.generation();
735 // |candidate.generation()| is not signaled in ICEPROTO_RFC5245.
736 // Therefore we need to keep track of the remote ice restart so
737 // newer connections are prioritized over the older.
738 ASSERT(candidate.generation() == 0 ||
739 candidate.generation() == remote_candidate_generation_);
740 return remote_candidate_generation_;
743 // Maintain our remote candidate list, adding this new remote one.
// Three steps: drop remembered candidates from older generations, skip the
// insert when an equivalent candidate is already remembered, then append
// |remote_candidate| together with its |origin_port|.
744 void P2PTransportChannel::RememberRemoteCandidate(
745 const Candidate& remote_candidate, PortInterface* origin_port) {
746 // Remove any candidates whose generation is older than this one. The
747 // presence of a new generation indicates that the old ones are not useful.
749 while (i < remote_candidates_.size()) {
750 if (remote_candidates_[i].generation() < remote_candidate.generation()) {
751 LOG(INFO) << "Pruning candidate from old generation: "
752 << remote_candidates_[i].address().ToSensitiveString();
// Erasing shifts the following elements down, so the index must not advance
// on this path (the else branch is not visible in this excerpt).
753 remote_candidates_.erase(remote_candidates_.begin() + i);
759 // Make sure this candidate is not a duplicate.
760 for (uint32 i = 0; i < remote_candidates_.size(); ++i) {
761 if (remote_candidates_[i].IsEquivalent(remote_candidate)) {
762 LOG(INFO) << "Duplicate candidate: "
763 << remote_candidate.address().ToSensitiveString();
768 // Try this candidate for all future ports.
769 remote_candidates_.push_back(RemoteCandidate(remote_candidate, origin_port));
772 // Set options on ourselves is simply setting options on all of our available
// The value is also recorded in options_ so that OnPortReady() can apply it to
// ports created later. A new option is inserted; the handling of an option
// that is already stored (same or different value) is not fully visible in
// this excerpt.
774 int P2PTransportChannel::SetOption(talk_base::Socket::Option opt, int value) {
775 OptionMap::iterator it = options_.find(opt);
776 if (it == options_.end()) {
777 options_.insert(std::make_pair(opt, value));
778 } else if (it->second == value) {
// Apply the option to every existing port; a failure on a port is logged.
784 for (uint32 i = 0; i < ports_.size(); ++i) {
785 int val = ports_[i]->SetOption(opt, value);
787 // Because this also occurs deferred, probably no point in reporting an
789 LOG(WARNING) << "SetOption(" << opt << ", " << value << ") failed: "
790 << ports_[i]->GetError();
796 // Send data to the other side, using our best connection.
// Worker thread only. Without a best connection, error_ is set to EWOULDBLOCK.
// Otherwise |len| bytes from |data| are handed to best_connection_->Send()
// with |options|. error_ is also assigned from the connection, under a
// condition not visible in this excerpt (presumably a failed send). The
// return statements are likewise not visible.
797 int P2PTransportChannel::SendPacket(const char *data, size_t len,
798 const talk_base::PacketOptions& options,
800 ASSERT(worker_thread_ == talk_base::Thread::Current());
805 if (best_connection_ == NULL) {
806 error_ = EWOULDBLOCK;
810 int sent = best_connection_->Send(data, len, options);
813 error_ = best_connection_->GetError();
// Appends one ConnectionInfo per tracked connection to |*infos|. Worker thread
// only. |infos| is dereferenced unconditionally, so it must not be NULL.
// Side effect: every connection is marked as reported, so new_connection is
// true only in the first snapshot that includes a given connection.
818 bool P2PTransportChannel::GetStats(ConnectionInfos *infos) {
819 ASSERT(worker_thread_ == talk_base::Thread::Current());
820 // Gather connection infos.
823 std::vector<Connection *>::const_iterator it;
824 for (it = connections_.begin(); it != connections_.end(); ++it) {
825 Connection *connection = *it;
827 info.best_connection = (best_connection_ == connection);
829 (connection->read_state() == Connection::STATE_READABLE);
831 (connection->write_state() == Connection::STATE_WRITABLE);
833 (connection->write_state() == Connection::STATE_WRITE_TIMEOUT);
834 info.new_connection = !connection->reported();
835 connection->set_reported(true);
836 info.rtt = connection->rtt();
837 info.sent_total_bytes = connection->sent_total_bytes();
838 info.sent_bytes_second = connection->sent_bytes_second();
839 info.recv_total_bytes = connection->recv_total_bytes();
840 info.recv_bytes_second = connection->recv_bytes_second();
841 info.local_candidate = connection->local_candidate();
842 info.remote_candidate = connection->remote_candidate();
// The connection pointer itself is stored as the key of the entry.
843 info.key = connection;
844 infos->push_back(info);
// Returns the DSCP value configured through SetOption(OPT_DSCP, ...), or
// DSCP_NO_CHANGE when that option has never been stored in options_.
850 talk_base::DiffServCodePoint P2PTransportChannel::DefaultDscpValue() const {
851 OptionMap::const_iterator it = options_.find(talk_base::Socket::OPT_DSCP);
852 if (it == options_.end()) {
853 return talk_base::DSCP_NO_CHANGE;
// The stored int is converted back to the enum without range checking.
855 return static_cast<talk_base::DiffServCodePoint> (it->second);
858 // Begin allocate (or immediately re-allocate, if MSG_ALLOCATE pending)
// Does not create the allocator session itself: it marks the channel as
// waiting for signaling and emits SignalRequestSignaling. The session is
// created in OnSignalingReady() once signaling is available.
859 void P2PTransportChannel::Allocate() {
860 // Time for a new allocator, let's make sure we have a signalling channel
861 // to communicate candidates through first.
862 waiting_for_signaling_ = true;
863 SignalRequestSignaling(this);
866 // Monitor connection states.
// Calls UpdateState() on every tracked connection with a single timestamp
// taken once at the start.
867 void P2PTransportChannel::UpdateConnectionStates() {
868 uint32 now = talk_base::Time();
870 // We need to copy the list of connections since some may delete themselves
871 // when we call UpdateState.
// NOTE(review): despite the comment above, the visible loop indexes
// connections_ directly and makes no copy. size() is re-read on every
// iteration, so a removal during the loop would skip the element that moves
// into the freed slot -- confirm whether a copy is still required.
872 for (uint32 i = 0; i < connections_.size(); ++i)
873 connections_[i]->UpdateState(now);
876 // Prepare for best candidate sorting.
// Schedules the sort asynchronously by posting MSG_SORT to the worker thread
// instead of sorting inline. The condition guarding the post is not visible in
// this excerpt (presumably a flag that avoids queueing duplicate requests --
// TODO confirm).
877 void P2PTransportChannel::RequestSort() {
879 worker_thread_->Post(this, MSG_SORT);
884 // Sort the available connections to find the best one. We also monitor
885 // the number of available connections and the current state.
886 void P2PTransportChannel::SortConnections() {
887 ASSERT(worker_thread_ == talk_base::Thread::Current());
889 // Make sure the connection states are up-to-date since this affects how they
891 UpdateConnectionStates();
893 if (protocol_type_ == ICEPROTO_HYBRID) {
894 // If we are in hybrid mode, we are not sending any ping requests, so there
895 // is no point in sorting the connections. In hybrid state, ports can have
896 // different protocol than hybrid and protocol may differ from one another.
897 // Instead just update the state of this channel
898 UpdateChannelState();
902 // Any changes after this point will require a re-sort.
905 // Get a list of the networks that we are using.
906 std::set<talk_base::Network*> networks;
907 for (uint32 i = 0; i < connections_.size(); ++i)
908 networks.insert(connections_[i]->port()->Network());
910 // Find the best alternative connection by sorting. It is important to note
911 // that amongst equal preference, writable connections, this will choose the
912 // one whose estimated latency is lowest. So it is the only one that we
913 // need to consider switching to.
915 ConnectionCompare cmp;
916 std::stable_sort(connections_.begin(), connections_.end(), cmp);
917 LOG(LS_VERBOSE) << "Sorting available connections:";
918 for (uint32 i = 0; i < connections_.size(); ++i) {
919 LOG(LS_VERBOSE) << connections_[i]->ToString();
922 Connection* top_connection = NULL;
923 if (connections_.size() > 0)
924 top_connection = connections_[0];
926 // We don't want to pick the best connections if channel is using RFC5245
927 // and it's mode is CONTROLLED, as connections will be selected by the
928 // CONTROLLING agent.
930 // If necessary, switch to the new choice.
931 if (protocol_type_ != ICEPROTO_RFC5245 || ice_role_ == ICEROLE_CONTROLLING) {
932 if (ShouldSwitch(best_connection_, top_connection))
933 SwitchBestConnectionTo(top_connection);
936 // We can prune any connection for which there is a writable connection on
937 // the same network with better or equal priority. We leave those with
938 // better priority just in case they become writable later (at which point,
939 // we would prune out the current best connection). We leave connections on
940 // other networks because they may not be using the same resources and they
941 // may represent very distinct paths over which we can switch.
942 std::set<talk_base::Network*>::iterator network;
943 for (network = networks.begin(); network != networks.end(); ++network) {
944 Connection* primier = GetBestConnectionOnNetwork(*network);
945 if (!primier || (primier->write_state() != Connection::STATE_WRITABLE))
948 for (uint32 i = 0; i < connections_.size(); ++i) {
949 if ((connections_[i] != primier) &&
950 (connections_[i]->port()->Network() == *network) &&
951 (CompareConnectionCandidates(primier, connections_[i]) >= 0)) {
952 connections_[i]->Prune();
957 // Check if all connections are timed out.
958 bool all_connections_timedout = true;
959 for (uint32 i = 0; i < connections_.size(); ++i) {
960 if (connections_[i]->write_state() != Connection::STATE_WRITE_TIMEOUT) {
961 all_connections_timedout = false;
966 // Now update the writable state of the channel with the information we have
968 if (best_connection_ && best_connection_->writable()) {
970 } else if (all_connections_timedout) {
976 // Update the state of this channel. This method is called whenever the
977 // state of any connection changes, so this is a good place to do this.
978 UpdateChannelState();
982 // Track the best connection, and let listeners know
983 void P2PTransportChannel::SwitchBestConnectionTo(Connection* conn) {
984 // Note: if conn is NULL, the previous best_connection_ has been destroyed,
986 Connection* old_best_connection = best_connection_;
987 best_connection_ = conn;
988 if (best_connection_) {
989 if (old_best_connection) {
990 LOG_J(LS_INFO, this) << "Previous best connection: "
991 << old_best_connection->ToString();
993 LOG_J(LS_INFO, this) << "New best connection: "
994 << best_connection_->ToString();
995 SignalRouteChange(this, best_connection_->remote_candidate());
997 LOG_J(LS_INFO, this) << "No best connection";
// Cross-checks the channel's writable flag against the best connection and
// publishes the readable flag through set_readable().
// NOTE(review): the embedded line numbers skip inside this function (1003,
// 1010, 1014-1017), so the body of the readable loop and the closing braces
// are not visible here -- confirm against the complete file.
1001 void P2PTransportChannel::UpdateChannelState() {
1002 // The Handle* functions already set the writable state; it is only re-checked here.
// Expected value: there is a best connection and it is in STATE_WRITABLE.
1004 bool writable = ((best_connection_ != NULL) &&
1005 (best_connection_->write_state() ==
1006 Connection::STATE_WRITABLE));
// A disagreement trips the ASSERT and is also reported in the error log.
1007 ASSERT(writable == this->writable());
1008 if (writable != this->writable())
1009 LOG(LS_ERROR) << "UpdateChannelState: writable state mismatch";
// Scan the connections for one whose read state is STATE_READABLE.
// NOTE(review): presumably |readable| becomes true inside the branch below;
// that assignment is not visible here -- confirm.
1011 bool readable = false;
1012 for (uint32 i = 0; i < connections_.size(); ++i) {
1013 if (connections_[i]->read_state() == Connection::STATE_READABLE) {
1018 set_readable(readable);
1021 // We checked the status of our connections and we had at least one that
1022 // was writable, so go into the writable state.
// Must be called on the worker thread (asserted below).
// NOTE(review): original lines 1025 and 1029-1035 are not visible here; the
// loop may be wrapped in a condition and the call that actually flips the
// channel's writable flag is presumably among them -- confirm.
1023 void P2PTransportChannel::HandleWritable() {
1024 ASSERT(worker_thread_ == talk_base::Thread::Current());
// Tell every allocator session that is still gathering ports to stop.
1026 for (uint32 i = 0; i < allocator_sessions_.size(); ++i) {
1027 if (allocator_sessions_[i]->IsGettingPorts()) {
1028 allocator_sessions_[i]->StopGettingPorts();
// Remember that the channel has been writable; HandleNotWritable() consults
// this flag before reporting a transition.
1033 was_writable_ = true;
1037 // Notify upper layer about channel not writable state, if it was before.
1038 void P2PTransportChannel::HandleNotWritable() {
1039 ASSERT(worker_thread_ == talk_base::Thread::Current());
1040 if (was_writable_) {
1041 was_writable_ = false;
1042 set_writable(false);
1046 void P2PTransportChannel::HandleAllTimedOut() {
1047 // Currently we are treating this as channel not writable.
1048 HandleNotWritable();
1051 // If we have a best connection, return it, otherwise return top one in the
1052 // list (later we will mark it best).
// |network| is the network to look on; only connections whose port reports
// this same Network pointer are considered.
// NOTE(review): the return taken when no connection matches is not visible
// here (original lines 1063-1066 are missing); presumably NULL -- callers in
// this file do test the result for NULL.
1053 Connection* P2PTransportChannel::GetBestConnectionOnNetwork(
1054 talk_base::Network* network) {
1055 // If the best connection is on this network, then it wins.
1056 if (best_connection_ && (best_connection_->port()->Network() == network))
1057 return best_connection_;
1059 // Otherwise, we return the top-most in sorted order, i.e. the first entry of
// connections_ that is on this network.
1060 for (uint32 i = 0; i < connections_.size(); ++i) {
1061 if (connections_[i]->port()->Network() == network)
1062 return connections_[i];
1068 // Handle any queued up requests.
// Dispatches on pmsg->message_id.
// NOTE(review): the case labels (original lines 1071-1081) are not visible
// here. OnPing() posts MSG_PING to this object, so that id is presumably one
// of the handled cases -- confirm against the complete file.
1069 void P2PTransportChannel::OnMessage(talk_base::Message *pmsg) {
1070 switch (pmsg->message_id) {
1083 // Handle queued up sort request.
// NOTE(review): the statement that performs the re-sort (original line 1086)
// is not visible here; only the explanatory comment survived.
1084 void P2PTransportChannel::OnSort() {
1085 // Resort the connections based on the new statistics.
1089 // Handle queued up ping request.
// Refreshes connection states, pings one connection and then re-arms itself
// by posting a delayed MSG_PING to this object.
1090 void P2PTransportChannel::OnPing() {
1091 // Make sure the states of the connections are up-to-date (since this affects
1092 // which ones are pingable).
1093 UpdateConnectionStates();
1095 // Find the oldest pingable connection and have it do a ping.
1096 Connection* conn = FindNextPingableConnection();
// NOTE(review): original line 1097 is not visible here. PingConnection()
// dereferences |conn|, and FindNextPingableConnection() starts its search
// from a NULL candidate, so a NULL guard presumably sits on that line --
// confirm.
1098 PingConnection(conn);
1100 // Post ourselves a message to perform the next ping. The delay depends on
// whether the channel is currently writable.
1101 uint32 delay = writable() ? WRITABLE_DELAY : UNWRITABLE_DELAY;
1102 thread()->PostDelayed(delay, this, MSG_PING);
1105 // Is the connection in a state for us to even consider pinging the other side?
// NOTE(review): original lines 1108, 1110-1112 and 1116 are not visible here.
// The two return statements below are described by their comments as the
// "channel writable" and "channel not writable" cases, so they presumably sit
// in the two arms of a branch on the channel's writable state, and the
// connected() check presumably returns false -- confirm.
1106 bool P2PTransportChannel::IsPingable(Connection* conn) {
1107 // An unconnected connection cannot be written to at all, so pinging is ruled out.
1109 if (!conn->connected())
1113 // If we are writable, then we only want to ping connections that could be
1114 // better than this one, i.e., the ones that were not pruned.
1115 return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT);
1117 // If we are not writable, then we need to try everything that might work.
1118 // This includes both connections that do not have write timeout as well as
1119 // ones that do not have read timeout. A connection could be readable but
1120 // be in write-timeout if we pruned it before. Since the other side is
1121 // still pinging it, it very well might still work.
1122 return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT) ||
1123 (conn->read_state() != Connection::STATE_READ_TIMEOUT);
1127 // Returns the next pingable connection to ping. This will be the oldest
1128 // pingable connection unless we have a writable connection that is past the
1129 // maximum acceptable ping delay.
// NOTE(review): the final return (original lines 1146-1150) is not visible
// here; presumably |oldest_conn|, which stays NULL when nothing is pingable --
// confirm.
1130 Connection* P2PTransportChannel::FindNextPingableConnection() {
1131 uint32 now = talk_base::Time();
// The best connection takes precedence when it is writable and its last ping
// was sent at least MAX_CURRENT_WRITABLE_DELAY before |now|.
1132 if (best_connection_ &&
1133 (best_connection_->write_state() == Connection::STATE_WRITABLE) &&
1134 (best_connection_->last_ping_sent()
1135 + MAX_CURRENT_WRITABLE_DELAY <= now)) {
1136 return best_connection_;
// Otherwise pick the pingable connection with the smallest last_ping_sent().
// The comparison is strict, so among equal timestamps the earliest entry in
// connections_ is kept.
1139 Connection* oldest_conn = NULL;
1140 uint32 oldest_time = 0xFFFFFFFF;
1141 for (uint32 i = 0; i < connections_.size(); ++i) {
1142 if (IsPingable(connections_[i])) {
1143 if (connections_[i]->last_ping_sent() < oldest_time) {
1144 oldest_time = connections_[i]->last_ping_sent();
1145 oldest_conn = connections_[i];
1152 // Apart from sending ping from |conn| this method also updates
1153 // |use_candidate_attr| flag. The criteria to update this flag is
1155 // Set USE-CANDIDATE if doing ICE AND this channel is in CONTROLLING AND
1156 // a) Channel is in FULL ICE AND
1157 // a.1) |conn| is the best connection OR
1158 // a.2) there is no best connection OR
1159 // a.3) the best connection is unwritable OR
1160 // a.4) |conn| has higher priority than best_connection.
1161 // b) we're doing LITE ICE AND
1162 // b.1) |conn| is the best_connection AND
1163 // b.2) |conn| is writable.
1164 void P2PTransportChannel::PingConnection(Connection* conn) {
1165 bool use_candidate = false;
1166 if (protocol_type_ == ICEPROTO_RFC5245) {
1167 if (remote_ice_mode_ == ICEMODE_FULL && ice_role_ == ICEROLE_CONTROLLING) {
1168 use_candidate = (conn == best_connection_) ||
1169 (best_connection_ == NULL) ||
1170 (!best_connection_->writable()) ||
1171 (conn->priority() > best_connection_->priority());
1172 } else if (remote_ice_mode_ == ICEMODE_LITE && conn == best_connection_) {
1173 use_candidate = best_connection_->writable();
1176 conn->set_use_candidate_attr(use_candidate);
1177 conn->Ping(talk_base::Time());
1180 // When a connection's state changes, we need to figure out who to use as
1181 // the best connection again. It could have become usable, or become unusable.
// Must be called on the worker thread (asserted below).
1182 void P2PTransportChannel::OnConnectionStateChange(Connection* connection) {
1183 ASSERT(worker_thread_ == talk_base::Thread::Current());
1185 // Update the best connection if the state change is from pending best
1186 // connection and role is controlled. This only applies to ICEPROTO_RFC5245,
// and only once the pending connection has become writable.
1187 if (protocol_type_ == ICEPROTO_RFC5245 && ice_role_ == ICEROLE_CONTROLLED) {
1188 if (connection == pending_best_connection_ && connection->writable()) {
1189 pending_best_connection_ = NULL;
1190 SwitchBestConnectionTo(connection);
1194 // We have to unroll the stack before doing this because we may be changing
1195 // the state of connections while sorting.
// NOTE(review): the statement this comment refers to (original line 1196) is
// not visible here; presumably it queues a sort rather than sorting
// synchronously -- confirm.
1199 // When a connection is removed, edit it out, and then update our best connection.
// Must be called on the worker thread (asserted below). Emits
// SignalConnectionRemoved once the bookkeeping is done.
1201 void P2PTransportChannel::OnConnectionDestroyed(Connection* connection) {
1202 ASSERT(worker_thread_ == talk_base::Thread::Current());
1204 // Note: the previous best_connection_ may be destroyed by now, so don't use it.
1207 // Remove this connection from the list.
1208 std::vector<Connection*>::iterator iter =
1209 std::find(connections_.begin(), connections_.end(), connection);
// NOTE(review): the erase below relies on this ASSERT; if ASSERT compiles to
// nothing in some build configurations, an unknown |connection| would reach
// erase() with the end iterator -- confirm that cannot happen.
1210 ASSERT(iter != connections_.end());
1211 connections_.erase(iter);
1213 LOG_J(LS_INFO, this) << "Removed connection ("
1214 << static_cast<int>(connections_.size()) << " remaining)";
// Drop the pending-best pointer if it referred to the destroyed connection.
1216 if (pending_best_connection_ == connection) {
1217 pending_best_connection_ = NULL;
1220 // If this is currently the best connection, then we need to pick a new one.
1221 // The call to SortConnections will pick a new one. It looks at the current
1222 // best connection in order to avoid switching between fairly similar ones.
1223 // Since this connection is no longer an option, we can just set best to NULL
1224 // and re-choose a best assuming that there was no best connection.
// NOTE(review): original line 1227 is not visible here; presumably it
// requests the re-sort mentioned above -- confirm.
1225 if (best_connection_ == connection) {
1226 SwitchBestConnectionTo(NULL);
1230 SignalConnectionRemoved(this);
1233 // When a port is destroyed remove it from our list of ports to use for
1234 // connection attempts.
// Must be called on the worker thread (asserted below).
1235 void P2PTransportChannel::OnPortDestroyed(PortInterface* port) {
1236 ASSERT(worker_thread_ == talk_base::Thread::Current());
1238 // Remove this port from the list (if we didn't drop it already).
1239 std::vector<PortInterface*>::iterator iter =
1240 std::find(ports_.begin(), ports_.end(), port);
// NOTE(review): the statement guarded by this check (original line 1242) is
// not visible here; presumably it erases |iter| from ports_ -- confirm.
1241 if (iter != ports_.end())
// Log how many ports are left in ports_ at this point.
1244 LOG(INFO) << "Removed port from p2p socket: "
1245 << static_cast<int>(ports_.size()) << " remaining";
1248 // When data is available, let listeners know.
// |connection| is the connection the packet arrived on; |data|, |len| and
// |packet_time| are forwarded unchanged to SignalReadPacket.
// Must be called on the worker thread (asserted below).
1249 void P2PTransportChannel::OnReadPacket(
1250 Connection *connection, const char *data, size_t len,
1251 const talk_base::PacketTime& packet_time) {
1252 ASSERT(worker_thread_ == talk_base::Thread::Current());
1254 // Do not deliver, if packet doesn't belong to the correct transport channel.
// NOTE(review): the statement guarded by this check (original line 1256) is
// not visible here; presumably an early return -- confirm.
1255 if (!FindConnection(connection))
1258 // Let the client know of an incoming packet
1259 SignalReadPacket(this, data, len, packet_time, 0);
1262 void P2PTransportChannel::OnReadyToSend(Connection* connection) {
1263 if (connection == best_connection_ && writable()) {
1264 SignalReadyToSend(this);
1268 } // namespace cricket