#include <sstream>
#include <vector>
-#include "talk/base/buffer.h"
-#include "talk/base/helpers.h"
-#include "talk/base/logging.h"
#include "talk/media/base/codec.h"
#include "talk/media/base/constants.h"
#include "talk/media/base/streamparams.h"
#include "usrsctplib/usrsctp.h"
+#include "webrtc/base/buffer.h"
+#include "webrtc/base/helpers.h"
+#include "webrtc/base/logging.h"
+#include "webrtc/base/safe_conversions.h"
namespace {
typedef cricket::SctpDataMediaChannel::StreamSet StreamSet;
} // namespace
namespace cricket {
-typedef talk_base::ScopedMessageData<SctpInboundPacket> InboundPacketMessage;
-typedef talk_base::ScopedMessageData<talk_base::Buffer> OutboundPacketMessage;
-
-// This is the SCTP port to use. It is passed along the wire and the listener
-// and connector must be using the same port. It is not related to the ports at
-// the IP level. (Corresponds to: sockaddr_conn.sconn_port in usrsctp.h)
-//
-// TODO(ldixon): Allow port to be set from higher level code.
-static const int kSctpDefaultPort = 5001;
-// TODO(ldixon): Find where this is defined, and also check is Sctp really
-// respects this.
-static const size_t kSctpMtu = 1280;
+typedef rtc::ScopedMessageData<SctpInboundPacket> InboundPacketMessage;
+typedef rtc::ScopedMessageData<rtc::Buffer> OutboundPacketMessage;
+
+// The biggest SCTP packet. Starting from a 'safe' wire MTU value of 1280,
+// take off 80 bytes for DTLS/TURN/TCP/IP overhead.
+static const size_t kSctpMtu = 1200;
enum {
MSG_SCTPINBOUNDPACKET = 1, // MessageData is SctpInboundPacket
- MSG_SCTPOUTBOUNDPACKET = 2, // MessageData is talk_base:Buffer
+ MSG_SCTPOUTBOUNDPACKET = 2, // MessageData is rtc:Buffer
};
struct SctpInboundPacket {
- talk_base::Buffer buffer;
+ rtc::Buffer buffer;
ReceiveDataParams params;
// The |flags| parameter is used by SCTP to distinguish notification packets
// from other types of packets.
<< "; set_df: " << std::hex << static_cast<int>(set_df);
// Note: We have to copy the data; the caller will delete it.
OutboundPacketMessage* msg =
- new OutboundPacketMessage(new talk_base::Buffer(data, length));
+ new OutboundPacketMessage(new rtc::Buffer(data, length));
channel->worker_thread()->Post(channel, MSG_SCTPOUTBOUNDPACKET, msg);
return 0;
}
// memory cleanup. But this does simplify code.
const SctpDataMediaChannel::PayloadProtocolIdentifier ppid =
static_cast<SctpDataMediaChannel::PayloadProtocolIdentifier>(
- talk_base::HostToNetwork32(rcv.rcv_ppid));
+ rtc::HostToNetwork32(rcv.rcv_ppid));
cricket::DataMessageType type = cricket::DMT_NONE;
if (!GetDataMediaType(ppid, &type) && !(flags & MSG_NOTIFICATION)) {
// It's neither a notification nor a recognized data packet. Drop it.
}
usrsctp_engines_count++;
- // We don't put in a codec because we don't want one offered when we
- // use the hybrid data engine.
- // codecs_.push_back(cricket::DataCodec( kGoogleSctpDataCodecId,
- // kGoogleSctpDataCodecName, 0));
+ cricket::DataCodec codec(kGoogleSctpDataCodecId, kGoogleSctpDataCodecName, 0);
+ codec.SetParam(kCodecParamPort, kSctpDefaultPort);
+ codecs_.push_back(codec);
}
SctpDataEngine::~SctpDataEngine() {
- // TODO(ldixon): There is currently a bug in teardown of usrsctp that blocks
- // indefintely if a finish call made too soon after close calls. So teardown
- // has been skipped. Once the bug is fixed, retest and enable teardown.
- //
- // usrsctp_engines_count--;
- // LOG(LS_VERBOSE) << "usrsctp_engines_count:" << usrsctp_engines_count;
- // if (usrsctp_engines_count == 0) {
- // if (usrsctp_finish() != 0) {
- // LOG(LS_WARNING) << "usrsctp_finish.";
- // }
- // }
+ usrsctp_engines_count--;
+ LOG(LS_VERBOSE) << "usrsctp_engines_count:" << usrsctp_engines_count;
+
+ if (usrsctp_engines_count == 0) {
+ // usrsctp_finish() may fail if it's called too soon after the channels are
+ // closed. Wait and try again until it succeeds for up to 3 seconds.
+ for (size_t i = 0; i < 300; ++i) {
+ if (usrsctp_finish() == 0)
+ return;
+
+ rtc::Thread::SleepMs(10);
+ }
+ LOG(LS_ERROR) << "Failed to shutdown usrsctp.";
+ }
}
DataMediaChannel* SctpDataEngine::CreateChannel(
if (data_channel_type != DCT_SCTP) {
return NULL;
}
- return new SctpDataMediaChannel(talk_base::Thread::Current());
+ return new SctpDataMediaChannel(rtc::Thread::Current());
}
-SctpDataMediaChannel::SctpDataMediaChannel(talk_base::Thread* thread)
+SctpDataMediaChannel::SctpDataMediaChannel(rtc::Thread* thread)
: worker_thread_(thread),
- local_port_(-1),
- remote_port_(-1),
+ local_port_(kSctpDefaultPort),
+ remote_port_(kSctpDefaultPort),
sock_(NULL),
sending_(false),
receiving_(false),
sconn.sconn_len = sizeof(sockaddr_conn);
#endif
// Note: conversion from int to uint16_t happens here.
- sconn.sconn_port = talk_base::HostToNetwork16(port);
+ sconn.sconn_port = rtc::HostToNetwork16(port);
sconn.sconn_addr = this;
return sconn;
}
return false;
}
+ // Disable MTU discovery
+ struct sctp_paddrparams params = {0};
+ params.spp_assoc_id = 0;
+ params.spp_flags = SPP_PMTUD_DISABLE;
+ params.spp_pathmtu = kSctpMtu;
+ if (usrsctp_setsockopt(sock_, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, ¶ms,
+ sizeof(params))) {
+ LOG_ERRNO(LS_ERROR) << debug_name_
+ << "Failed to set SCTP_PEER_ADDR_PARAMS.";
+ return false;
+ }
+
// Subscribe to SCTP event notifications.
int event_types[] = {SCTP_ASSOC_CHANGE,
SCTP_PEER_ADDR_CHANGE,
bool SctpDataMediaChannel::Connect() {
LOG(LS_VERBOSE) << debug_name_ << "->Connect().";
- if (remote_port_ < 0) {
- remote_port_ = kSctpDefaultPort;
- }
- if (local_port_ < 0) {
- local_port_ = kSctpDefaultPort;
- }
// If we already have a socket connection, just return.
if (sock_) {
bool SctpDataMediaChannel::SendData(
const SendDataParams& params,
- const talk_base::Buffer& payload,
+ const rtc::Buffer& payload,
SendDataResult* result) {
if (result) {
// Preset |result| to assume an error. If SendData succeeds, we'll
struct sctp_sendv_spa spa = {0};
spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID;
spa.sendv_sndinfo.snd_sid = params.ssrc;
- spa.sendv_sndinfo.snd_ppid = talk_base::HostToNetwork32(
+ spa.sendv_sndinfo.snd_ppid = rtc::HostToNetwork32(
GetPpid(params.type));
// Ordered implies reliable.
send_res = usrsctp_sendv(sock_, payload.data(),
static_cast<size_t>(payload.length()),
NULL, 0, &spa,
- static_cast<socklen_t>(sizeof(spa)),
+ rtc::checked_cast<socklen_t>(sizeof(spa)),
SCTP_SENDV_SPA, 0);
if (send_res < 0) {
- if (errno == EWOULDBLOCK) {
+ if (errno == SCTP_EWOULDBLOCK) {
*result = SDR_BLOCK;
LOG(LS_INFO) << debug_name_ << "->SendData(...): EWOULDBLOCK returned";
} else {
// Called by network interface when a packet has been received.
void SctpDataMediaChannel::OnPacketReceived(
- talk_base::Buffer* packet, const talk_base::PacketTime& packet_time) {
+ rtc::Buffer* packet, const rtc::PacketTime& packet_time) {
LOG(LS_VERBOSE) << debug_name_ << "->OnPacketReceived(...): " << " length="
<< packet->length() << ", sending: " << sending_;
// Only give receiving packets to usrsctp after if connected. This enables two
}
void SctpDataMediaChannel::OnDataFromSctpToChannel(
- const ReceiveDataParams& params, talk_base::Buffer* buffer) {
+ const ReceiveDataParams& params, rtc::Buffer* buffer) {
if (receiving_) {
LOG(LS_VERBOSE) << debug_name_ << "->OnDataFromSctpToChannel(...): "
<< "Posting with length: " << buffer->length()
return true;
}
-void SctpDataMediaChannel::OnNotificationFromSctp(talk_base::Buffer* buffer) {
+void SctpDataMediaChannel::OnNotificationFromSctp(rtc::Buffer* buffer) {
const sctp_notification& notification =
reinterpret_cast<const sctp_notification&>(*buffer->data());
ASSERT(notification.sn_header.sn_length == buffer->length());
<< ListStreams(open_streams_) << "], Q'd: ["
<< ListStreams(queued_reset_streams_) << "], Sent: ["
<< ListStreams(sent_reset_streams_) << "]";
- bool local_stream_reset_acknowledged = false;
// If both sides try to reset some streams at the same time (even if they're
// disjoint sets), we can get reset failures.
sent_reset_streams_.begin(),
sent_reset_streams_.end());
sent_reset_streams_.clear();
- local_stream_reset_acknowledged = true;
} else if (evt->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
// Each side gets an event for each direction of a stream. That is,
if (it != sent_reset_streams_.end()) {
LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_
<< "): local sid " << stream_id << " acknowledged.";
- local_stream_reset_acknowledged = true;
sent_reset_streams_.erase(it);
} else if ((it = open_streams_.find(stream_id))
LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_
<< "): closing sid " << stream_id;
open_streams_.erase(it);
- SignalStreamClosed(stream_id);
+ SignalStreamClosedRemotely(stream_id);
} else if ((it = queued_reset_streams_.find(stream_id))
!= queued_reset_streams_.end()) {
}
}
- if (local_stream_reset_acknowledged) {
- // This message acknowledges the last stream-reset request we sent out
- // (only one can be outstanding at a time). Send out the next one.
- SendQueuedStreamResets();
- }
+ // Always try to send the queued RESET because this call indicates that the
+ // last local RESET or remote RESET has made some progress.
+ SendQueuedStreamResets();
}
// Puts the specified |param| from the codec identified by |id| into |dest|
for (size_t i = 0; i < codecs.size(); ++i) {
if (codecs[i].Matches(match_pattern)) {
if (codecs[i].GetParam(param, &value)) {
- *dest = talk_base::FromString<int>(value);
+ *dest = rtc::FromString<int>(value);
return true;
}
}
}
void SctpDataMediaChannel::OnPacketFromSctpToNetwork(
- talk_base::Buffer* buffer) {
+ rtc::Buffer* buffer) {
if (buffer->length() > kSctpMtu) {
LOG(LS_ERROR) << debug_name_ << "->OnPacketFromSctpToNetwork(...): "
<< "SCTP seems to have made a packet that is bigger "
&reset_stream_buf[0]);
resetp->srs_assoc_id = SCTP_ALL_ASSOC;
resetp->srs_flags = SCTP_STREAM_RESET_INCOMING | SCTP_STREAM_RESET_OUTGOING;
- resetp->srs_number_streams = num_streams;
+ resetp->srs_number_streams = rtc::checked_cast<uint16_t>(num_streams);
int result_idx = 0;
for (StreamSet::iterator it = queued_reset_streams_.begin();
it != queued_reset_streams_.end(); ++it) {
resetp->srs_stream_list[result_idx++] = *it;
}
- int ret = usrsctp_setsockopt(sock_, IPPROTO_SCTP, SCTP_RESET_STREAMS, resetp,
- reset_stream_buf.size());
+ int ret = usrsctp_setsockopt(
+ sock_, IPPROTO_SCTP, SCTP_RESET_STREAMS, resetp,
+ rtc::checked_cast<socklen_t>(reset_stream_buf.size()));
if (ret < 0) {
LOG_ERRNO(LS_ERROR) << debug_name_ << "Failed to send a stream reset for "
<< num_streams << " streams";
return true;
}
-void SctpDataMediaChannel::OnMessage(talk_base::Message* msg) {
+void SctpDataMediaChannel::OnMessage(rtc::Message* msg) {
switch (msg->message_id) {
case MSG_SCTPINBOUNDPACKET: {
- talk_base::scoped_ptr<InboundPacketMessage> pdata(
+ rtc::scoped_ptr<InboundPacketMessage> pdata(
static_cast<InboundPacketMessage*>(msg->pdata));
OnInboundPacketFromSctpToChannel(pdata->data().get());
break;
}
case MSG_SCTPOUTBOUNDPACKET: {
- talk_base::scoped_ptr<OutboundPacketMessage> pdata(
+ rtc::scoped_ptr<OutboundPacketMessage> pdata(
static_cast<OutboundPacketMessage*>(msg->pdata));
OnPacketFromSctpToNetwork(pdata->data().get());
break;