1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_session.h"
7 #include "base/stl_util.h"
8 #include "net/quic/crypto/proof_verifier.h"
9 #include "net/quic/quic_connection.h"
10 #include "net/quic/quic_headers_stream.h"
11 #include "net/ssl/ssl_info.h"
13 using base::StringPiece;
21 const size_t kMaxPrematurelyClosedStreamsTracked = 20;
22 const size_t kMaxZombieStreams = 20;
24 #define ENDPOINT (is_server() ? "Server: " : " Client: ")
26 // We want to make sure we delete any closed streams in a safe manner.
27 // To avoid deleting a stream in mid-operation, we have a simple shim between
28 // us and the stream, so we can delete any streams when we return from
31 // We could just override the base methods, but this makes it easier to make
32 // sure we don't miss any.
33 class VisitorShim : public QuicConnectionVisitorInterface {
35 explicit VisitorShim(QuicSession* session) : session_(session) {}
37 virtual bool OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE {
38 bool accepted = session_->OnStreamFrames(frames);
39 session_->PostProcessAfterData();
42 virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE {
43 session_->OnRstStream(frame);
44 session_->PostProcessAfterData();
47 virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE {
48 session_->OnGoAway(frame);
49 session_->PostProcessAfterData();
52 virtual bool OnCanWrite() OVERRIDE {
53 bool rc = session_->OnCanWrite();
54 session_->PostProcessAfterData();
58 virtual void OnSuccessfulVersionNegotiation(
59 const QuicVersion& version) OVERRIDE {
60 session_->OnSuccessfulVersionNegotiation(version);
63 virtual void OnConnectionClosed(
64 QuicErrorCode error, bool from_peer) OVERRIDE {
65 session_->OnConnectionClosed(error, from_peer);
66 // The session will go away, so don't bother with cleanup.
69 virtual void OnWriteBlocked() OVERRIDE {
70 session_->OnWriteBlocked();
73 virtual bool HasPendingHandshake() const OVERRIDE {
74 return session_->HasPendingHandshake();
78 QuicSession* session_;
81 QuicSession::QuicSession(QuicConnection* connection,
82 const QuicConfig& config)
83 : connection_(connection),
84 visitor_shim_(new VisitorShim(this)),
86 max_open_streams_(config_.max_streams_per_connection()),
87 next_stream_id_(is_server() ? 2 : 3),
88 largest_peer_created_stream_id_(0),
89 error_(QUIC_NO_ERROR),
90 goaway_received_(false),
92 has_pending_handshake_(false) {
94 connection_->set_visitor(visitor_shim_.get());
95 connection_->SetFromConfig(config_);
96 if (connection_->connected()) {
97 connection_->SetOverallConnectionTimeout(
98 config_.max_time_before_crypto_handshake());
100 if (connection_->version() > QUIC_VERSION_12) {
101 headers_stream_.reset(new QuicHeadersStream(this));
103 // For version above QUIC v12, the headers stream is stream 3, so the
104 // next available local stream ID should be 5.
105 DCHECK_EQ(kHeadersStreamId, next_stream_id_);
106 next_stream_id_ += 2;
111 QuicSession::~QuicSession() {
112 STLDeleteElements(&closed_streams_);
113 STLDeleteValues(&stream_map_);
116 bool QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) {
117 for (size_t i = 0; i < frames.size(); ++i) {
118 // TODO(rch) deal with the error case of stream id 0
119 if (IsClosedStream(frames[i].stream_id)) {
120 // If we get additional frames for a stream where we didn't process
121 // headers, it's highly likely our compression context will end up
122 // permanently out of sync with the peer's, so we give up and close the
124 if (ContainsKey(prematurely_closed_streams_, frames[i].stream_id)) {
125 connection()->SendConnectionClose(
126 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
132 ReliableQuicStream* stream = GetStream(frames[i].stream_id);
133 if (stream == NULL) return false;
134 if (!stream->WillAcceptStreamFrame(frames[i])) return false;
136 // TODO(alyssar) check against existing connection address: if changed, make
137 // sure we update the connection.
140 for (size_t i = 0; i < frames.size(); ++i) {
141 QuicStreamId stream_id = frames[i].stream_id;
142 ReliableQuicStream* stream = GetStream(stream_id);
146 stream->OnStreamFrame(frames[i]);
148 // If the stream is a data stream had been prematurely closed, and the
149 // headers are now decompressed, then we are finally finished
151 if (ContainsKey(zombie_streams_, stream_id) &&
152 static_cast<QuicDataStream*>(stream)->headers_decompressed()) {
153 CloseZombieStream(stream_id);
157 while (!decompression_blocked_streams_.empty()) {
158 QuicHeaderId header_id = decompression_blocked_streams_.begin()->first;
159 if (header_id != decompressor_.current_header_id()) {
162 QuicStreamId stream_id = decompression_blocked_streams_.begin()->second;
163 decompression_blocked_streams_.erase(header_id);
164 QuicDataStream* stream = GetDataStream(stream_id);
166 connection()->SendConnectionClose(
167 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
170 stream->OnDecompressorAvailable();
175 void QuicSession::OnStreamHeaders(QuicStreamId stream_id,
176 StringPiece headers_data) {
177 QuicDataStream* stream = GetDataStream(stream_id);
179 // It's quite possible to receive headers after a stream has been reset.
182 stream->OnStreamHeaders(headers_data);
185 void QuicSession::OnStreamHeadersPriority(QuicStreamId stream_id,
186 QuicPriority priority) {
187 QuicDataStream* stream = GetDataStream(stream_id);
189 // It's quite possible to receive headers after a stream has been reset.
192 stream->OnStreamHeadersPriority(priority);
195 void QuicSession::OnStreamHeadersComplete(QuicStreamId stream_id,
198 QuicDataStream* stream = GetDataStream(stream_id);
200 // It's quite possible to receive headers after a stream has been reset.
203 stream->OnStreamHeadersComplete(fin, frame_len);
206 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
207 if (frame.stream_id == kCryptoStreamId) {
208 connection()->SendConnectionCloseWithDetails(
209 QUIC_INVALID_STREAM_ID,
210 "Attempt to reset the crypto stream");
213 if (frame.stream_id == kHeadersStreamId &&
214 connection()->version() > QUIC_VERSION_12) {
215 connection()->SendConnectionCloseWithDetails(
216 QUIC_INVALID_STREAM_ID,
217 "Attempt to reset the headers stream");
220 QuicDataStream* stream = GetDataStream(frame.stream_id);
222 return; // Errors are handled by GetStream.
224 if (ContainsKey(zombie_streams_, stream->id())) {
225 // If this was a zombie stream then we close it out now.
226 CloseZombieStream(stream->id());
227 // However, since the headers still have not been decompressed, we want to
228 // mark it a prematurely closed so that if we ever receive frames
229 // for this stream we can close the connection.
230 DCHECK(!stream->headers_decompressed());
231 AddPrematurelyClosedStream(frame.stream_id);
234 if (connection()->version() <= QUIC_VERSION_12) {
235 if (stream->stream_bytes_read() > 0 && !stream->headers_decompressed()) {
236 connection()->SendConnectionClose(
237 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
240 stream->OnStreamReset(frame);
243 void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
244 DCHECK(frame.last_good_stream_id < next_stream_id_);
245 goaway_received_ = true;
248 void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) {
249 DCHECK(!connection_->connected());
250 if (error_ == QUIC_NO_ERROR) {
254 while (!stream_map_.empty()) {
255 DataStreamMap::iterator it = stream_map_.begin();
256 QuicStreamId id = it->first;
257 it->second->OnConnectionClosed(error, from_peer);
258 // The stream should call CloseStream as part of OnConnectionClosed.
259 if (stream_map_.find(id) != stream_map_.end()) {
260 LOG(DFATAL) << ENDPOINT
261 << "Stream failed to close under OnConnectionClosed";
267 bool QuicSession::OnCanWrite() {
268 // We latch this here rather than doing a traditional loop, because streams
269 // may be modifying the list as we loop.
270 int remaining_writes = write_blocked_streams_.NumBlockedStreams();
272 while (remaining_writes > 0 && connection_->CanWriteStreamData()) {
273 DCHECK(write_blocked_streams_.HasWriteBlockedStreams());
274 if (!write_blocked_streams_.HasWriteBlockedStreams()) {
275 LOG(DFATAL) << "WriteBlockedStream is missing";
276 connection_->CloseConnection(QUIC_INTERNAL_ERROR, false);
277 return true; // We have no write blocked streams.
279 QuicStreamId stream_id = write_blocked_streams_.PopFront();
280 if (stream_id == kCryptoStreamId) {
281 has_pending_handshake_ = false; // We just popped it.
283 ReliableQuicStream* stream = GetStream(stream_id);
284 if (stream != NULL) {
285 // If the stream can't write all bytes, it'll re-add itself to the blocked
287 stream->OnCanWrite();
292 return !write_blocked_streams_.HasWriteBlockedStreams();
295 bool QuicSession::HasPendingHandshake() const {
296 return has_pending_handshake_;
299 QuicConsumedData QuicSession::WritevData(
301 const struct iovec* iov,
303 QuicStreamOffset offset,
305 QuicAckNotifier::DelegateInterface* ack_notifier_delegate) {
307 data.AppendIovec(iov, iov_count);
308 return connection_->SendStreamData(id, data, offset, fin,
309 ack_notifier_delegate);
312 size_t QuicSession::WriteHeaders(QuicStreamId id,
313 const SpdyHeaderBlock& headers,
315 DCHECK_LT(QUIC_VERSION_12, connection()->version());
316 if (connection()->version() <= QUIC_VERSION_12) {
319 return headers_stream_->WriteHeaders(id, headers, fin);
322 void QuicSession::SendRstStream(QuicStreamId id,
323 QuicRstStreamErrorCode error,
324 QuicStreamOffset bytes_written) {
325 connection_->SendRstStream(id, error, bytes_written);
326 CloseStreamInner(id, true);
329 void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) {
331 connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason);
334 void QuicSession::CloseStream(QuicStreamId stream_id) {
335 CloseStreamInner(stream_id, false);
338 void QuicSession::CloseStreamInner(QuicStreamId stream_id,
339 bool locally_reset) {
340 DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
342 DataStreamMap::iterator it = stream_map_.find(stream_id);
343 if (it == stream_map_.end()) {
344 DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
347 QuicDataStream* stream = it->second;
349 // Tell the stream that a RST has been sent.
351 stream->set_rst_sent(true);
354 if (connection_->version() <= QUIC_VERSION_12 &&
355 connection_->connected() && !stream->headers_decompressed()) {
356 // If the stream is being closed locally (for example a client cancelling
357 // a request before receiving the response) then we need to make sure that
358 // we keep the stream alive long enough to process any response or
359 // RST_STREAM frames.
360 if (locally_reset && !is_server()) {
361 AddZombieStream(stream_id);
365 // This stream has been closed before the headers were decompressed.
366 // This might cause problems with head of line blocking of headers.
367 // If the peer sent headers which were lost but we now close the stream
368 // we will never be able to decompress headers for other streams.
369 // To deal with this, we keep track of streams which have been closed
370 // prematurely. If we ever receive data frames for this steam, then we
371 // know there actually has been a problem and we close the connection.
372 AddPrematurelyClosedStream(stream->id());
374 closed_streams_.push_back(it->second);
375 if (ContainsKey(zombie_streams_, stream->id())) {
376 zombie_streams_.erase(stream->id());
378 stream_map_.erase(it);
382 void QuicSession::AddZombieStream(QuicStreamId stream_id) {
383 if (zombie_streams_.size() == kMaxZombieStreams) {
384 QuicStreamId oldest_zombie_stream_id = zombie_streams_.begin()->first;
385 CloseZombieStream(oldest_zombie_stream_id);
386 // However, since the headers still have not been decompressed, we want to
387 // mark it a prematurely closed so that if we ever receive frames
388 // for this stream we can close the connection.
389 AddPrematurelyClosedStream(oldest_zombie_stream_id);
391 zombie_streams_.insert(make_pair(stream_id, true));
394 void QuicSession::CloseZombieStream(QuicStreamId stream_id) {
395 DCHECK(ContainsKey(zombie_streams_, stream_id));
396 zombie_streams_.erase(stream_id);
397 QuicDataStream* stream = GetDataStream(stream_id);
401 stream_map_.erase(stream_id);
403 closed_streams_.push_back(stream);
406 void QuicSession::AddPrematurelyClosedStream(QuicStreamId stream_id) {
407 if (connection()->version() > QUIC_VERSION_12) {
410 if (prematurely_closed_streams_.size() ==
411 kMaxPrematurelyClosedStreamsTracked) {
412 prematurely_closed_streams_.erase(prematurely_closed_streams_.begin());
414 prematurely_closed_streams_.insert(make_pair(stream_id, true));
417 bool QuicSession::IsEncryptionEstablished() {
418 return GetCryptoStream()->encryption_established();
421 bool QuicSession::IsCryptoHandshakeConfirmed() {
422 return GetCryptoStream()->handshake_confirmed();
425 void QuicSession::OnConfigNegotiated() {
426 connection_->SetFromConfig(config_);
429 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
431 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
432 // to QuicSession since it is the glue.
433 case ENCRYPTION_FIRST_ESTABLISHED:
436 case ENCRYPTION_REESTABLISHED:
437 // Retransmit originally packets that were sent, since they can't be
438 // decrypted by the peer.
439 connection_->RetransmitUnackedPackets(INITIAL_ENCRYPTION_ONLY);
442 case HANDSHAKE_CONFIRMED:
443 LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT
444 << "Handshake confirmed without parameter negotiation.";
445 connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
446 max_open_streams_ = config_.max_streams_per_connection();
450 LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
454 void QuicSession::OnCryptoHandshakeMessageSent(
455 const CryptoHandshakeMessage& message) {
458 void QuicSession::OnCryptoHandshakeMessageReceived(
459 const CryptoHandshakeMessage& message) {
462 QuicConfig* QuicSession::config() {
466 void QuicSession::ActivateStream(QuicDataStream* stream) {
467 DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size()
468 << ". activating " << stream->id();
469 DCHECK_EQ(stream_map_.count(stream->id()), 0u);
470 stream_map_[stream->id()] = stream;
473 QuicStreamId QuicSession::GetNextStreamId() {
474 QuicStreamId id = next_stream_id_;
475 next_stream_id_ += 2;
479 ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) {
480 if (stream_id == kCryptoStreamId) {
481 return GetCryptoStream();
483 if (stream_id == kHeadersStreamId &&
484 connection_->version() > QUIC_VERSION_12) {
485 return headers_stream_.get();
487 return GetDataStream(stream_id);
490 QuicDataStream* QuicSession::GetDataStream(const QuicStreamId stream_id) {
491 if (stream_id == kCryptoStreamId) {
492 DLOG(FATAL) << "Attempt to call GetDataStream with the crypto stream id";
495 if (stream_id == kHeadersStreamId &&
496 connection_->version() > QUIC_VERSION_12) {
497 DLOG(FATAL) << "Attempt to call GetDataStream with the headers stream id";
501 DataStreamMap::iterator it = stream_map_.find(stream_id);
502 if (it != stream_map_.end()) {
506 if (IsClosedStream(stream_id)) {
510 if (stream_id % 2 == next_stream_id_ % 2) {
511 // We've received a frame for a locally-created stream that is not
512 // currently active. This is an error.
513 if (connection()->connected()) {
514 connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM);
519 return GetIncomingDataStream(stream_id);
522 QuicDataStream* QuicSession::GetIncomingDataStream(QuicStreamId stream_id) {
523 if (IsClosedStream(stream_id)) {
528 // We've already sent a GoAway
529 SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0);
533 implicitly_created_streams_.erase(stream_id);
534 if (stream_id > largest_peer_created_stream_id_) {
535 // TODO(rch) add unit test for this
536 if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) {
537 connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID);
540 if (largest_peer_created_stream_id_ == 0) {
541 if (is_server() && connection()->version() > QUIC_VERSION_12) {
542 largest_peer_created_stream_id_= 3;
544 largest_peer_created_stream_id_= 1;
547 for (QuicStreamId id = largest_peer_created_stream_id_ + 2;
550 implicitly_created_streams_.insert(id);
552 largest_peer_created_stream_id_ = stream_id;
554 QuicDataStream* stream = CreateIncomingDataStream(stream_id);
555 if (stream == NULL) {
558 ActivateStream(stream);
562 bool QuicSession::IsClosedStream(QuicStreamId id) {
564 if (id == kCryptoStreamId) {
567 if (connection()->version() > QUIC_VERSION_12) {
568 if (id == kHeadersStreamId) {
572 if (ContainsKey(zombie_streams_, id)) {
575 if (ContainsKey(stream_map_, id)) {
579 if (id % 2 == next_stream_id_ % 2) {
580 // Locally created streams are strictly in-order. If the id is in the
581 // range of created streams and it's not active, it must have been closed.
582 return id < next_stream_id_;
584 // For peer created streams, we also need to consider implicitly created
586 return id <= largest_peer_created_stream_id_ &&
587 implicitly_created_streams_.count(id) == 0;
590 size_t QuicSession::GetNumOpenStreams() const {
591 return stream_map_.size() + implicitly_created_streams_.size() -
592 zombie_streams_.size();
595 void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) {
597 ReliableQuicStream* stream = GetStream(id);
598 if (stream != NULL) {
599 LOG_IF(DFATAL, priority != stream->EffectivePriority())
600 << "Priorities do not match. Got: " << priority
601 << " Expected: " << stream->EffectivePriority();
603 LOG(DFATAL) << "Marking unknown stream " << id << " blocked.";
607 if (id == kCryptoStreamId) {
608 DCHECK(!has_pending_handshake_);
609 has_pending_handshake_ = true;
610 // TODO(jar): Be sure to use the highest priority for the crypto stream,
611 // perhaps by adding a "special" priority for it that is higher than
613 priority = kHighestPriority;
615 write_blocked_streams_.PushBack(id, priority, connection()->version());
618 bool QuicSession::HasDataToWrite() const {
619 return write_blocked_streams_.HasWriteBlockedStreams() ||
620 connection_->HasQueuedData();
623 void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id,
624 QuicStreamId stream_id) {
625 DCHECK_GE(QUIC_VERSION_12, connection()->version());
626 decompression_blocked_streams_[header_id] = stream_id;
629 bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) {
634 void QuicSession::PostProcessAfterData() {
635 STLDeleteElements(&closed_streams_);
636 closed_streams_.clear();