1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_session.h"
7 #include "base/stl_util.h"
8 #include "net/quic/crypto/proof_verifier.h"
9 #include "net/quic/quic_connection.h"
10 #include "net/ssl/ssl_info.h"
12 using base::StringPiece;
// Cap on the number of entries kept in prematurely_closed_streams_: once the
// map holds this many, AddPrematurelyClosedStream() erases its first entry
// before inserting a new one.
20 const size_t kMaxPrematurelyClosedStreamsTracked = 20;
// Cap on the number of entries kept in zombie_streams_: once the map holds
// this many, AddZombieStream() closes the first zombie before inserting a new
// one.
21 const size_t kMaxZombieStreams = 20;
// Log-message prefix naming which end of the connection is logging. The
// expansion reads is_server_, so the macro is only usable where that member is
// in scope.
23 #define ENDPOINT (is_server_ ? "Server: " : " Client: ")
25 // We want to make sure we delete any closed streams in a safe manner.
26 // To avoid deleting a stream in mid-operation, we have a simple shim between
27 // us and the stream, so we can delete any streams when we return from
// the forwarded session call (that is what PostProcessAfterData() is for).
//
30 // We could just override the base methods, but this makes it easier to make
31 // sure we don't miss any.
//
// NOTE(review): this listing shows no access specifiers, no closing braces and
// no return statements for the bool-returning methods; confirm against the
// full file.
32 class VisitorShim : public QuicConnectionVisitorInterface {
// |session| is the QuicSession that every visitor callback is forwarded to.
34 explicit VisitorShim(QuicSession* session) : session_(session) {}
// Forwards the frames to the session, then lets the session delete the streams
// it queued for deletion. |accepted| holds the session's result (presumably
// what is returned -- the return statement is not visible here).
36 virtual bool OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE {
37 bool accepted = session_->OnStreamFrames(frames);
38 session_->PostProcessAfterData();
// Forwards the RST_STREAM frame, then runs the deferred stream deletion.
41 virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE {
42 session_->OnRstStream(frame);
43 session_->PostProcessAfterData();
// Forwards the GOAWAY frame, then runs the deferred stream deletion.
46 virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE {
47 session_->OnGoAway(frame);
48 session_->PostProcessAfterData();
// Forwards the write notification, then runs the deferred stream deletion.
// |rc| holds the session's result (presumably what is returned -- the return
// statement is not visible here).
51 virtual bool OnCanWrite() OVERRIDE {
52 bool rc = session_->OnCanWrite();
53 session_->PostProcessAfterData();
// Pure forwarder; no deferred deletion is triggered.
57 virtual void OnSuccessfulVersionNegotiation(
58 const QuicVersion& version) OVERRIDE {
59 session_->OnSuccessfulVersionNegotiation(version);
// Pure forwarder; no deferred deletion is triggered.
62 virtual void OnConfigNegotiated() OVERRIDE {
63 session_->OnConfigNegotiated();
// Forwards the close notification without calling PostProcessAfterData().
66 virtual void OnConnectionClosed(QuicErrorCode error,
67 bool from_peer) OVERRIDE {
68 session_->OnConnectionClosed(error, from_peer);
69 // The session will go away, so don't bother with cleanup.
// Pure forwarder of the session's pending-handshake flag.
72 virtual bool HasPendingHandshake() const OVERRIDE {
73 return session_->HasPendingHandshake();
// Target of every forwarded call. The shim never deletes it in the code shown;
// QuicSession constructs the shim with |this|.
77 QuicSession* session_;
// Builds a session on top of |connection|.
//
// Installs a freshly created VisitorShim as the connection's visitor, applies
// config_ to the connection and, if the connection is already connected, sets
// the overall connection timeout to
// config_.max_time_before_crypto_handshake().
//
// Locally created stream ids start at 2 when |is_server| is true and at 3
// otherwise (see next_stream_id_).
//
// NOTE(review): parts of the parameter list and initializer list are not
// visible in this listing (for example the declaration of |is_server| and the
// initialization of config_, both of which are used below); confirm against
// the full file.
80 QuicSession::QuicSession(QuicConnection* connection,
81 const QuicConfig& config,
83 : connection_(connection),
84 visitor_shim_(new VisitorShim(this)),
86 max_open_streams_(config_.max_streams_per_connection()),
87 next_stream_id_(is_server ? 2 : 3),
88 is_server_(is_server),
89 largest_peer_created_stream_id_(0),
90 error_(QUIC_NO_ERROR),
91 goaway_received_(false),
93 has_pending_handshake_(false) {
// Route connection callbacks through the shim rather than straight to |this|.
95 connection_->set_visitor(visitor_shim_.get());
96 connection_->SetFromConfig(config_);
97 if (connection_->connected()) {
98 connection_->SetOverallConnectionTimeout(
99 config_.max_time_before_crypto_handshake());
// Releases the streams the session still holds: both the ones queued in
// closed_streams_ and the ones still registered in stream_map_.
103 QuicSession::~QuicSession() {
104 STLDeleteElements(&closed_streams_);
105 STLDeleteValues(&stream_map_);
// Handles a batch of stream frames handed over by the connection.
//
// Works in three phases:
//   1. Validation: every frame is checked before any frame is delivered.
//      Returns false if GetStream() yields NULL for a frame's stream or the
//      stream will not accept the frame. Frames for a stream recorded in
//      prematurely_closed_streams_ trigger a connection close.
//   2. Delivery: each frame is handed to its stream; a zombie stream whose
//      headers are now decompressed is closed for good.
//   3. Decompression: streams queued in decompression_blocked_streams_ are
//      notified via OnDecompressorAvailable().
//
// NOTE(review): several control-flow lines (closing braces, early exits, the
// final return) are not visible in this listing; the notes below mark the
// places where that matters.
108 bool QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) {
109 for (size_t i = 0; i < frames.size(); ++i) {
110 // TODO(rch) deal with the error case of stream id 0
111 if (IsClosedStream(frames[i].stream_id)) {
112 // If we get additional frames for a stream where we didn't process
113 // headers, it's highly likely our compression context will end up
114 // permanently out of sync with the peer's, so we give up and close the
// connection.
116 if (ContainsKey(prematurely_closed_streams_, frames[i].stream_id)) {
117 connection()->SendConnectionClose(
118 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
// NOTE(review): what happens to frames for closed streams after this point
// (presumably they are skipped) is on lines not visible here.
124 ReliableQuicStream* stream = GetStream(frames[i].stream_id);
125 if (stream == NULL) return false;
126 if (!stream->WillAcceptStreamFrame(frames[i])) return false;
128 // TODO(alyssar) check against existing connection address: if changed, make
129 // sure we update the connection.
// Second pass: deliver the frames now that the whole batch has been checked.
132 for (size_t i = 0; i < frames.size(); ++i) {
133 QuicStreamId stream_id = frames[i].stream_id;
134 ReliableQuicStream* stream = GetStream(stream_id);
// NOTE(review): any NULL check on |stream| before this call is not visible.
138 stream->OnStreamFrame(frames[i]);
140 // If the stream had been prematurely closed, and the
141 // headers are now decompressed, then we are finally finished
// with this stream.
143 if (ContainsKey(zombie_streams_, stream_id) &&
144 stream->headers_decompressed()) {
145 CloseZombieStream(stream_id);
// Third pass: look at the first blocked stream and compare its header id with
// the decompressor's current one.
149 while (!decompression_blocked_streams_.empty()) {
150 QuicHeaderId header_id = decompression_blocked_streams_.begin()->first;
// NOTE(review): the body of this check is not visible; presumably the loop
// stops when the first entry is not the header the decompressor expects.
151 if (header_id != decompressor_.current_header_id()) {
154 QuicStreamId stream_id = decompression_blocked_streams_.begin()->second;
155 decompression_blocked_streams_.erase(header_id);
156 ReliableQuicStream* stream = GetStream(stream_id);
// NOTE(review): the condition guarding this close (presumably a NULL check on
// |stream|) is not visible in this listing.
158 connection()->SendConnectionClose(
159 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
162 stream->OnDecompressorAvailable();
// Handles a RST_STREAM frame from the peer.
//
// A zombie stream is closed out and its id recorded in
// prematurely_closed_streams_; the stream is also told about the reset via
// OnStreamReset(frame.error_code).
//
// NOTE(review): the guard around the early return and the end of the zombie
// branch are not visible in this listing, so it cannot be told from here
// whether OnStreamReset() is also reached for zombie streams.
167 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
168 ReliableQuicStream* stream = GetStream(frame.stream_id);
170 return; // Errors are handled by GetStream.
172 if (ContainsKey(zombie_streams_, stream->id())) {
173 // If this was a zombie stream then we close it out now.
174 CloseZombieStream(stream->id());
175 // However, since the headers still have not been decompressed, we want to
176 // mark it as prematurely closed so that if we ever receive frames
177 // for this stream we can close the connection.
178 DCHECK(!stream->headers_decompressed());
179 AddPrematurelyClosedStream(frame.stream_id);
182 stream->OnStreamReset(frame.error_code);
// Handles a GOAWAY frame from the peer by setting goaway_received_.
// Debug builds also check that the peer's last good stream id is below the
// next id this side would allocate.
185 void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
186 DCHECK(frame.last_good_stream_id < next_stream_id_);
187 goaway_received_ = true;
// Called once the connection is no longer connected (checked by the DCHECK).
// Notifies every stream still registered in stream_map_, always taking the
// first entry, because streams are expected to unregister themselves while
// being notified.
//
// |error|     reason the connection was closed.
// |from_peer| forwarded unchanged to each stream.
//
// NOTE(review): the body of the error_ check (presumably recording |error|)
// and whatever follows the DFATAL log -- which is needed for the loop to
// terminate when a stream stays registered -- are not visible in this listing.
190 void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) {
191 DCHECK(!connection_->connected());
192 if (error_ == QUIC_NO_ERROR) {
196 while (stream_map_.size() != 0) {
197 ReliableStreamMap::iterator it = stream_map_.begin();
// Remember the id first: |it| may be invalidated by the callback below.
198 QuicStreamId id = it->first;
199 it->second->OnConnectionClosed(error, from_peer);
200 // The stream should call CloseStream as part of OnConnectionClosed.
201 if (stream_map_.find(id) != stream_map_.end()) {
202 LOG(DFATAL) << ENDPOINT
203 << "Stream failed to close under OnConnectionClosed";
// Gives write-blocked streams a chance to write, highest-priority list first,
// for as long as the connection has no queued data.
//
// Returns true when no stream is left write blocked afterwards.
//
// NOTE(review): the condition guarding the "WriteBlockedStream is missing"
// branch and the statement that decrements |remaining_writes| are not visible
// in this listing; confirm against the full file.
209 bool QuicSession::OnCanWrite() {
210 // We latch this here rather than doing a traditional loop, because streams
211 // may be modifying the list as we loop.
212 int remaining_writes = write_blocked_streams_.NumBlockedStreams();
214 while (!connection_->HasQueuedData() &&
215 remaining_writes > 0) {
216 DCHECK(write_blocked_streams_.HasWriteBlockedStreams());
217 int index = write_blocked_streams_.GetHighestPriorityWriteBlockedList();
// Error path: the bookkeeping is inconsistent, so the connection is closed.
219 LOG(DFATAL) << "WriteBlockedStream is missing";
220 connection_->CloseConnection(QUIC_INTERNAL_ERROR, false);
221 return true; // We have no write blocked streams.
223 QuicStreamId stream_id = write_blocked_streams_.PopFront(index);
224 if (stream_id == kCryptoStreamId) {
225 has_pending_handshake_ = false; // We just popped it.
// GetStream() may return NULL; such ids are simply dropped from the queue.
227 ReliableQuicStream* stream = GetStream(stream_id);
228 if (stream != NULL) {
229 // If the stream can't write all bytes, it'll re-add itself to the blocked
// list.
231 stream->OnCanWrite();
236 return !write_blocked_streams_.HasWriteBlockedStreams();
// Returns has_pending_handshake_, which MarkWriteBlocked() sets when the
// crypto stream is queued as write blocked and OnCanWrite() clears when that
// stream is popped from the queue.
239 bool QuicSession::HasPendingHandshake() const {
240 return has_pending_handshake_;
// Appends the caller's iovec array to |data| and passes it to the connection
// as stream data for stream |id| at |offset|. Returns whatever
// QuicConnection::SendStreamData() reports.
//
// NOTE(review): |iov_count|, |fin| and |data| are used below but their
// declarations are on lines not visible in this listing; confirm the full
// signature against the header.
243 QuicConsumedData QuicSession::WritevData(QuicStreamId id,
244 const struct iovec* iov,
246 QuicStreamOffset offset,
249 data.AppendIovec(iov, iov_count);
250 return connection_->SendStreamData(id, data, offset, fin);
// Sends a RST_STREAM for stream |id| with |error| and then closes the stream
// locally, flagging the close as a local reset (locally_reset == true).
253 void QuicSession::SendRstStream(QuicStreamId id,
254 QuicRstStreamErrorCode error) {
255 connection_->SendRstStream(id, error);
256 CloseStreamInner(id, true);
// Sends a GOAWAY frame carrying |error_code| and |reason|, reporting
// largest_peer_created_stream_id_ as the last stream id.
// NOTE(review): a line between the signature and this call is not visible in
// this listing.
259 void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) {
261 connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason);
// Closes |stream_id| without treating the close as a local reset
// (locally_reset == false).
264 void QuicSession::CloseStream(QuicStreamId stream_id) {
265 CloseStreamInner(stream_id, false);
// Shared implementation of CloseStream() and SendRstStream().
//
// |stream_id|     stream to close; an id that is not in stream_map_ is only
//                 logged.
// |locally_reset| true when the close follows a RST_STREAM sent by this side.
//
// While the connection is connected and the stream's headers have not been
// decompressed, the stream becomes either a zombie (client side, local reset)
// or an entry in prematurely_closed_streams_. The stream object is queued in
// closed_streams_ instead of being deleted here, and is removed from
// zombie_streams_ and stream_map_.
//
// NOTE(review): early exits after the "already closed" log and after
// AddZombieStream() are not visible in this listing; presumably both branches
// return, otherwise a new zombie would be erased again below -- confirm.
268 void QuicSession::CloseStreamInner(QuicStreamId stream_id,
269 bool locally_reset) {
270 DLOG(INFO) << ENDPOINT << "Closing stream " << stream_id;
272 ReliableStreamMap::iterator it = stream_map_.find(stream_id);
273 if (it == stream_map_.end()) {
274 DLOG(INFO) << ENDPOINT << "Stream is already closed: " << stream_id;
277 ReliableQuicStream* stream = it->second;
278 if (connection_->connected() && !stream->headers_decompressed()) {
279 // If the stream is being closed locally (for example a client cancelling
280 // a request before receiving the response) then we need to make sure that
281 // we keep the stream alive long enough to process any response or
282 // RST_STREAM frames.
283 if (locally_reset && !is_server_) {
284 AddZombieStream(stream_id);
288 // This stream has been closed before the headers were decompressed.
289 // This might cause problems with head of line blocking of headers.
290 // If the peer sent headers which were lost but we now close the stream
291 // we will never be able to decompress headers for other streams.
292 // To deal with this, we keep track of streams which have been closed
293 // prematurely. If we ever receive data frames for this stream, then we
294 // know there actually has been a problem and we close the connection.
295 AddPrematurelyClosedStream(stream->id());
// Defer deletion: the stream object may still be on the call stack.
297 closed_streams_.push_back(it->second);
298 if (ContainsKey(zombie_streams_, stream->id())) {
299 zombie_streams_.erase(stream->id());
301 stream_map_.erase(it);
// Records |stream_id| in zombie_streams_. When the map already holds
// kMaxZombieStreams entries, its first entry is closed and moved to
// prematurely_closed_streams_ to make room.
305 void QuicSession::AddZombieStream(QuicStreamId stream_id) {
306 if (zombie_streams_.size() == kMaxZombieStreams) {
307 QuicStreamId oldest_zombie_stream_id = zombie_streams_.begin()->first;
308 CloseZombieStream(oldest_zombie_stream_id);
309 // However, since the headers still have not been decompressed, we want to
310 // mark it as prematurely closed so that if we ever receive frames
311 // for this stream we can close the connection.
312 AddPrematurelyClosedStream(oldest_zombie_stream_id);
314 zombie_streams_.insert(make_pair(stream_id, true));
// Finishes off a zombie stream: drops |stream_id| from zombie_streams_ and
// stream_map_ and queues the stream object in closed_streams_ for deferred
// deletion. |stream_id| must currently be in zombie_streams_ (DCHECKed).
//
// NOTE(review): lines between GetStream() and the erase, and between the erase
// and the push_back, are not visible in this listing (presumably a NULL check
// and a notification to the stream); confirm against the full file.
317 void QuicSession::CloseZombieStream(QuicStreamId stream_id) {
318 DCHECK(ContainsKey(zombie_streams_, stream_id));
319 zombie_streams_.erase(stream_id);
320 ReliableQuicStream* stream = GetStream(stream_id);
324 stream_map_.erase(stream_id);
326 closed_streams_.push_back(stream);
// Records |stream_id| in prematurely_closed_streams_. The map is capped at
// kMaxPrematurelyClosedStreamsTracked entries: when it is full, its first
// entry is erased before the new id is inserted.
329 void QuicSession::AddPrematurelyClosedStream(QuicStreamId stream_id) {
330 if (prematurely_closed_streams_.size() ==
331 kMaxPrematurelyClosedStreamsTracked) {
332 prematurely_closed_streams_.erase(prematurely_closed_streams_.begin());
334 prematurely_closed_streams_.insert(make_pair(stream_id, true));
// Returns the crypto stream's encryption_established() state.
337 bool QuicSession::IsEncryptionEstablished() {
338 return GetCryptoStream()->encryption_established();
// Returns the crypto stream's handshake_confirmed() state.
341 bool QuicSession::IsCryptoHandshakeConfirmed() {
342 return GetCryptoStream()->handshake_confirmed();
// Re-applies config_ to the connection, the same call the constructor makes.
345 void QuicSession::OnConfigNegotiated() {
346 connection_->SetFromConfig(config_);
// Reacts to a progress notification about the crypto handshake.
//
//   ENCRYPTION_REESTABLISHED  retransmits unacked packets, restricted to
//                             INITIAL_ENCRYPTION_ONLY.
//   HANDSHAKE_CONFIRMED       lifts the overall connection timeout and
//                             refreshes max_open_streams_ from config_;
//                             logs DFATAL if config_ is not negotiated.
//   anything else             logged as an unknown event.
//
// NOTE(review): the switch statement itself, the lines between the first two
// case labels, the break statements and the label of the final branch are not
// visible in this listing. It cannot be told from here whether
// ENCRYPTION_FIRST_ESTABLISHED falls through to the next case.
349 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
351 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
352 // to QuicSession since it is the glue.
353 case ENCRYPTION_FIRST_ESTABLISHED:
356 case ENCRYPTION_REESTABLISHED:
357 // Retransmit packets that were originally sent, since they can't be
358 // decrypted by the peer.
359 connection_->RetransmitUnackedPackets(
360 QuicConnection::INITIAL_ENCRYPTION_ONLY);
363 case HANDSHAKE_CONFIRMED:
364 LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT
365 << "Handshake confirmed without parameter negotiation.";
// An infinite timeout replaces the one set in the constructor.
366 connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
367 max_open_streams_ = config_.max_streams_per_connection();
371 LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
// Notification that a crypto handshake message was sent. No statements are
// visible for this function in this listing, so |message| appears unused here.
375 void QuicSession::OnCryptoHandshakeMessageSent(
376 const CryptoHandshakeMessage& message) {
// Notification that a crypto handshake message was received. No statements
// are visible for this function in this listing, so |message| appears unused
// here.
379 void QuicSession::OnCryptoHandshakeMessageReceived(
380 const CryptoHandshakeMessage& message) {
// Returns a mutable pointer to a QuicConfig.
// NOTE(review): the body is not visible in this listing; presumably it returns
// the address of config_ -- confirm against the full file.
383 QuicConfig* QuicSession::config() {
// Registers |stream| in stream_map_ under its own id. The id must not already
// be registered (DCHECKed).
387 void QuicSession::ActivateStream(ReliableQuicStream* stream) {
388 DLOG(INFO) << ENDPOINT << "num_streams: " << stream_map_.size()
389 << ". activating " << stream->id();
390 DCHECK_EQ(stream_map_.count(stream->id()), 0u);
391 stream_map_[stream->id()] = stream;
// Allocates an id for a locally created stream. Ids advance in steps of two,
// so they keep the parity chosen in the constructor.
// NOTE(review): the return statement is not visible in this listing;
// presumably |id|, the value read before the increment, is returned.
394 QuicStreamId QuicSession::GetNextStreamId() {
395 QuicStreamId id = next_stream_id_;
396 next_stream_id_ += 2;
// Looks up the stream object for |stream_id|.
//
// The checks run in this order: the crypto stream id, the active streams in
// stream_map_, closed streams, and ids with the parity of locally created
// streams (an inactive one closes the connection with
// QUIC_PACKET_FOR_NONEXISTENT_STREAM). Any remaining id is treated as a
// peer-created stream and handed to GetIncomingReliableStream().
//
// NOTE(review): the return statements of the map-hit, closed-stream and
// locally-created branches are not visible in this listing; presumably the
// first returns the mapped stream and the other two return NULL -- confirm.
400 ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) {
401 if (stream_id == kCryptoStreamId) {
402 return GetCryptoStream();
405 ReliableStreamMap::iterator it = stream_map_.find(stream_id);
406 if (it != stream_map_.end()) {
410 if (IsClosedStream(stream_id)) {
// Same parity as next_stream_id_ means the id belongs to this endpoint.
414 if (stream_id % 2 == next_stream_id_ % 2) {
415 // We've received a frame for a locally-created stream that is not
416 // currently active. This is an error.
417 connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM);
421 return GetIncomingReliableStream(stream_id);
// Creates and activates the stream for a peer-initiated |stream_id|.
//
// The id is removed from implicitly_created_streams_. When it is larger than
// any peer id seen so far, largest_peer_created_stream_id_ is advanced to it
// and ids in between are recorded as implicitly created. A jump of more than
// kMaxStreamIdDelta closes the connection with QUIC_INVALID_STREAM_ID.
//
// NOTE(review): this listing omits the results returned by the early-exit
// branches, the condition guarding the SendRstStream() call (presumably "a
// GoAway has been sent"), the loop bound and step of the for statement, and
// the final return; confirm against the full file.
424 ReliableQuicStream* QuicSession::GetIncomingReliableStream(
425 QuicStreamId stream_id) {
426 if (IsClosedStream(stream_id)) {
431 // We've already sent a GoAway
432 SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY);
436 implicitly_created_streams_.erase(stream_id);
437 if (stream_id > largest_peer_created_stream_id_) {
438 // TODO(rch) add unit test for this
439 if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) {
440 connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID);
// No peer stream seen yet: start from 1 so the loop below begins at id 3.
443 if (largest_peer_created_stream_id_ == 0) {
444 largest_peer_created_stream_id_= 1;
446 for (QuicStreamId id = largest_peer_created_stream_id_ + 2;
449 implicitly_created_streams_.insert(id);
451 largest_peer_created_stream_id_ = stream_id;
453 ReliableQuicStream* stream = CreateIncomingReliableStream(stream_id);
454 if (stream == NULL) {
457 ActivateStream(stream);
// Reports whether |id| refers to a stream that has already been closed.
//
// The crypto stream, zombie streams and streams in stream_map_ are checked
// first. For the remaining ids:
//   - locally created (same parity as next_stream_id_): closed iff the id has
//     already been handed out, i.e. id < next_stream_id_;
//   - peer created: closed iff the id is not above
//     largest_peer_created_stream_id_ and is not merely implicitly created.
//
// NOTE(review): the values returned by the first three checks are on lines not
// visible in this listing; confirm against the full file.
461 bool QuicSession::IsClosedStream(QuicStreamId id) {
463 if (id == kCryptoStreamId) {
466 if (ContainsKey(zombie_streams_, id)) {
469 if (ContainsKey(stream_map_, id)) {
473 if (id % 2 == next_stream_id_ % 2) {
474 // Locally created streams are strictly in-order. If the id is in the
475 // range of created streams and it's not active, it must have been closed.
476 return id < next_stream_id_;
478 // For peer created streams, we also need to consider implicitly created
// streams.
480 return id <= largest_peer_created_stream_id_ &&
481 implicitly_created_streams_.count(id) == 0;
// Returns the number of streams counted as open: the entries in stream_map_
// plus the implicitly created streams, minus the zombie streams.
484 size_t QuicSession::GetNumOpenStreams() const {
485 return stream_map_.size() + implicitly_created_streams_.size() -
486 zombie_streams_.size();
// Queues stream |id| as write blocked at |priority|. For the crypto stream the
// requested priority is replaced by kHighestPriority and
// has_pending_handshake_ is set; it must not already be set (DCHECKed).
489 void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) {
490 if (id == kCryptoStreamId) {
491 DCHECK(!has_pending_handshake_);
492 has_pending_handshake_ = true;
493 // TODO(jar): Be sure to use the highest priority for the crypto stream,
494 // perhaps by adding a "special" priority for it that is higher than
// any regular priority. Until then kHighestPriority is used.
496 priority = kHighestPriority;
498 write_blocked_streams_.PushBack(id, priority);
// Records that |stream_id| is waiting for the headers identified by
// |header_id|. An existing entry for the same header id is overwritten.
// OnStreamFrames() consumes these entries.
501 void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id,
502 QuicStreamId stream_id) {
503 decompression_blocked_streams_[header_id] = stream_id;
// Takes an SSLInfo out-pointer and returns a bool.
// NOTE(review): the body is not visible in this listing, so neither the
// meaning of the return value nor whether |ssl_info| is written can be stated
// from here; confirm against the full file.
506 bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) {
// Deferred clean-up step: releases the streams queued in closed_streams_ and
// empties the container. VisitorShim calls this after a forwarded callback has
// returned, so that a stream is not deleted in mid-operation.
511 void QuicSession::PostProcessAfterData() {
512 STLDeleteElements(&closed_streams_);
513 closed_streams_.clear();