#include "base/stl_util.h"
#include "net/quic/crypto/proof_verifier.h"
#include "net/quic/quic_connection.h"
+#include "net/quic/quic_headers_stream.h"
#include "net/ssl/ssl_info.h"
using base::StringPiece;
const size_t kMaxPrematurelyClosedStreamsTracked = 20;
const size_t kMaxZombieStreams = 20;
-#define ENDPOINT (is_server_ ? "Server: " : " Client: ")
+#define ENDPOINT (is_server() ? "Server: " : " Client: ")
// We want to make sure we delete any closed streams in a safe manner.
// To avoid deleting a stream in mid-operation, we have a simple shim between
session_->OnSuccessfulVersionNegotiation(version);
}
- virtual void OnConfigNegotiated() OVERRIDE {
- session_->OnConfigNegotiated();
- }
-
- virtual void OnConnectionClosed(QuicErrorCode error,
- bool from_peer) OVERRIDE {
+ virtual void OnConnectionClosed(
+ QuicErrorCode error, bool from_peer) OVERRIDE {
session_->OnConnectionClosed(error, from_peer);
// The session will go away, so don't bother with cleanup.
}
+ virtual void OnWriteBlocked() OVERRIDE {
+ session_->OnWriteBlocked();
+ }
+
virtual bool HasPendingHandshake() const OVERRIDE {
return session_->HasPendingHandshake();
}
};
QuicSession::QuicSession(QuicConnection* connection,
- const QuicConfig& config,
- bool is_server)
+ const QuicConfig& config)
: connection_(connection),
visitor_shim_(new VisitorShim(this)),
config_(config),
max_open_streams_(config_.max_streams_per_connection()),
- next_stream_id_(is_server ? 2 : 3),
- is_server_(is_server),
+ next_stream_id_(is_server() ? 2 : 3),
largest_peer_created_stream_id_(0),
error_(QUIC_NO_ERROR),
goaway_received_(false),
connection_->SetOverallConnectionTimeout(
config_.max_time_before_crypto_handshake());
}
+ if (connection_->version() > QUIC_VERSION_12) {
+ headers_stream_.reset(new QuicHeadersStream(this));
+ if (!is_server()) {
+ // For version above QUIC v12, the headers stream is stream 3, so the
+ // next available local stream ID should be 5.
+ DCHECK_EQ(kHeadersStreamId, next_stream_id_);
+ next_stream_id_ += 2;
+ }
+ }
}
QuicSession::~QuicSession() {
}
stream->OnStreamFrame(frames[i]);
- // If the stream had been prematurely closed, and the
+ // If the stream is a data stream had been prematurely closed, and the
// headers are now decompressed, then we are finally finished
// with this stream.
if (ContainsKey(zombie_streams_, stream_id) &&
- stream->headers_decompressed()) {
+ static_cast<QuicDataStream*>(stream)->headers_decompressed()) {
CloseZombieStream(stream_id);
}
}
}
QuicStreamId stream_id = decompression_blocked_streams_.begin()->second;
decompression_blocked_streams_.erase(header_id);
- ReliableQuicStream* stream = GetStream(stream_id);
+ QuicDataStream* stream = GetDataStream(stream_id);
if (!stream) {
connection()->SendConnectionClose(
QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
return true;
}
+void QuicSession::OnStreamHeaders(QuicStreamId stream_id,
+ StringPiece headers_data) {
+ QuicDataStream* stream = GetDataStream(stream_id);
+ if (!stream) {
+ // It's quite possible to receive headers after a stream has been reset.
+ return;
+ }
+ stream->OnStreamHeaders(headers_data);
+}
+
+void QuicSession::OnStreamHeadersPriority(QuicStreamId stream_id,
+ QuicPriority priority) {
+ QuicDataStream* stream = GetDataStream(stream_id);
+ if (!stream) {
+ // It's quite possible to receive headers after a stream has been reset.
+ return;
+ }
+ stream->OnStreamHeadersPriority(priority);
+}
+
+void QuicSession::OnStreamHeadersComplete(QuicStreamId stream_id,
+ bool fin,
+ size_t frame_len) {
+ QuicDataStream* stream = GetDataStream(stream_id);
+ if (!stream) {
+ // It's quite possible to receive headers after a stream has been reset.
+ return;
+ }
+ stream->OnStreamHeadersComplete(fin, frame_len);
+}
+
void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
- ReliableQuicStream* stream = GetStream(frame.stream_id);
+ if (frame.stream_id == kCryptoStreamId) {
+ connection()->SendConnectionCloseWithDetails(
+ QUIC_INVALID_STREAM_ID,
+ "Attempt to reset the crypto stream");
+ return;
+ }
+ if (frame.stream_id == kHeadersStreamId &&
+ connection()->version() > QUIC_VERSION_12) {
+ connection()->SendConnectionCloseWithDetails(
+ QUIC_INVALID_STREAM_ID,
+ "Attempt to reset the headers stream");
+ return;
+ }
+ QuicDataStream* stream = GetDataStream(frame.stream_id);
if (!stream) {
return; // Errors are handled by GetStream.
}
AddPrematurelyClosedStream(frame.stream_id);
return;
}
- stream->OnStreamReset(frame.error_code);
+ if (connection()->version() <= QUIC_VERSION_12) {
+ if (stream->stream_bytes_read() > 0 && !stream->headers_decompressed()) {
+ connection()->SendConnectionClose(
+ QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
+ }
+ }
+ stream->OnStreamReset(frame);
}
void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
error_ = error;
}
- while (stream_map_.size() != 0) {
- ReliableStreamMap::iterator it = stream_map_.begin();
+ while (!stream_map_.empty()) {
+ DataStreamMap::iterator it = stream_map_.begin();
QuicStreamId id = it->first;
it->second->OnConnectionClosed(error, from_peer);
// The stream should call CloseStream as part of OnConnectionClosed.
// may be modifying the list as we loop.
int remaining_writes = write_blocked_streams_.NumBlockedStreams();
- while (!connection_->HasQueuedData() &&
- remaining_writes > 0) {
+ while (remaining_writes > 0 && connection_->CanWriteStreamData()) {
DCHECK(write_blocked_streams_.HasWriteBlockedStreams());
- int index = write_blocked_streams_.GetHighestPriorityWriteBlockedList();
- if (index == -1) {
+ if (!write_blocked_streams_.HasWriteBlockedStreams()) {
LOG(DFATAL) << "WriteBlockedStream is missing";
connection_->CloseConnection(QUIC_INTERNAL_ERROR, false);
return true; // We have no write blocked streams.
}
- QuicStreamId stream_id = write_blocked_streams_.PopFront(index);
+ QuicStreamId stream_id = write_blocked_streams_.PopFront();
if (stream_id == kCryptoStreamId) {
has_pending_handshake_ = false; // We just popped it.
}
return has_pending_handshake_;
}
-QuicConsumedData QuicSession::WritevData(QuicStreamId id,
- const struct iovec* iov,
- int iov_count,
- QuicStreamOffset offset,
- bool fin) {
+QuicConsumedData QuicSession::WritevData(
+ QuicStreamId id,
+ const struct iovec* iov,
+ int iov_count,
+ QuicStreamOffset offset,
+ bool fin,
+ QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
IOVector data;
data.AppendIovec(iov, iov_count);
- return connection_->SendStreamData(id, data, offset, fin);
+ return connection_->SendStreamData(id, data, offset, fin,
+ ack_notifier_delegate);
+}
+
+size_t QuicSession::WriteHeaders(QuicStreamId id,
+ const SpdyHeaderBlock& headers,
+ bool fin) {
+ DCHECK_LT(QUIC_VERSION_12, connection()->version());
+ if (connection()->version() <= QUIC_VERSION_12) {
+ return 0;
+ }
+ return headers_stream_->WriteHeaders(id, headers, fin);
}
void QuicSession::SendRstStream(QuicStreamId id,
- QuicRstStreamErrorCode error) {
- connection_->SendRstStream(id, error);
+ QuicRstStreamErrorCode error,
+ QuicStreamOffset bytes_written) {
+ connection_->SendRstStream(id, error, bytes_written);
CloseStreamInner(id, true);
}
void QuicSession::CloseStreamInner(QuicStreamId stream_id,
bool locally_reset) {
- DLOG(INFO) << ENDPOINT << "Closing stream " << stream_id;
+ DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
- ReliableStreamMap::iterator it = stream_map_.find(stream_id);
+ DataStreamMap::iterator it = stream_map_.find(stream_id);
if (it == stream_map_.end()) {
- DLOG(INFO) << ENDPOINT << "Stream is already closed: " << stream_id;
+ DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
return;
}
- ReliableQuicStream* stream = it->second;
- if (connection_->connected() && !stream->headers_decompressed()) {
+ QuicDataStream* stream = it->second;
+
+ // Tell the stream that a RST has been sent.
+ if (locally_reset) {
+ stream->set_rst_sent(true);
+ }
+
+ if (connection_->version() <= QUIC_VERSION_12 &&
+ connection_->connected() && !stream->headers_decompressed()) {
// If the stream is being closed locally (for example a client cancelling
// a request before receiving the response) then we need to make sure that
// we keep the stream alive long enough to process any response or
// RST_STREAM frames.
- if (locally_reset && !is_server_) {
+ if (locally_reset && !is_server()) {
AddZombieStream(stream_id);
return;
}
void QuicSession::CloseZombieStream(QuicStreamId stream_id) {
DCHECK(ContainsKey(zombie_streams_, stream_id));
zombie_streams_.erase(stream_id);
- ReliableQuicStream* stream = GetStream(stream_id);
+ QuicDataStream* stream = GetDataStream(stream_id);
if (!stream) {
return;
}
}
void QuicSession::AddPrematurelyClosedStream(QuicStreamId stream_id) {
+ if (connection()->version() > QUIC_VERSION_12) {
+ return;
+ }
if (prematurely_closed_streams_.size() ==
kMaxPrematurelyClosedStreamsTracked) {
prematurely_closed_streams_.erase(prematurely_closed_streams_.begin());
case ENCRYPTION_REESTABLISHED:
// Retransmit originally packets that were sent, since they can't be
// decrypted by the peer.
- connection_->RetransmitUnackedPackets(
- QuicConnection::INITIAL_ENCRYPTION_ONLY);
+ connection_->RetransmitUnackedPackets(INITIAL_ENCRYPTION_ONLY);
break;
case HANDSHAKE_CONFIRMED:
return &config_;
}
-void QuicSession::ActivateStream(ReliableQuicStream* stream) {
- DLOG(INFO) << ENDPOINT << "num_streams: " << stream_map_.size()
+void QuicSession::ActivateStream(QuicDataStream* stream) {
+ DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size()
<< ". activating " << stream->id();
DCHECK_EQ(stream_map_.count(stream->id()), 0u);
stream_map_[stream->id()] = stream;
if (stream_id == kCryptoStreamId) {
return GetCryptoStream();
}
+ if (stream_id == kHeadersStreamId &&
+ connection_->version() > QUIC_VERSION_12) {
+ return headers_stream_.get();
+ }
+ return GetDataStream(stream_id);
+}
- ReliableStreamMap::iterator it = stream_map_.find(stream_id);
+QuicDataStream* QuicSession::GetDataStream(const QuicStreamId stream_id) {
+ if (stream_id == kCryptoStreamId) {
+ DLOG(FATAL) << "Attempt to call GetDataStream with the crypto stream id";
+ return NULL;
+ }
+ if (stream_id == kHeadersStreamId &&
+ connection_->version() > QUIC_VERSION_12) {
+ DLOG(FATAL) << "Attempt to call GetDataStream with the headers stream id";
+ return NULL;
+ }
+
+ DataStreamMap::iterator it = stream_map_.find(stream_id);
if (it != stream_map_.end()) {
return it->second;
}
if (stream_id % 2 == next_stream_id_ % 2) {
// We've received a frame for a locally-created stream that is not
// currently active. This is an error.
- connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM);
+ if (connection()->connected()) {
+ connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM);
+ }
return NULL;
}
- return GetIncomingReliableStream(stream_id);
+ return GetIncomingDataStream(stream_id);
}
-ReliableQuicStream* QuicSession::GetIncomingReliableStream(
- QuicStreamId stream_id) {
+QuicDataStream* QuicSession::GetIncomingDataStream(QuicStreamId stream_id) {
if (IsClosedStream(stream_id)) {
return NULL;
}
if (goaway_sent_) {
// We've already sent a GoAway
- SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY);
+ SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0);
return NULL;
}
return NULL;
}
if (largest_peer_created_stream_id_ == 0) {
- largest_peer_created_stream_id_= 1;
+ if (is_server() && connection()->version() > QUIC_VERSION_12) {
+ largest_peer_created_stream_id_= 3;
+ } else {
+ largest_peer_created_stream_id_= 1;
+ }
}
for (QuicStreamId id = largest_peer_created_stream_id_ + 2;
id < stream_id;
}
largest_peer_created_stream_id_ = stream_id;
}
- ReliableQuicStream* stream = CreateIncomingReliableStream(stream_id);
+ QuicDataStream* stream = CreateIncomingDataStream(stream_id);
if (stream == NULL) {
return NULL;
}
if (id == kCryptoStreamId) {
return false;
}
+ if (connection()->version() > QUIC_VERSION_12) {
+ if (id == kHeadersStreamId) {
+ return false;
+ }
+ }
if (ContainsKey(zombie_streams_, id)) {
return true;
}
}
void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) {
+#ifndef NDEBUG
+ ReliableQuicStream* stream = GetStream(id);
+ if (stream != NULL) {
+ LOG_IF(DFATAL, priority != stream->EffectivePriority())
+ << "Priorities do not match. Got: " << priority
+ << " Expected: " << stream->EffectivePriority();
+ } else {
+ LOG(DFATAL) << "Marking unknown stream " << id << " blocked.";
+ }
+#endif
+
if (id == kCryptoStreamId) {
DCHECK(!has_pending_handshake_);
has_pending_handshake_ = true;
// kHighestPriority.
priority = kHighestPriority;
}
- write_blocked_streams_.PushBack(id, priority);
+ write_blocked_streams_.PushBack(id, priority, connection()->version());
+}
+
+bool QuicSession::HasDataToWrite() const {
+ return write_blocked_streams_.HasWriteBlockedStreams() ||
+ connection_->HasQueuedData();
}
void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id,
QuicStreamId stream_id) {
+ DCHECK_GE(QUIC_VERSION_12, connection()->version());
decompression_blocked_streams_[header_id] = stream_id;
}