1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/spdy/spdy_session.h"
10 #include "base/basictypes.h"
11 #include "base/bind.h"
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/metrics/field_trial.h"
16 #include "base/metrics/histogram.h"
17 #include "base/metrics/sparse_histogram.h"
18 #include "base/metrics/stats_counters.h"
19 #include "base/stl_util.h"
20 #include "base/strings/string_number_conversions.h"
21 #include "base/strings/string_util.h"
22 #include "base/strings/stringprintf.h"
23 #include "base/strings/utf_string_conversions.h"
24 #include "base/time/time.h"
25 #include "base/values.h"
26 #include "crypto/ec_private_key.h"
27 #include "crypto/ec_signature_creator.h"
28 #include "net/base/connection_type_histograms.h"
29 #include "net/base/net_log.h"
30 #include "net/base/net_util.h"
31 #include "net/cert/asn1_util.h"
32 #include "net/cert/cert_verify_result.h"
33 #include "net/http/http_log_util.h"
34 #include "net/http/http_network_session.h"
35 #include "net/http/http_server_properties.h"
36 #include "net/http/http_util.h"
37 #include "net/http/transport_security_state.h"
38 #include "net/spdy/spdy_buffer_producer.h"
39 #include "net/spdy/spdy_frame_builder.h"
40 #include "net/spdy/spdy_http_utils.h"
41 #include "net/spdy/spdy_protocol.h"
42 #include "net/spdy/spdy_session_pool.h"
43 #include "net/spdy/spdy_stream.h"
44 #include "net/ssl/channel_id_service.h"
45 #include "net/ssl/ssl_cipher_suite_names.h"
46 #include "net/ssl/ssl_connection_status_flags.h"
// Size, in bytes, of the IOBuffer that the SpdySession constructor allocates
// for |read_buffer_|.
52 const int kReadBufferSize = 8 * 1024;
// Seconds used by the SpdySession constructor to initialize
// |connection_at_risk_of_loss_time_|.
53 const int kDefaultConnectionAtRiskOfLossSeconds = 10;
// Seconds used by the SpdySession constructor to initialize |hung_interval_|.
54 const int kHungIntervalSeconds = 10;
56 // Minimum seconds that unclaimed pushed streams will be kept in memory.
// The SpdySession constructor also adds this interval to the current time to
// compute |next_unclaimed_push_stream_sweep_time_|.
57 const int kMinPushedStreamLifetimeSeconds = 300;
// Converts |headers| into a newly allocated base::ListValue, appending one
// string per header entry. Each header value is passed through
// ElideHeaderValueForNetLog() together with |log_level| and the header name
// before it is appended. Ownership of the list is returned to the caller.
59 scoped_ptr<base::ListValue> SpdyHeaderBlockToListValue(
60 const SpdyHeaderBlock& headers,
61 net::NetLog::LogLevel log_level) {
62 scoped_ptr<base::ListValue> headers_list(new base::ListValue());
63 for (SpdyHeaderBlock::const_iterator it = headers.begin();
64 it != headers.end(); ++it) {
65 headers_list->AppendString(
67 ElideHeaderValueForNetLog(log_level, it->first, it->second));
69 return headers_list.Pass();
// Builds the NetLog parameter dictionary for a sent SYN_STREAM (bound in
// SpdySession::CreateSynStream()). The dictionary records "fin",
// "unidirectional", "spdy_priority" (cast to int) and "stream_id". The header
// list comes from SpdyHeaderBlockToListValue(), which uses |log_level| when
// eliding values; release() hands the list out of its scoped_ptr.
// |headers| is dereferenced unconditionally and must not be null.
72 base::Value* NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock* headers,
75 SpdyPriority spdy_priority,
76 SpdyStreamId stream_id,
77 NetLog::LogLevel log_level) {
78 base::DictionaryValue* dict = new base::DictionaryValue();
80 SpdyHeaderBlockToListValue(*headers, log_level).release());
81 dict->SetBoolean("fin", fin);
82 dict->SetBoolean("unidirectional", unidirectional);
83 dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
84 dict->SetInteger("stream_id", stream_id);
// Builds the NetLog parameter dictionary for a received SYN_STREAM. The
// dictionary records "fin", "unidirectional", "spdy_priority" (cast to int),
// "stream_id" and "associated_stream". The header list comes from
// SpdyHeaderBlockToListValue(), which uses |log_level| when eliding values.
// |headers| is dereferenced unconditionally and must not be null.
88 base::Value* NetLogSpdySynStreamReceivedCallback(
89 const SpdyHeaderBlock* headers,
92 SpdyPriority spdy_priority,
93 SpdyStreamId stream_id,
94 SpdyStreamId associated_stream,
95 NetLog::LogLevel log_level) {
96 base::DictionaryValue* dict = new base::DictionaryValue();
98 SpdyHeaderBlockToListValue(*headers, log_level).release());
99 dict->SetBoolean("fin", fin);
100 dict->SetBoolean("unidirectional", unidirectional);
101 dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
102 dict->SetInteger("stream_id", stream_id);
103 dict->SetInteger("associated_stream", associated_stream);
// Builds the NetLog parameter dictionary shared by received SYN_REPLY and
// HEADERS frames (per the function name). The dictionary records "fin" and
// "stream_id"; the header list comes from SpdyHeaderBlockToListValue(), which
// uses |log_level| when eliding values.
// |headers| is dereferenced unconditionally and must not be null.
107 base::Value* NetLogSpdySynReplyOrHeadersReceivedCallback(
108 const SpdyHeaderBlock* headers,
110 SpdyStreamId stream_id,
111 NetLog::LogLevel log_level) {
112 base::DictionaryValue* dict = new base::DictionaryValue();
114 SpdyHeaderBlockToListValue(*headers, log_level).release());
115 dict->SetBoolean("fin", fin);
116 dict->SetInteger("stream_id", stream_id);
// Builds the NetLog parameter dictionary for a session close: "net_error"
// holds |net_error| and "description" holds a copy of |*description|.
// |description| is dereferenced unconditionally and must not be null. The log
// level is ignored.
120 base::Value* NetLogSpdySessionCloseCallback(int net_error,
121 const std::string* description,
122 NetLog::LogLevel /* log_level */) {
123 base::DictionaryValue* dict = new base::DictionaryValue();
124 dict->SetInteger("net_error", net_error);
125 dict->SetString("description", *description);
// Builds the NetLog parameter dictionary for the TYPE_SPDY_SESSION event
// (bound in the SpdySession constructor). "host" is the string form of the
// first element of |host_pair| and "proxy" is the PAC string of the second.
// |host_pair| must not be null. The log level is ignored.
129 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair,
130 NetLog::LogLevel /* log_level */) {
131 base::DictionaryValue* dict = new base::DictionaryValue();
132 dict->SetString("host", host_pair->first.ToString());
133 dict->SetString("proxy", host_pair->second.ToPacString());
// Builds a NetLog parameter dictionary with "host" set to the string form of
// |host_port_pair| and "clear_persisted" set to |clear_persisted|. The log
// level is ignored.
137 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair,
138 bool clear_persisted,
139 NetLog::LogLevel /* log_level */) {
140 base::DictionaryValue* dict = new base::DictionaryValue();
141 dict->SetString("host", host_port_pair.ToString());
142 dict->SetBoolean("clear_persisted", clear_persisted);
// Builds a NetLog parameter dictionary describing a single setting, with the
// integer entries "id", "flags" and "value". The log level is ignored.
146 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id,
147 SpdySettingsFlags flags,
149 NetLog::LogLevel /* log_level */) {
150 base::DictionaryValue* dict = new base::DictionaryValue();
151 dict->SetInteger("id", id);
152 dict->SetInteger("flags", flags);
153 dict->SetInteger("value", value);
// Builds a NetLog parameter dictionary whose "settings" entry is a list with
// one string per entry of |*settings|, formatted as
// "[id:<id> flags:<flags> value:<value>]". |settings| must not be null. The
// log level is ignored.
157 base::Value* NetLogSpdySendSettingsCallback(const SettingsMap* settings,
158 NetLog::LogLevel /* log_level */) {
159 base::DictionaryValue* dict = new base::DictionaryValue();
160 base::ListValue* settings_list = new base::ListValue();
161 for (SettingsMap::const_iterator it = settings->begin();
162 it != settings->end(); ++it) {
// Each map entry is id -> (flags, value).
163 const SpdySettingsIds id = it->first;
164 const SpdySettingsFlags flags = it->second.first;
165 const uint32 value = it->second.second;
166 settings_list->Append(new base::StringValue(
167 base::StringPrintf("[id:%u flags:%u value:%u]", id, flags, value)));
169 dict->Set("settings", settings_list);
// Builds a NetLog parameter dictionary for a WINDOW_UPDATE frame with the
// integer entries "stream_id" (cast to int) and "delta". The log level is
// ignored.
173 base::Value* NetLogSpdyWindowUpdateFrameCallback(
174 SpdyStreamId stream_id,
176 NetLog::LogLevel /* log_level */) {
177 base::DictionaryValue* dict = new base::DictionaryValue();
178 dict->SetInteger("stream_id", static_cast<int>(stream_id));
179 dict->SetInteger("delta", delta);
// Builds a NetLog parameter dictionary for a session-level window change with
// the integer entries "delta" and "window_size". The log level is ignored.
183 base::Value* NetLogSpdySessionWindowUpdateCallback(
186 NetLog::LogLevel /* log_level */) {
187 base::DictionaryValue* dict = new base::DictionaryValue();
188 dict->SetInteger("delta", delta);
189 dict->SetInteger("window_size", window_size);
// Builds a NetLog parameter dictionary for a DATA frame with "stream_id"
// (cast to int), "size" and the boolean "fin". The log level is ignored.
193 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id,
196 NetLog::LogLevel /* log_level */) {
197 base::DictionaryValue* dict = new base::DictionaryValue();
198 dict->SetInteger("stream_id", static_cast<int>(stream_id));
199 dict->SetInteger("size", size);
200 dict->SetBoolean("fin", fin);
// Builds a NetLog parameter dictionary for a RST_STREAM with "stream_id"
// (cast to int), the integer "status" and a copy of |*description| under
// "description". |description| must not be null. The log level is ignored.
204 base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id,
206 const std::string* description,
207 NetLog::LogLevel /* log_level */) {
208 base::DictionaryValue* dict = new base::DictionaryValue();
209 dict->SetInteger("stream_id", static_cast<int>(stream_id));
210 dict->SetInteger("status", status);
211 dict->SetString("description", *description);
// Builds a NetLog parameter dictionary for a PING with "unique_id", the
// string "type" and the boolean "is_ack". The log level is ignored.
// NOTE(review): |unique_id| is stored through SetInteger(); if SpdyPingId is
// wider than int the logged value would be narrowed -- confirm.
215 base::Value* NetLogSpdyPingCallback(SpdyPingId unique_id,
218 NetLog::LogLevel /* log_level */) {
219 base::DictionaryValue* dict = new base::DictionaryValue();
220 dict->SetInteger("unique_id", unique_id);
221 dict->SetString("type", type);
222 dict->SetBoolean("is_ack", is_ack);
// Builds a NetLog parameter dictionary for a GOAWAY with the integer entries
// "last_accepted_stream_id", "active_streams", "unclaimed_streams" and
// "status" (the stream id and status are cast to int). The log level is
// ignored.
226 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,
228 int unclaimed_streams,
229 SpdyGoAwayStatus status,
230 NetLog::LogLevel /* log_level */) {
231 base::DictionaryValue* dict = new base::DictionaryValue();
232 dict->SetInteger("last_accepted_stream_id",
233 static_cast<int>(last_stream_id));
234 dict->SetInteger("active_streams", active_streams);
235 dict->SetInteger("unclaimed_streams", unclaimed_streams);
236 dict->SetInteger("status", static_cast<int>(status));
// Builds a NetLog parameter dictionary for a received PUSH_PROMISE. |stream_id|
// is logged under the key "id" (not "stream_id") and |promised_stream_id|
// under "promised_stream_id". The header list comes from
// SpdyHeaderBlockToListValue(), which uses |log_level| when eliding values.
// |headers| is dereferenced unconditionally and must not be null.
240 base::Value* NetLogSpdyPushPromiseReceivedCallback(
241 const SpdyHeaderBlock* headers,
242 SpdyStreamId stream_id,
243 SpdyStreamId promised_stream_id,
244 NetLog::LogLevel log_level) {
245 base::DictionaryValue* dict = new base::DictionaryValue();
247 SpdyHeaderBlockToListValue(*headers, log_level).release());
248 dict->SetInteger("id", stream_id);
249 dict->SetInteger("promised_stream_id", promised_stream_id);
// Helper function to return the total size of an array of objects
// with .size() member functions.
//
// |arr| is taken by reference to a fixed-size array, so the element count N
// is deduced at compile time and no separate length argument is needed.
// Returns the sum of arr[i].size() over all N elements.
template <typename T, size_t N>
size_t GetTotalSize(const T (&arr)[N]) {
  size_t total_size = 0;
  for (size_t i = 0; i < N; ++i) {
    total_size += arr[i].size();
  }
  return total_size;
}
263 // Helper class for std::find_if on STL container containing
264 // SpdyStreamRequest weak pointers.
// Two weak pointers compare equal when get() returns the same raw pointer,
// so two null (invalidated) weak pointers also compare equal.
265 class RequestEquals {
// Stores a copy of the weak pointer to search for.
267 RequestEquals(const base::WeakPtr<SpdyStreamRequest>& request)
268 : request_(request) {}
// Predicate: true when |request| refers to the same object as |request_|.
270 bool operator()(const base::WeakPtr<SpdyStreamRequest>& request) const {
271 return request_.get() == request.get();
275 const base::WeakPtr<SpdyStreamRequest> request_;
278 // The maximum number of concurrent streams we will ever create. Even if
279 // the server permits more, we will never exceed this limit.
// The SpdySession constructor uses this value when its
// |max_concurrent_streams_limit| argument is 0.
280 const size_t kMaxConcurrentStreamLimit = 256;
// Translates a SpdyFramer::SpdyError into the SpdyProtocolErrorDetails value
// of the same name (SPDY_X maps to SPDY_ERROR_X for every case listed).
// The final return yields -1 cast to the enum; presumably it is reached only
// for values without an explicit case -- confirm.
284 SpdyProtocolErrorDetails MapFramerErrorToProtocolError(
285 SpdyFramer::SpdyError err) {
287 case SpdyFramer::SPDY_NO_ERROR:
288 return SPDY_ERROR_NO_ERROR;
289 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
290 return SPDY_ERROR_INVALID_CONTROL_FRAME;
291 case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
292 return SPDY_ERROR_CONTROL_PAYLOAD_TOO_LARGE;
293 case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
294 return SPDY_ERROR_ZLIB_INIT_FAILURE;
295 case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
296 return SPDY_ERROR_UNSUPPORTED_VERSION;
297 case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
298 return SPDY_ERROR_DECOMPRESS_FAILURE;
299 case SpdyFramer::SPDY_COMPRESS_FAILURE:
300 return SPDY_ERROR_COMPRESS_FAILURE;
301 case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
302 return SPDY_ERROR_GOAWAY_FRAME_CORRUPT;
303 case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
304 return SPDY_ERROR_RST_STREAM_FRAME_CORRUPT;
305 case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
306 return SPDY_ERROR_INVALID_DATA_FRAME_FLAGS;
307 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
308 return SPDY_ERROR_INVALID_CONTROL_FRAME_FLAGS;
309 case SpdyFramer::SPDY_UNEXPECTED_FRAME:
310 return SPDY_ERROR_UNEXPECTED_FRAME;
313 return static_cast<SpdyProtocolErrorDetails>(-1);
// Translates a SpdyFramer::SpdyError into a net Error:
//  - zlib init, compress and decompress failures map to
//    ERR_SPDY_COMPRESSION_ERROR;
//  - an oversized control payload maps to ERR_SPDY_FRAME_SIZE_ERROR;
//  - invalid control frame, unsupported version, corrupt GOAWAY/RST_STREAM,
//    invalid data/control frame flags and unexpected frame map to
//    ERR_SPDY_PROTOCOL_ERROR.
// The final return is also ERR_SPDY_PROTOCOL_ERROR; presumably it covers
// values without an explicit case -- confirm.
317 Error MapFramerErrorToNetError(SpdyFramer::SpdyError err) {
319 case SpdyFramer::SPDY_NO_ERROR:
321 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
322 return ERR_SPDY_PROTOCOL_ERROR;
323 case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
324 return ERR_SPDY_FRAME_SIZE_ERROR;
325 case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
326 return ERR_SPDY_COMPRESSION_ERROR;
327 case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
328 return ERR_SPDY_PROTOCOL_ERROR;
329 case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
330 return ERR_SPDY_COMPRESSION_ERROR;
331 case SpdyFramer::SPDY_COMPRESS_FAILURE:
332 return ERR_SPDY_COMPRESSION_ERROR;
333 case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
334 return ERR_SPDY_PROTOCOL_ERROR;
335 case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
336 return ERR_SPDY_PROTOCOL_ERROR;
337 case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
338 return ERR_SPDY_PROTOCOL_ERROR;
339 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
340 return ERR_SPDY_PROTOCOL_ERROR;
341 case SpdyFramer::SPDY_UNEXPECTED_FRAME:
342 return ERR_SPDY_PROTOCOL_ERROR;
345 return ERR_SPDY_PROTOCOL_ERROR;
// Translates a SpdyRstStreamStatus into the SpdyProtocolErrorDetails value of
// the same name (RST_STREAM_X maps to STATUS_CODE_X for every case listed).
// The final return yields -1 cast to the enum; presumably it is reached only
// for values without an explicit case -- confirm.
349 SpdyProtocolErrorDetails MapRstStreamStatusToProtocolError(
350 SpdyRstStreamStatus status) {
352 case RST_STREAM_PROTOCOL_ERROR:
353 return STATUS_CODE_PROTOCOL_ERROR;
354 case RST_STREAM_INVALID_STREAM:
355 return STATUS_CODE_INVALID_STREAM;
356 case RST_STREAM_REFUSED_STREAM:
357 return STATUS_CODE_REFUSED_STREAM;
358 case RST_STREAM_UNSUPPORTED_VERSION:
359 return STATUS_CODE_UNSUPPORTED_VERSION;
360 case RST_STREAM_CANCEL:
361 return STATUS_CODE_CANCEL;
362 case RST_STREAM_INTERNAL_ERROR:
363 return STATUS_CODE_INTERNAL_ERROR;
364 case RST_STREAM_FLOW_CONTROL_ERROR:
365 return STATUS_CODE_FLOW_CONTROL_ERROR;
366 case RST_STREAM_STREAM_IN_USE:
367 return STATUS_CODE_STREAM_IN_USE;
368 case RST_STREAM_STREAM_ALREADY_CLOSED:
369 return STATUS_CODE_STREAM_ALREADY_CLOSED;
370 case RST_STREAM_INVALID_CREDENTIALS:
371 return STATUS_CODE_INVALID_CREDENTIALS;
372 case RST_STREAM_FRAME_SIZE_ERROR:
373 return STATUS_CODE_FRAME_SIZE_ERROR;
374 case RST_STREAM_SETTINGS_TIMEOUT:
375 return STATUS_CODE_SETTINGS_TIMEOUT;
376 case RST_STREAM_CONNECT_ERROR:
377 return STATUS_CODE_CONNECT_ERROR;
378 case RST_STREAM_ENHANCE_YOUR_CALM:
379 return STATUS_CODE_ENHANCE_YOUR_CALM;
382 return static_cast<SpdyProtocolErrorDetails>(-1);
// Translates a net Error into a SpdyGoAwayStatus. The SPDY protocol,
// flow-control, frame-size and compression errors map to the GOAWAY status of
// the same name, and ERR_SPDY_INADEQUATE_TRANSPORT_SECURITY maps to
// GOAWAY_INADEQUATE_SECURITY. The final return is GOAWAY_PROTOCOL_ERROR;
// presumably it covers errors without an explicit case -- confirm.
386 SpdyGoAwayStatus MapNetErrorToGoAwayStatus(Error err) {
389 return GOAWAY_NO_ERROR;
390 case ERR_SPDY_PROTOCOL_ERROR:
391 return GOAWAY_PROTOCOL_ERROR;
392 case ERR_SPDY_FLOW_CONTROL_ERROR:
393 return GOAWAY_FLOW_CONTROL_ERROR;
394 case ERR_SPDY_FRAME_SIZE_ERROR:
395 return GOAWAY_FRAME_SIZE_ERROR;
396 case ERR_SPDY_COMPRESSION_ERROR:
397 return GOAWAY_COMPRESSION_ERROR;
398 case ERR_SPDY_INADEQUATE_TRANSPORT_SECURITY:
399 return GOAWAY_INADEQUATE_SECURITY;
401 return GOAWAY_PROTOCOL_ERROR;
// Distributes every entry of |headers| into exactly one of |request_headers|
// or |response_headers|; both output pointers must be non-null (DCHECKed).
// Entries go to |response_headers| unless they are recognised as request
// headers:
//  - with SPDY2, the "url" header;
//  - in the host/scheme/path check, the host header (":authority" for SPDY4
//    and later, ":host" otherwise), ":scheme" and ":path".
405 void SplitPushedHeadersToRequestAndResponse(const SpdyHeaderBlock& headers,
406 SpdyMajorVersion protocol_version,
407 SpdyHeaderBlock* request_headers,
408 SpdyHeaderBlock* response_headers) {
409 DCHECK(response_headers);
410 DCHECK(request_headers);
411 for (SpdyHeaderBlock::const_iterator it = headers.begin();
// Default destination; overridden below for request headers.
414 SpdyHeaderBlock* to_insert = response_headers;
415 if (protocol_version == SPDY2) {
416 if (it->first == "url")
417 to_insert = request_headers;
419 const char* host = protocol_version >= SPDY4 ? ":authority" : ":host";
420 static const char* scheme = ":scheme";
421 static const char* path = ":path";
422 if (it->first == host || it->first == scheme || it->first == path)
423 to_insert = request_headers;
425 to_insert->insert(*it);
// Constructs a request and binds |weak_ptr_factory_| to this object so weak
// pointers to the request can be handed to a SpdySession.
429 SpdyStreamRequest::SpdyStreamRequest() : weak_ptr_factory_(this) {
// Destructor for SpdyStreamRequest.
433 SpdyStreamRequest::~SpdyStreamRequest() {
// Starts a stream request on |session|. |callback_| must be null on entry
// (DCHECKed), i.e. no request may already be in progress on this object.
// Stores |priority| and |callback| and then asks the session to create a
// stream through TryCreateStream(), handing it a weak pointer to this request.
// TryCreateStream() may return ERR_IO_PENDING when the session has reached
// its concurrent-stream limit and queues the request.
437 int SpdyStreamRequest::StartRequest(
439 const base::WeakPtr<SpdySession>& session,
441 RequestPriority priority,
442 const BoundNetLog& net_log,
443 const CompletionCallback& callback) {
447 DCHECK(callback_.is_null());
452 priority_ = priority;
454 callback_ = callback;
456 base::WeakPtr<SpdyStream> stream;
457 int rv = session->TryCreateStream(weak_ptr_factory_.GetWeakPtr(), &stream);
// Cancels this request: tells |session_| to drop it via CancelStreamRequest()
// and then invalidates all outstanding weak pointers to this object.
465 void SpdyStreamRequest::CancelRequest() {
467 session_->CancelStreamRequest(weak_ptr_factory_.GetWeakPtr());
469 // Do this to cancel any pending CompleteStreamRequest() tasks.
470 weak_ptr_factory_.InvalidateWeakPtrs();
// Hands the created stream back to the caller. |stream_| is first copied into
// a local weak pointer.
473 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() {
475 base::WeakPtr<SpdyStream> stream = stream_;
// Completion hook for a successfully created |stream|. A callback must have
// been registered (DCHECKed); it is copied into a local before use.
481 void SpdyStreamRequest::OnRequestCompleteSuccess(
482 const base::WeakPtr<SpdyStream>& stream) {
485 DCHECK(!callback_.is_null());
486 CompletionCallback callback = callback_;
// Completion hook for a failed request; |rv| is the result code. A callback
// must have been registered (DCHECKed); it is copied into a local before use.
493 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) {
496 DCHECK(!callback_.is_null());
497 CompletionCallback callback = callback_;
// Restores request state to defaults: the stream type becomes
// SPDY_BIDIRECTIONAL_STREAM, the priority MINIMUM_PRIORITY and the net log a
// default-constructed BoundNetLog.
503 void SpdyStreamRequest::Reset() {
504 type_ = SPDY_BIDIRECTIONAL_STREAM;
508 priority_ = MINIMUM_PRIORITY;
509 net_log_ = BoundNetLog();
// Default constructor: the entry is not waiting for a SYN_REPLY.
513 SpdySession::ActiveStreamInfo::ActiveStreamInfo()
515 waiting_for_syn_reply(false) {}
// Constructs the entry for |stream|, which must not be null. Every stream
// type except SPDY_PUSH_STREAM starts out waiting for a SYN_REPLY.
517 SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream)
519 waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {
// Destructor; performs no explicit work.
522 SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {}
// Default constructor: |stream_id| is 0 and |creation_time| is
// default-constructed.
524 SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {}
// Records the id of a pushed stream together with the time it was created.
526 SpdySession::PushedStreamInfo::PushedStreamInfo(
527 SpdyStreamId stream_id,
528 base::TimeTicks creation_time)
529 : stream_id(stream_id),
530 creation_time(creation_time) {}
// Destructor; performs no explicit work.
532 SpdySession::PushedStreamInfo::~PushedStreamInfo() {}
// Decides whether a session established for |old_hostname|, whose TLS state
// is described by |ssl_info|, may also be used for |new_hostname|.
// The checks are made in this order: certificate status error, client
// certificate sent, channel ID sent with differing ChannelIDService domains,
// certificate name match for |new_hostname|, and public key pin validation
// through |transport_security_state|.
// |transport_security_state| and |ssl_info.cert| are dereferenced without a
// null check.
535 bool SpdySession::CanPool(TransportSecurityState* transport_security_state,
536 const SSLInfo& ssl_info,
537 const std::string& old_hostname,
538 const std::string& new_hostname) {
539 // Pooling is prohibited if the server cert is not valid for the new domain,
540 // and for connections on which client certs were sent. It is also prohibited
541 // when channel ID was sent if the hosts are from different eTLDs+1.
542 if (IsCertStatusError(ssl_info.cert_status))
545 if (ssl_info.client_cert_sent)
548 if (ssl_info.channel_id_sent &&
549 ChannelIDService::GetDomainForHost(new_hostname) !=
550 ChannelIDService::GetDomainForHost(old_hostname)) {
555 if (!ssl_info.cert->VerifyNameMatch(new_hostname, &unused))
// |pinning_failure_log| only receives the failure description from
// CheckPublicKeyPins(); it is not otherwise used in the lines shown here.
558 std::string pinning_failure_log;
559 if (!transport_security_state->CheckPublicKeyPins(
561 true, /* sni_available */
562 ssl_info.is_issued_by_known_root,
563 ssl_info.public_key_hashes,
564 &pinning_failure_log)) {
// Constructs a session in the STATE_AVAILABLE / READ_STATE_DO_READ /
// WRITE_STATE_IDLE state. No socket is attached here; that happens in
// InitializeWithSocket().
//
// A value of 0 for |initial_max_concurrent_streams|,
// |max_concurrent_streams_limit| or |stream_initial_recv_window_size| selects
// the corresponding built-in default (see the initializers below).
// |default_protocol| must lie within [kProtoSPDYMinimumVersion,
// kProtoSPDYMaximumVersion] (DCHECKed); InitializeWithSocket() may replace it
// with the negotiated protocol.
// |time_func| supplies the current time; it is called here to seed
// |last_activity_time_| and is stored in |time_func_|.
571 SpdySession::SpdySession(
572 const SpdySessionKey& spdy_session_key,
573 const base::WeakPtr<HttpServerProperties>& http_server_properties,
574 TransportSecurityState* transport_security_state,
575 bool verify_domain_authentication,
576 bool enable_sending_initial_data,
577 bool enable_compression,
578 bool enable_ping_based_connection_checking,
579 NextProto default_protocol,
580 size_t stream_initial_recv_window_size,
581 size_t initial_max_concurrent_streams,
582 size_t max_concurrent_streams_limit,
584 const HostPortPair& trusted_spdy_proxy,
586 : in_io_loop_(false),
587 spdy_session_key_(spdy_session_key),
589 http_server_properties_(http_server_properties),
590 transport_security_state_(transport_security_state),
591 read_buffer_(new IOBuffer(kReadBufferSize)),
592 stream_hi_water_mark_(kFirstStreamId),
593 num_pushed_streams_(0u),
594 num_active_pushed_streams_(0u),
595 in_flight_write_frame_type_(DATA),
596 in_flight_write_frame_size_(0),
598 certificate_error_code_(OK),
599 availability_state_(STATE_AVAILABLE),
600 read_state_(READ_STATE_DO_READ),
601 write_state_(WRITE_STATE_IDLE),
// 0 means "use kInitialMaxConcurrentStreams".
603 max_concurrent_streams_(initial_max_concurrent_streams == 0
604 ? kInitialMaxConcurrentStreams
605 : initial_max_concurrent_streams),
// 0 means "use kMaxConcurrentStreamLimit".
606 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0
607 ? kMaxConcurrentStreamLimit
608 : max_concurrent_streams_limit),
609 max_concurrent_pushed_streams_(kMaxConcurrentPushedStreams),
610 streams_initiated_count_(0),
611 streams_pushed_count_(0),
612 streams_pushed_and_claimed_count_(0),
613 streams_abandoned_count_(0),
614 total_bytes_received_(0),
615 sent_settings_(false),
616 received_settings_(false),
620 last_activity_time_(time_func()),
621 last_compressed_frame_len_(0),
622 check_ping_status_pending_(false),
623 send_connection_header_prefix_(false),
// Flow control and the session window sizes start disabled/zero here;
// InitializeWithSocket() sets them according to the protocol in use.
624 flow_control_state_(FLOW_CONTROL_NONE),
625 stream_initial_send_window_size_(kSpdyStreamInitialWindowSize),
// 0 means "use kDefaultInitialRecvWindowSize".
626 stream_initial_recv_window_size_(stream_initial_recv_window_size == 0
627 ? kDefaultInitialRecvWindowSize
628 : stream_initial_recv_window_size),
629 session_send_window_size_(0),
630 session_recv_window_size_(0),
631 session_unacked_recv_window_bytes_(0),
632 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
633 verify_domain_authentication_(verify_domain_authentication),
634 enable_sending_initial_data_(enable_sending_initial_data),
635 enable_compression_(enable_compression),
636 enable_ping_based_connection_checking_(
637 enable_ping_based_connection_checking),
638 protocol_(default_protocol),
639 connection_at_risk_of_loss_time_(
640 base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
641 hung_interval_(base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
642 trusted_spdy_proxy_(trusted_spdy_proxy),
643 time_func_(time_func),
644 weak_factory_(this) {
645 DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
646 DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
647 DCHECK(HttpStreamFactory::spdy_enabled());
649 NetLog::TYPE_SPDY_SESSION,
650 base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
// The first sweep of unclaimed pushed streams is scheduled
// kMinPushedStreamLifetimeSeconds from now.
651 next_unclaimed_push_stream_sweep_time_ = time_func_() +
652 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
653 // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
// Tears the session down: the underlying socket is disconnected rather than
// returned for reuse, and the TYPE_SPDY_SESSION NetLog event is ended. The
// connection is expected to still hold a socket (DCHECKed).
656 SpdySession::~SpdySession() {
660 // TODO(akalin): Check connection->is_initialized() instead. This
661 // requires re-working CreateFakeSpdySession(), though.
662 DCHECK(connection_->socket());
663 // With SPDY we can't recycle sockets.
664 connection_->socket()->Disconnect();
668 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
// Attaches |connection| to a freshly constructed session and starts reading.
//
// Preconditions (DCHECKed): the session is still in its initial available /
// do-read / idle-write states, no connection is attached yet, |connection|
// holds a socket, and |certificate_error_code| is either OK or a value less
// than ERR_IO_PENDING.
//
// Steps visible here:
//  - takes ownership of |connection| and records |is_secure| and
//    |certificate_error_code|;
//  - adopts the socket's negotiated protocol when it is known;
//  - derives the flow-control mode and session window sizes from the protocol;
//  - creates the BufferedSpdyFramer and registers this object as its visitor
//    and debug visitor;
//  - registers this object as a higher-layered pool on the connection;
//  - posts PumpReadLoop() to the current message loop through a weak pointer.
671 void SpdySession::InitializeWithSocket(
672 scoped_ptr<ClientSocketHandle> connection,
673 SpdySessionPool* pool,
675 int certificate_error_code) {
677 DCHECK_EQ(availability_state_, STATE_AVAILABLE);
678 DCHECK_EQ(read_state_, READ_STATE_DO_READ);
679 DCHECK_EQ(write_state_, WRITE_STATE_IDLE);
680 DCHECK(!connection_);
682 DCHECK(certificate_error_code == OK ||
683 certificate_error_code < ERR_IO_PENDING);
684 // TODO(akalin): Check connection->is_initialized() instead. This
685 // requires re-working CreateFakeSpdySession(), though.
686 DCHECK(connection->socket());
688 base::StatsCounter spdy_sessions("spdy.sessions");
689 spdy_sessions.Increment();
691 connection_ = connection.Pass();
692 is_secure_ = is_secure;
693 certificate_error_code_ = certificate_error_code;
// Prefer the protocol the socket actually negotiated over the default passed
// to the constructor.
695 NextProto protocol_negotiated =
696 connection_->socket()->GetNegotiatedProtocol();
697 if (protocol_negotiated != kProtoUnknown) {
698 protocol_ = protocol_negotiated;
700 DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
701 DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
703 if (protocol_ == kProtoSPDY4)
704 send_connection_header_prefix_ = true;
// SPDY/3.1 and later: stream and session flow control, with both session
// windows starting at kSpdySessionInitialWindowSize. SPDY/3: stream flow
// control only. FLOW_CONTROL_NONE is the remaining alternative.
706 if (protocol_ >= kProtoSPDY31) {
707 flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION;
708 session_send_window_size_ = kSpdySessionInitialWindowSize;
709 session_recv_window_size_ = kSpdySessionInitialWindowSize;
710 } else if (protocol_ >= kProtoSPDY3) {
711 flow_control_state_ = FLOW_CONTROL_STREAM;
713 flow_control_state_ = FLOW_CONTROL_NONE;
716 buffered_spdy_framer_.reset(
717 new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_),
718 enable_compression_));
719 buffered_spdy_framer_->set_visitor(this);
720 buffered_spdy_framer_->set_debug_visitor(this);
721 UMA_HISTOGRAM_ENUMERATION("Net.SpdyVersion", protocol_, kProtoMaximumVersion);
722 #if defined(SPDY_PROXY_AUTH_ORIGIN)
723 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy",
724 host_port_pair().Equals(HostPortPair::FromURL(
725 GURL(SPDY_PROXY_AUTH_ORIGIN))));
729 NetLog::TYPE_SPDY_SESSION_INITIALIZED,
730 connection_->socket()->NetLog().source().ToEventParametersCallback());
732 DCHECK_EQ(availability_state_, STATE_AVAILABLE);
733 connection_->AddHigherLayeredPool(this);
734 if (enable_sending_initial_data_)
738 // Bootstrap the read loop.
739 base::MessageLoop::current()->PostTask(
741 base::Bind(&SpdySession::PumpReadLoop,
742 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
// Checks whether this session may be used for |domain|. The visible logic
// consults |verify_domain_authentication_| and the draining state first, then
// fetches the SSL info: when none is available the session is not secure and
// every domain is accepted; otherwise the decision is delegated to CanPool()
// with this session's host as the old hostname and |domain| as the new one.
745 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
746 if (!verify_domain_authentication_)
749 if (availability_state_ == STATE_DRAINING)
753 bool was_npn_negotiated;
754 NextProto protocol_negotiated = kProtoUnknown;
755 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
756 return true; // This is not a secure session, so all domains are okay.
758 return CanPool(transport_security_state_, ssl_info,
759 host_port_pair().host(), domain);
// Looks up an already-pushed stream for |url| and stores it in |*stream|.
// Fails with ERR_CONNECTION_CLOSED when the session is draining, and runs the
// TryAccessStream() check for |url| before the lookup. The claimed-stream
// counter is incremented here and is expected to be below the pushed-stream
// counter beforehand (DCHECKed).
762 int SpdySession::GetPushStream(
764 base::WeakPtr<SpdyStream>* stream,
765 const BoundNetLog& stream_net_log) {
770 if (availability_state_ == STATE_DRAINING)
771 return ERR_CONNECTION_CLOSED;
773 Error err = TryAccessStream(url);
777 *stream = GetActivePushStream(url);
779 DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
780 streams_pushed_and_claimed_count_++;
785 // {,Try}CreateStream() and TryAccessStream() can be called with
786 // |in_io_loop_| set if a stream is being created in response to
787 // another being closed due to received data.
// Checks whether |url| may be served over this session. When the session is
// secure but was established with a certificate error, requests for "https"
// or "wss" URLs are rejected: a protocol-error histogram sample is recorded
// and ERR_SPDY_PROTOCOL_ERROR is returned.
789 Error SpdySession::TryAccessStream(const GURL& url) {
790 if (is_secure_ && certificate_error_code_ != OK &&
791 (url.SchemeIs("https") || url.SchemeIs("wss"))) {
792 RecordProtocolErrorHistogram(
793 PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
795 static_cast<Error>(certificate_error_code_),
796 "Tried to get SPDY stream for secure content over an unauthenticated "
798 return ERR_SPDY_PROTOCOL_ERROR;
// Attempts to create a stream for |request|, storing it in |*stream|.
// After the availability-state and TryAccessStream() checks, the stream is
// created immediately when there is no concurrency limit
// (|max_concurrent_streams_| == 0) or when the number of active plus created
// streams, excluding pushed streams, is below the limit. Otherwise the
// request is appended to the pending queue for its priority and
// ERR_IO_PENDING is returned.
803 int SpdySession::TryCreateStream(
804 const base::WeakPtr<SpdyStreamRequest>& request,
805 base::WeakPtr<SpdyStream>* stream) {
808 if (availability_state_ == STATE_GOING_AWAY)
811 if (availability_state_ == STATE_DRAINING)
812 return ERR_CONNECTION_CLOSED;
814 Error err = TryAccessStream(request->url());
818 if (!max_concurrent_streams_ ||
819 (active_streams_.size() + created_streams_.size() - num_pushed_streams_ <
820 max_concurrent_streams_)) {
821 return CreateStream(*request, stream);
// At the limit: log the stall and queue the request by priority. The
// priority is CHECKed because it indexes |pending_create_stream_queues_|.
825 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
826 RequestPriority priority = request->priority();
827 CHECK_GE(priority, MINIMUM_PRIORITY);
828 CHECK_LE(priority, MAXIMUM_PRIORITY);
829 pending_create_stream_queues_[priority].push_back(request);
830 return ERR_IO_PENDING;
// Creates a new SpdyStream for |request| and stores a weak pointer to it in
// |*stream|. The request priority must be within [MINIMUM_PRIORITY,
// MAXIMUM_PRIORITY] (DCHECKed). The availability-state and TryAccessStream()
// checks are repeated here. If the socket is no longer connected the call
// fails with ERR_CONNECTION_CLOSED; socket connectedness is also recorded in
// the Net.SpdySession.CreateStreamWithSocketConnected histogram. On success
// the new stream is handed to InsertCreatedStream() and the priority is
// recorded in the Net.SpdyPriorityCount histogram.
833 int SpdySession::CreateStream(const SpdyStreamRequest& request,
834 base::WeakPtr<SpdyStream>* stream) {
835 DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
836 DCHECK_LE(request.priority(), MAXIMUM_PRIORITY);
838 if (availability_state_ == STATE_GOING_AWAY)
841 if (availability_state_ == STATE_DRAINING)
842 return ERR_CONNECTION_CLOSED;
844 Error err = TryAccessStream(request.url());
846 // This should have been caught in TryCreateStream().
851 DCHECK(connection_->socket());
852 DCHECK(connection_->socket()->IsConnected());
// The same conditions are re-tested at run time because DCHECKs are not
// evaluated in every build configuration.
853 if (connection_->socket()) {
854 UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
855 connection_->socket()->IsConnected());
856 if (!connection_->socket()->IsConnected()) {
858 ERR_CONNECTION_CLOSED,
859 "Tried to create SPDY stream for a closed socket connection.");
860 return ERR_CONNECTION_CLOSED;
864 scoped_ptr<SpdyStream> new_stream(
865 new SpdyStream(request.type(), GetWeakPtr(), request.url(),
867 stream_initial_send_window_size_,
868 stream_initial_recv_window_size_,
// Take the weak pointer before ownership moves to InsertCreatedStream().
870 *stream = new_stream->GetWeakPtr();
871 InsertCreatedStream(new_stream.Pass());
873 UMA_HISTOGRAM_CUSTOM_COUNTS(
874 "Net.SpdyPriorityCount",
875 static_cast<int>(request.priority()), 0, 10, 11);
// Removes |request| from the pending stream-creation queue that matches its
// priority, if it is still queued. The priority is CHECKed to be within
// [MINIMUM_PRIORITY, MAXIMUM_PRIORITY] because it indexes
// |pending_create_stream_queues_|. The remaining checks are DCHECKs: the
// request must not sit in a queue of a different priority and must appear at
// most once in its own queue.
880 void SpdySession::CancelStreamRequest(
881 const base::WeakPtr<SpdyStreamRequest>& request) {
883 RequestPriority priority = request->priority();
884 CHECK_GE(priority, MINIMUM_PRIORITY);
885 CHECK_LE(priority, MAXIMUM_PRIORITY);
888 // |request| should not be in a queue not matching its priority.
889 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
892 PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
893 DCHECK(std::find_if(queue->begin(),
895 RequestEquals(request)) == queue->end());
899 PendingStreamRequestQueue* queue =
900 &pending_create_stream_queues_[priority];
901 // Remove |request| from |queue| while preserving the order of the
903 PendingStreamRequestQueue::iterator it =
904 std::find_if(queue->begin(), queue->end(), RequestEquals(request));
905 // The request may already be removed if there's a
906 // CompleteStreamRequest() in flight.
907 if (it != queue->end()) {
// erase() returns the iterator following the removed element, so the
// duplicate check below only scans the remainder of the queue.
908 it = queue->erase(it);
909 // |request| should be in the queue at most once, and if it is
910 // present, should not be pending completion.
911 DCHECK(std::find_if(it, queue->end(), RequestEquals(request)) ==
// Pops and returns the next pending stream request. Queues are scanned from
// MAXIMUM_PRIORITY down to MINIMUM_PRIORITY and the front element of the
// first non-empty queue is removed and returned, so requests are served in
// priority order and FIFO within a priority. A null WeakPtr is returned when
// every queue is empty. Queued requests are expected to be valid (DCHECKed).
916 base::WeakPtr<SpdyStreamRequest> SpdySession::GetNextPendingStreamRequest() {
917 for (int j = MAXIMUM_PRIORITY; j >= MINIMUM_PRIORITY; --j) {
918 if (pending_create_stream_queues_[j].empty())
921 base::WeakPtr<SpdyStreamRequest> pending_request =
922 pending_create_stream_queues_[j].front();
923 DCHECK(pending_request);
924 pending_create_stream_queues_[j].pop_front();
925 return pending_request;
927 return base::WeakPtr<SpdyStreamRequest>();
// Un-stalls queued stream requests. Up to |max_requests_to_process| requests
// are popped with GetNextPendingStreamRequest() and, for each, a
// CompleteStreamRequest() task bound to a weak pointer to this session is
// posted to the current message loop.
//
// NOTE(review): the budget below is computed as
// max_concurrent_streams_ - (active + created), without subtracting
// |num_pushed_streams_| as TryCreateStream() does. When the difference is
// exactly 0 the loop condition treats it as "unlimited", and because the
// operands are unsigned a negative difference would wrap to a very large
// value. Per the existing note below, re-posted requests can stall again, so
// this may be benign -- confirm.
930 void SpdySession::ProcessPendingStreamRequests() {
931 // Like |max_concurrent_streams_|, 0 means infinite for
932 // |max_requests_to_process|.
933 size_t max_requests_to_process = 0;
934 if (max_concurrent_streams_ != 0) {
935 max_requests_to_process =
936 max_concurrent_streams_ -
937 (active_streams_.size() + created_streams_.size());
940 max_requests_to_process == 0 || i < max_requests_to_process; ++i) {
941 base::WeakPtr<SpdyStreamRequest> pending_request =
942 GetNextPendingStreamRequest();
943 if (!pending_request)
946 // Note that this post can race with other stream creations, and it's
947 // possible that the un-stalled stream will be stalled again if it loses.
948 // TODO(jgraettinger): Provide stronger ordering guarantees.
949 base::MessageLoop::current()->PostTask(
951 base::Bind(&SpdySession::CompleteStreamRequest,
952 weak_factory_.GetWeakPtr(),
// Records |alias_key| in |pooled_aliases_|, the set of additional keys under
// which this session is known.
957 void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) {
958 pooled_aliases_.insert(alias_key);
// Returns the SPDY major version reported by the framer. The framer must
// already exist (DCHECKed); it is created in InitializeWithSocket().
961 SpdyMajorVersion SpdySession::GetProtocolVersion() const {
962 DCHECK(buffered_spdy_framer_.get());
963 return buffered_spdy_framer_->protocol_version();
// Reports whether the connection satisfies the transport security
// requirements for the protocol in use. Per the comments below, sessions not
// using TLS and protocol versions older than SPDY4 are not held to any
// standard. For SPDY4 the SSL info must be retrievable (CHECKed), the
// negotiated TLS version must be at least 1.2 and the cipher suite must be
// accepted by IsSecureTLSCipherSuite().
966 bool SpdySession::HasAcceptableTransportSecurity() const {
967 // If we're not even using TLS, we have no standards to meet.
972 // We don't enforce transport security standards for older SPDY versions.
973 if (GetProtocolVersion() < SPDY4) {
978 CHECK(connection_->socket()->GetSSLInfo(&ssl_info));
980 // HTTP/2 requires TLS 1.2+
981 if (SSLConnectionStatusToVersion(ssl_info.connection_status) <
982 SSL_CONNECTION_VERSION_TLS1_2) {
986 if (!IsSecureTLSCipherSuite(
987 SSLConnectionStatusToCipherSuite(ssl_info.connection_status))) {
// Returns a weak pointer to this session, vended by |weak_factory_|.
994 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
995 return weak_factory_.GetWeakPtr();
// Asked to free up a connection: when the session has no active streams it is
// drained with ERR_CONNECTION_CLOSED. Per the comment below the result is
// false because draining does not close the socket immediately.
998 bool SpdySession::CloseOneIdleConnection() {
1001 if (active_streams_.empty()) {
1002 DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1004 // Return false as the socket wasn't immediately closed.
// Queues a frame produced by |producer| on behalf of |stream|. Only HEADERS,
// DATA, CREDENTIAL and SYN_STREAM frames may be enqueued through this entry
// point (DCHECKed). The write is queued at the stream's own priority via
// EnqueueWrite(), which takes ownership of |producer|. |stream| is
// dereferenced and must still be valid.
1008 void SpdySession::EnqueueStreamWrite(
1009 const base::WeakPtr<SpdyStream>& stream,
1010 SpdyFrameType frame_type,
1011 scoped_ptr<SpdyBufferProducer> producer) {
1012 DCHECK(frame_type == HEADERS ||
1013 frame_type == DATA ||
1014 frame_type == CREDENTIAL ||
1015 frame_type == SYN_STREAM);
1016 EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
// Serializes the frame that opens stream |stream_id| and returns it.
// |stream_id| must already be present in |active_streams_| and match the
// stored stream's id (both CHECKed). A preface PING is sent first if none is
// in flight. |priority| is converted to a SPDY priority for the protocol
// version in use. For protocol versions up to and including SPDY3 a
// SYN_STREAM is serialized (associated stream id 0, FIN and UNIDIRECTIONAL
// taken from |flags|); the HEADERS form carries the priority, the FIN flag
// and the same header block. The "spdy.requests" counter and
// |streams_initiated_count_| are incremented, and a
// TYPE_SPDY_SESSION_SYN_STREAM NetLog event is added when logging is enabled.
1019 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
1020 SpdyStreamId stream_id,
1021 RequestPriority priority,
1022 SpdyControlFlags flags,
1023 const SpdyHeaderBlock& block) {
1024 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
1025 CHECK(it != active_streams_.end());
1026 CHECK_EQ(it->second.stream->stream_id(), stream_id);
1028 SendPrefacePingIfNoneInFlight();
1030 DCHECK(buffered_spdy_framer_.get());
1031 SpdyPriority spdy_priority =
1032 ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion());
1034 scoped_ptr<SpdyFrame> syn_frame;
1035 // TODO(hkhalil): Avoid copy of |block|.
1036 if (GetProtocolVersion() <= SPDY3) {
1037 SpdySynStreamIR syn_stream(stream_id);
1038 syn_stream.set_associated_to_stream_id(0);
1039 syn_stream.set_priority(spdy_priority);
1040 syn_stream.set_fin((flags & CONTROL_FLAG_FIN) != 0);
1041 syn_stream.set_unidirectional((flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0);
1042 syn_stream.set_name_value_block(block);
1043 syn_frame.reset(buffered_spdy_framer_->SerializeFrame(syn_stream));
1045 SpdyHeadersIR headers(stream_id);
1046 headers.set_priority(spdy_priority);
1047 headers.set_has_priority(true);
1048 headers.set_fin((flags & CONTROL_FLAG_FIN) != 0);
1049 headers.set_name_value_block(block);
1050 syn_frame.reset(buffered_spdy_framer_->SerializeFrame(headers));
1053 base::StatsCounter spdy_requests("spdy.requests");
1054 spdy_requests.Increment();
1055 streams_initiated_count_++;
1057 if (net_log().IsLogging()) {
1058 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
1059 base::Bind(&NetLogSpdySynStreamSentCallback,
1061 (flags & CONTROL_FLAG_FIN) != 0,
1062 (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
1067 return syn_frame.Pass();
// Builds a SpdyBuffer holding a single DATA frame for |stream_id|. The payload
// length is capped by kMaxSpdyFrameChunkSize and by whichever flow-control
// send windows are in force. Returns an empty scoped_ptr when the session is
// draining, or when the send is stalled by stream or session flow control (in
// which case the stream is marked stalled and queued via
// QueueSendStalledStream()).
// NOTE(review): the parameter lines between |stream_id| and |flags| are not
// visible in this listing; the body uses |data| and |len|, presumably the
// payload buffer and its length.
1070 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
1073 SpdyDataFlags flags) {
1074 if (availability_state_ == STATE_DRAINING) {
1075 return scoped_ptr<SpdyBuffer>();
1078 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
1079 CHECK(it != active_streams_.end());
1080 SpdyStream* stream = it->second.stream;
1081 CHECK_EQ(stream->stream_id(), stream_id);
// NOTE(review): the condition guarding this early return is not visible here.
1085 return scoped_ptr<SpdyBuffer>();
// Never put more than kMaxSpdyFrameChunkSize bytes of payload in one frame.
1088 int effective_len = std::min(len, kMaxSpdyFrameChunkSize);
// Work out which flow-control window (if any) currently blocks this send; the
// result feeds the histograms and the stall handling below.
1090 bool send_stalled_by_stream =
1091 (flow_control_state_ >= FLOW_CONTROL_STREAM) &&
1092 (stream->send_window_size() <= 0);
1093 bool send_stalled_by_session = IsSendStalled();
1095 // NOTE: There's an enum of the same name in histograms.xml.
1096 enum SpdyFrameFlowControlState {
1098 SEND_STALLED_BY_STREAM,
1099 SEND_STALLED_BY_SESSION,
1100 SEND_STALLED_BY_STREAM_AND_SESSION,
1103 SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED;
1104 if (send_stalled_by_stream) {
1105 if (send_stalled_by_session) {
1106 frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION;
1108 frame_flow_control_state = SEND_STALLED_BY_STREAM;
1110 } else if (send_stalled_by_session) {
1111 frame_flow_control_state = SEND_STALLED_BY_SESSION;
// Report the stall state to the histogram that matches the flow-control mode.
1114 if (flow_control_state_ == FLOW_CONTROL_STREAM) {
1115 UMA_HISTOGRAM_ENUMERATION(
1116 "Net.SpdyFrameStreamFlowControlState",
1117 frame_flow_control_state,
1118 SEND_STALLED_BY_STREAM + 1);
1119 } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1120 UMA_HISTOGRAM_ENUMERATION(
1121 "Net.SpdyFrameStreamAndSessionFlowControlState",
1122 frame_flow_control_state,
1123 SEND_STALLED_BY_STREAM_AND_SESSION + 1);
1126 // Obey send window size of the stream if stream flow control is
1128 if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
1129 if (send_stalled_by_stream) {
1130 stream->set_send_stalled_by_flow_control(true);
1131 // Even though we're currently stalled only by the stream, we
1132 // might end up being stalled by the session also.
1133 QueueSendStalledStream(*stream);
1135 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW,
1136 NetLog::IntegerCallback("stream_id", stream_id));
1137 return scoped_ptr<SpdyBuffer>();
1140 effective_len = std::min(effective_len, stream->send_window_size());
1143 // Obey send window size of the session if session flow control is
1145 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1146 if (send_stalled_by_session) {
1147 stream->set_send_stalled_by_flow_control(true);
1148 QueueSendStalledStream(*stream);
1150 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW,
1151 NetLog::IntegerCallback("stream_id", stream_id));
1152 return scoped_ptr<SpdyBuffer>();
1155 effective_len = std::min(effective_len, session_send_window_size_);
1158 DCHECK_GE(effective_len, 0);
1160 // Clear FIN flag if only some of the data will be in the data
1162 if (effective_len < len)
1163 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
1165 if (net_log().IsLogging()) {
1167 NetLog::TYPE_SPDY_SESSION_SEND_DATA,
1168 base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len,
1169 (flags & DATA_FLAG_FIN) != 0));
1172 // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
1173 if (effective_len > 0)
1174 SendPrefacePingIfNoneInFlight();
1176 // TODO(mbelshe): reduce memory copies here.
1177 DCHECK(buffered_spdy_framer_.get());
1178 scoped_ptr<SpdyFrame> frame(
1179 buffered_spdy_framer_->CreateDataFrame(
1180 stream_id, data->data(),
1181 static_cast<uint32>(effective_len), flags));
1183 scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));
// With session-level flow control, charge the payload against the session
// send window now and register OnWriteBufferConsumed() as a consume callback
// on the buffer, bound to the same payload length.
1185 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1186 DecreaseSendWindowSize(static_cast<int32>(effective_len));
1187 data_buffer->AddConsumeCallback(
1188 base::Bind(&SpdySession::OnWriteBufferConsumed,
1189 weak_factory_.GetWeakPtr(),
1190 static_cast<size_t>(effective_len)));
1193 return data_buffer.Pass();
// Closes the active stream identified by |stream_id| with |status| by
// delegating to CloseActiveStreamIterator(). |stream_id| must be non-zero.
1196 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) {
1197 DCHECK_NE(stream_id, 0u);
1199 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
// NOTE(review): the body of this not-found branch is not visible in this
// listing; presumably it returns early so that |it| is valid below.
1200 if (it == active_streams_.end()) {
1205 CloseActiveStreamIterator(it, status);
// Closes a stream that is still in |created_streams_| with |status| by
// delegating to CloseCreatedStreamIterator(). The stream's id is expected to
// still be 0.
1208 void SpdySession::CloseCreatedStream(
1209 const base::WeakPtr<SpdyStream>& stream, int status) {
1210 DCHECK_EQ(stream->stream_id(), 0u);
1212 CreatedStreamSet::iterator it = created_streams_.find(stream.get());
// NOTE(review): the body of this not-found branch is not visible in this
// listing; presumably it returns early so that |it| is valid below.
1213 if (it == created_streams_.end()) {
1218 CloseCreatedStreamIterator(it, status);
// Resets the active stream |stream_id| with RST_STREAM status |status| and a
// human-readable |description| by delegating to ResetStreamIterator().
// |stream_id| must be non-zero.
1221 void SpdySession::ResetStream(SpdyStreamId stream_id,
1222 SpdyRstStreamStatus status,
1223 const std::string& description) {
1224 DCHECK_NE(stream_id, 0u);
1226 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
// NOTE(review): the body of this not-found branch is not visible in this
// listing; presumably it returns early so that |it| is valid below.
1227 if (it == active_streams_.end()) {
1232 ResetStreamIterator(it, status, description);
// Returns true if |stream_id| is currently a key in |active_streams_|.
1235 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
1236 return ContainsKey(active_streams_, stream_id);
// Returns the load state of the session, which is always LOAD_STATE_IDLE.
1239 LoadState SpdySession::GetLoadState() const {
1240 // Just report that we're idle since the session could be doing
1241 // many things concurrently.
1242 return LOAD_STATE_IDLE;
// Removes the stream at |it| from |active_streams_|, updates the push-stream
// bookkeeping if it is a pushed stream, and hands the stream to
// DeleteStream(). Afterwards the session may finish going away, or be drained
// if it is idle and the socket pool is stalled.
// NOTE(review): the second parameter line is not visible in this listing; the
// body uses |status|.
1245 void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
1247 // TODO(mbelshe): We should send a RST_STREAM control frame here
1248 // so that the server can cancel a large send.
// Take ownership of the raw stream pointer before erasing the map entry;
// |it| must not be used after the erase.
1250 scoped_ptr<SpdyStream> owned_stream(it->second.stream);
1251 active_streams_.erase(it);
1253 // TODO(akalin): When SpdyStream was ref-counted (and
1254 // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
1255 // was only done when status was not OK. This meant that pushed
1256 // streams can still be claimed after they're closed. This is
1257 // probably something that we still want to support, although server
1258 // push is hardly used. Write tests for this and fix this. (See
1259 // http://crbug.com/261712 .)
1260 if (owned_stream->type() == SPDY_PUSH_STREAM) {
1261 unclaimed_pushed_streams_.erase(owned_stream->url());
1262 num_pushed_streams_--;
// Only streams that are no longer in the reserved-remote state are counted
// in |num_active_pushed_streams_|.
1263 if (!owned_stream->IsReservedRemote())
1264 num_active_pushed_streams_--;
1267 DeleteStream(owned_stream.Pass(), status);
1268 MaybeFinishGoingAway();
1270 // If there are no active streams and the socket pool is stalled, close the
1271 // session to free up a socket slot.
1272 if (active_streams_.empty() && connection_->IsPoolStalled()) {
1273 DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
// Removes the stream at |it| from |created_streams_|, taking ownership of it,
// and hands it to DeleteStream().
// NOTE(review): the second parameter line is not visible in this listing; the
// body uses |status|.
1277 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
1279 scoped_ptr<SpdyStream> owned_stream(*it);
1280 created_streams_.erase(it);
1281 DeleteStream(owned_stream.Pass(), status);
// Enqueues a RST_STREAM frame for the stream at |it| and then closes that
// stream with ERR_SPDY_PROTOCOL_ERROR.
1284 void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it,
1285 SpdyRstStreamStatus status,
1286 const std::string& description) {
1287 // Send the RST_STREAM frame first as CloseActiveStreamIterator()
// Copy the id and priority out of the map entry before the stream is closed.
1289 SpdyStreamId stream_id = it->first;
1290 RequestPriority priority = it->second.stream->priority();
1291 EnqueueResetStreamFrame(stream_id, priority, status, description);
1293 // Removes any pending writes for the stream except for possibly an
1295 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
// Creates a RST_STREAM frame for |stream_id| carrying |status| and enqueues it
// as a session-level write at |priority|. Also records the corresponding
// protocol-error histogram sample. |stream_id| must be non-zero.
1298 void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id,
1299 RequestPriority priority,
1300 SpdyRstStreamStatus status,
1301 const std::string& description) {
1302 DCHECK_NE(stream_id, 0u);
// NOTE(review): the opening line of the call that takes the two arguments
// below is not visible in this listing; presumably a net-log AddEvent call.
1305 NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
1306 base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));
1308 DCHECK(buffered_spdy_framer_.get());
1309 scoped_ptr<SpdyFrame> rst_frame(
1310 buffered_spdy_framer_->CreateRstStream(stream_id, status));
1312 EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
1313 RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status));
// Entry point used as a callback (see DoRead() and DoReadLoop()) to run the
// read loop; the value returned by DoReadLoop() is deliberately ignored.
// Must not be called re-entrantly from inside the IO loop.
1316 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
1317 CHECK(!in_io_loop_);
// NOTE(review): the body of this branch is not visible in this listing;
// presumably the read loop is skipped once the session is draining.
1318 if (availability_state_ == STATE_DRAINING) {
1321 ignore_result(DoReadLoop(expected_read_state, result));
// Drives the read state machine starting from |expected_read_state| with the
// previous operation's |result|. |read_state_| must equal
// |expected_read_state| on entry and the IO loop must not already be running.
// NOTE(review): several short lines of this function (the loop header, the
// DoRead() call, the break/return statements) are not visible in this
// listing; the comments below describe only what is shown.
1324 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
1325 CHECK(!in_io_loop_);
1326 CHECK_EQ(read_state_, expected_read_state);
// Running total used to decide when to yield back to the message loop.
1330 int bytes_read_without_yielding = 0;
1332 // Loop until the session is draining, the read becomes blocked, or
1333 // the read limit is exceeded.
1335 switch (read_state_) {
1336 case READ_STATE_DO_READ:
1337 CHECK_EQ(result, OK);
1340 case READ_STATE_DO_READ_COMPLETE:
1342 bytes_read_without_yielding += result;
1343 result = DoReadComplete(result);
1346 NOTREACHED() << "read_state_: " << read_state_;
1350 if (availability_state_ == STATE_DRAINING)
1353 if (result == ERR_IO_PENDING)
// Too many bytes processed in one go: post a task to resume reading later and
// report ERR_IO_PENDING.
1356 if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
1357 read_state_ = READ_STATE_DO_READ;
1358 base::MessageLoop::current()->PostTask(
1360 base::Bind(&SpdySession::PumpReadLoop,
1361 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
1362 result = ERR_IO_PENDING;
1368 in_io_loop_ = false;
// Starts a read on the connection's socket and returns the socket's result.
// Advances |read_state_| to READ_STATE_DO_READ_COMPLETE first; the completion
// callback is PumpReadLoop() bound through a weak pointer to this session.
// NOTE(review): the buffer and length arguments of the Read() call are not
// visible in this listing.
1373 int SpdySession::DoRead() {
1377 CHECK(connection_->socket());
1378 read_state_ = READ_STATE_DO_READ_COMPLETE;
1379 return connection_->socket()->Read(
1382 base::Bind(&SpdySession::PumpReadLoop,
1383 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
// Handles the outcome |result| of a socket read: drains the session on
// failure, otherwise feeds the bytes in |read_buffer_| to the framer and
// moves |read_state_| back to READ_STATE_DO_READ.
// NOTE(review): the conditions selecting the two drain paths below, and the
// final return, are not visible in this listing.
1386 int SpdySession::DoReadComplete(int result) {
1389 // Parse a frame. For now this code requires that the frame fit into our
1390 // buffer (kReadBufferSize).
1391 // TODO(mbelshe): support arbitrarily large frames!
// Connection-closed path (presumably taken when |result| is 0): record the
// byte count, drain the session and report ERR_CONNECTION_CLOSED.
1394 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
1395 total_bytes_received_, 1, 100000000, 50);
1396 DoDrainSession(ERR_CONNECTION_CLOSED, "Connection closed");
1398 return ERR_CONNECTION_CLOSED;
// Read-error path: drain the session with the socket's error code.
1402 DoDrainSession(static_cast<Error>(result), "result is < 0.");
1405 CHECK_LE(result, kReadBufferSize);
1406 total_bytes_received_ += result;
1408 last_activity_time_ = time_func_();
1410 DCHECK(buffered_spdy_framer_.get());
1411 char* data = read_buffer_->data();
// Hand the data to the framer piecewise until all of it has been processed.
1412 while (result > 0) {
1413 uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
1414 result -= bytes_processed;
1415 data += bytes_processed;
// Stop as soon as the session has started draining.
1417 if (availability_state_ == STATE_DRAINING) {
1418 return ERR_CONNECTION_CLOSED;
1421 DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
1424 read_state_ = READ_STATE_DO_READ;
// Entry point used as a callback (see DoWrite() and MaybePostWriteLoop()) to
// run the write loop. Once the session is draining and there is neither an
// in-flight write nor a queued one, the session is removed from its pool.
1428 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
1429 CHECK(!in_io_loop_);
1430 DCHECK_EQ(write_state_, expected_write_state);
1432 DoWriteLoop(expected_write_state, result);
1434 if (availability_state_ == STATE_DRAINING && !in_flight_write_ &&
1435 write_queue_.IsEmpty()) {
1436 pool_->RemoveUnavailableSession(GetWeakPtr()); // Destroys |this|.
// Drives the write state machine starting from |expected_write_state| with
// the previous operation's |result|. |write_state_| must not be
// WRITE_STATE_IDLE on entry and the IO loop must not already be running.
// NOTE(review): several short lines of this function (the loop header, the
// DoWrite() call, the break/return statements) are not visible in this
// listing; the comments below describe only what is shown.
1441 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
1442 CHECK(!in_io_loop_);
1443 DCHECK_NE(write_state_, WRITE_STATE_IDLE);
1444 DCHECK_EQ(write_state_, expected_write_state);
1448 // Loop until the session is closed or the write becomes blocked.
1450 switch (write_state_) {
1451 case WRITE_STATE_DO_WRITE:
1452 DCHECK_EQ(result, OK);
1455 case WRITE_STATE_DO_WRITE_COMPLETE:
1456 result = DoWriteComplete(result);
1458 case WRITE_STATE_IDLE:
1460 NOTREACHED() << "write_state_: " << write_state_;
// When the state machine has gone idle, the last result is expected to be
// ERR_IO_PENDING.
1464 if (write_state_ == WRITE_STATE_IDLE) {
1465 DCHECK_EQ(result, ERR_IO_PENDING);
1469 if (result == ERR_IO_PENDING)
1474 in_io_loop_ = false;
// Writes the remaining bytes of |in_flight_write_| to the socket, first
// dequeuing and producing the next frame when nothing is in flight. Returns
// the socket's Write() result, ERR_IO_PENDING (with the state set to idle)
// when the write queue is empty, or ERR_UNEXPECTED when the producer yields
// no buffer.
// NOTE(review): a few short lines (else/closing braces and some guards) are
// not visible in this listing.
1479 int SpdySession::DoWrite() {
1482 DCHECK(buffered_spdy_framer_);
1483 if (in_flight_write_) {
1484 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1486 // Grab the next frame to send.
1487 SpdyFrameType frame_type = DATA;
1488 scoped_ptr<SpdyBufferProducer> producer;
1489 base::WeakPtr<SpdyStream> stream;
// Nothing queued: go idle until another write is enqueued.
1490 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
1491 write_state_ = WRITE_STATE_IDLE;
1492 return ERR_IO_PENDING;
// NOTE(review): any guard around this CHECK (e.g. for a null |stream|) is not
// visible here.
1496 CHECK(!stream->IsClosed());
1498 // Activate the stream only when sending the SYN_STREAM frame to
1499 // guarantee monotonically-increasing stream IDs.
1500 if (frame_type == SYN_STREAM) {
1501 CHECK(stream.get());
1502 CHECK_EQ(stream->stream_id(), 0u);
1503 scoped_ptr<SpdyStream> owned_stream =
1504 ActivateCreatedStream(stream.get());
1505 InsertActivatedStream(owned_stream.Pass());
1507 if (stream_hi_water_mark_ > kLastStreamId) {
1508 CHECK_EQ(stream->stream_id(), kLastStreamId);
1509 // We've exhausted the stream ID space, and no new streams may be
1510 // created after this one.
1512 StartGoingAway(kLastStreamId, ERR_ABORTED);
// Ask the producer for the actual bytes to send.
1516 in_flight_write_ = producer->ProduceBuffer();
1517 if (!in_flight_write_) {
1519 return ERR_UNEXPECTED;
// Remember the frame's type, full size and stream so that DoWriteComplete()
// can notify the stream once the whole frame has been written.
1521 in_flight_write_frame_type_ = frame_type;
1522 in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
1523 DCHECK_GE(in_flight_write_frame_size_,
1524 buffered_spdy_framer_->GetFrameMinimumSize());
1525 in_flight_write_stream_ = stream;
1528 write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;
1530 // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
1531 // with Socket implementations that don't store their IOBuffer
1532 // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
1533 scoped_refptr<IOBuffer> write_io_buffer =
1534 in_flight_write_->GetIOBufferForRemainingData();
1535 return connection_->socket()->Write(
1536 write_io_buffer.get(),
1537 in_flight_write_->GetRemainingSize(),
1538 base::Bind(&SpdySession::PumpWriteLoop,
1539 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
// Handles the outcome |result| of a socket write. On the error path the
// in-flight write state is cleared and the session is drained; otherwise
// |result| bytes are consumed from |in_flight_write_| and, once the frame is
// fully written, its stream (if still alive) is notified. |write_state_| is
// set back to WRITE_STATE_DO_WRITE.
// NOTE(review): the condition selecting the error path and the return
// statements are not visible in this listing.
1542 int SpdySession::DoWriteComplete(int result) {
1544 DCHECK_NE(result, ERR_IO_PENDING);
1545 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1547 last_activity_time_ = time_func_();
// Write-error path: drop all in-flight write state, then drain the session
// with the socket's error code.
1550 DCHECK_NE(result, ERR_IO_PENDING);
1551 in_flight_write_.reset();
1552 in_flight_write_frame_type_ = DATA;
1553 in_flight_write_frame_size_ = 0;
1554 in_flight_write_stream_.reset();
1555 write_state_ = WRITE_STATE_DO_WRITE;
1556 DoDrainSession(static_cast<Error>(result), "Write error");
1560 // It should not be possible to have written more bytes than our
1561 // in_flight_write_.
1562 DCHECK_LE(static_cast<size_t>(result),
1563 in_flight_write_->GetRemainingSize());
1566 in_flight_write_->Consume(static_cast<size_t>(result));
1568 // We only notify the stream when we've fully written the pending frame.
1569 if (in_flight_write_->GetRemainingSize() == 0) {
1570 // It is possible that the stream was cancelled while we were
1571 // writing to the socket.
1572 if (in_flight_write_stream_.get()) {
1573 DCHECK_GT(in_flight_write_frame_size_, 0u);
1574 in_flight_write_stream_->OnFrameWriteComplete(
1575 in_flight_write_frame_type_,
1576 in_flight_write_frame_size_);
1579 // Cleanup the write which just completed.
1580 in_flight_write_.reset();
1581 in_flight_write_frame_type_ = DATA;
1582 in_flight_write_frame_size_ = 0;
1583 in_flight_write_stream_.reset();
1587 write_state_ = WRITE_STATE_DO_WRITE;
// Debug-only consistency check for a session that is going away (or further
// along): no stream requests may be pending at any priority and no created
// streams may remain.
1591 void SpdySession::DcheckGoingAway() const {
1593 DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1594 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
1595 DCHECK(pending_create_stream_queues_[i].empty());
1597 DCHECK(created_streams_.empty());
// Debug-only consistency check for a draining session: there must be no
// active streams and no unclaimed pushed streams.
1601 void SpdySession::DcheckDraining() const {
1603 DCHECK_EQ(availability_state_, STATE_DRAINING);
1604 DCHECK(active_streams_.empty());
1605 DCHECK(unclaimed_pushed_streams_.empty());
// Tears down everything that cannot complete once the peer will accept no
// stream above |last_good_stream_id|: pending stream requests are failed with
// ERR_ABORTED, active streams with a higher id and all created streams are
// closed with |status|, and queued writes for streams after
// |last_good_stream_id| are removed. The session must already be in
// STATE_GOING_AWAY or later.
// NOTE(review): the second parameter line and the loop headers/breaks of the
// first two loops are not visible in this listing.
1608 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
1610 DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1612 // The loops below are carefully written to avoid reentrancy problems.
// Fail pending stream requests one at a time.
1615 size_t old_size = GetTotalSize(pending_create_stream_queues_);
1616 base::WeakPtr<SpdyStreamRequest> pending_request =
1617 GetNextPendingStreamRequest();
1618 if (!pending_request)
1620 // No new stream requests should be added while the session is
1622 DCHECK_GT(old_size, GetTotalSize(pending_create_stream_queues_));
1623 pending_request->OnRequestCompleteFailure(ERR_ABORTED);
// Close active streams whose id is greater than |last_good_stream_id|; the
// lookup is redone each time rather than holding an iterator across the
// close.
1627 size_t old_size = active_streams_.size();
1628 ActiveStreamMap::iterator it =
1629 active_streams_.lower_bound(last_good_stream_id + 1);
1630 if (it == active_streams_.end())
1632 LogAbandonedActiveStream(it, status);
1633 CloseActiveStreamIterator(it, status);
1634 // No new streams should be activated while the session is going
1636 DCHECK_GT(old_size, active_streams_.size());
// Close every stream that was created but never activated.
1639 while (!created_streams_.empty()) {
1640 size_t old_size = created_streams_.size();
1641 CreatedStreamSet::iterator it = created_streams_.begin();
1642 LogAbandonedStream(*it, status);
1643 CloseCreatedStreamIterator(it, status);
1644 // No new streams should be created while the session is going
1646 DCHECK_GT(old_size, created_streams_.size());
1649 write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);
// Drains the session with OK once it is going away and its last active stream
// is gone; otherwise does nothing.
1654 void SpdySession::MaybeFinishGoingAway() {
1655 if (active_streams_.empty() && availability_state_ == STATE_GOING_AWAY) {
1656 DoDrainSession(OK, "Finished going away");
// Moves the session into STATE_DRAINING, recording |err| in |error_on_close_|
// and logging |description|. For errors other than the exclusions listed
// below a GOAWAY frame is enqueued first. Remaining streams are torn down via
// StartGoingAway(0, err) and the write loop is kicked so queued frames can
// still go out.
// NOTE(review): several short lines (the early-return body, the opening of
// the GOAWAY condition, and the branch around the histograms) are not visible
// in this listing.
1660 void SpdySession::DoDrainSession(Error err, const std::string& description) {
// Already draining: presumably nothing more to do (branch body not visible).
1661 if (availability_state_ == STATE_DRAINING) {
1666 // If |err| indicates an error occurred, inform the peer that we're closing
1667 // and why. Don't GOAWAY on a graceful or idle close, as that may
1668 // unnecessarily wake the radio. We could technically GOAWAY on network errors
1669 // (we'll probably fail to actually write it, but that's okay), however many
1670 // unit-tests would need to be updated.
1672 err != ERR_ABORTED && // Used by SpdySessionPool to close idle sessions.
1673 err != ERR_NETWORK_CHANGED && // Used to deprecate sessions on IP change.
1674 err != ERR_SOCKET_NOT_CONNECTED &&
1675 err != ERR_CONNECTION_CLOSED && err != ERR_CONNECTION_RESET) {
1676 // Enqueue a GOAWAY to inform the peer of why we're closing the connection.
1677 SpdyGoAwayIR goaway_ir(0, // Last accepted stream ID.
1678 MapNetErrorToGoAwayStatus(err),
1680 EnqueueSessionWrite(HIGHEST,
1682 scoped_ptr<SpdyFrame>(
1683 buffered_spdy_framer_->SerializeFrame(goaway_ir)));
// The GOAWAY above is enqueued before the state flips to draining;
// EnqueueWrite() checks for STATE_DRAINING before queueing.
1686 availability_state_ = STATE_DRAINING;
1687 error_on_close_ = err;
1690 NetLog::TYPE_SPDY_SESSION_CLOSE,
1691 base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
// The histogram takes the negated error code as its sample.
1693 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
1694 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
1695 total_bytes_received_, 1, 100000000, 50);
1698 // We ought to be going away already, as this is a graceful close.
1701 StartGoingAway(0, err);
1704 MaybePostWriteLoop();
// Logs on |stream| that it is being abandoned with |status|, using a
// description made of the stream id and the stream's URL.
1707 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
1709 std::string description = base::StringPrintf(
1710 "ABANDONED (stream_id=%d): ", stream->stream_id()) +
1711 stream->url().spec();
1712 stream->LogStreamError(status, description);
1713 // We don't increment the streams abandoned counter here. If the
1714 // stream isn't active (i.e., it hasn't written anything to the wire
1715 // yet) then it's as if it never existed. If it is active, then
1716 // LogAbandonedActiveStream() will increment the counters.
// Logs the active stream at |it| as abandoned and updates the abandonment
// counters: |streams_abandoned_count_| and "spdy.abandoned_streams" always,
// plus "spdy.abandoned_push_streams" when the stream is a pushed stream that
// is still unclaimed.
// NOTE(review): the second parameter line is not visible in this listing; the
// body uses |status|.
1719 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it,
1721 DCHECK_GT(it->first, 0u);
1722 LogAbandonedStream(it->second.stream, status);
1723 ++streams_abandoned_count_;
1724 base::StatsCounter abandoned_streams("spdy.abandoned_streams");
1725 abandoned_streams.Increment();
1726 if (it->second.stream->type() == SPDY_PUSH_STREAM &&
1727 unclaimed_pushed_streams_.find(it->second.stream->url()) !=
1728 unclaimed_pushed_streams_.end()) {
1729 base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams");
1730 abandoned_push_streams.Increment();
// Allocates the next stream id from |stream_hi_water_mark_|, which then
// advances by 2. Crashes (CHECK) if the id space is already exhausted.
// NOTE(review): the return statement is not visible in this listing;
// presumably |id| is returned.
1734 SpdyStreamId SpdySession::GetNewStreamId() {
1735 CHECK_LE(stream_hi_water_mark_, kLastStreamId);
1736 SpdyStreamId id = stream_hi_water_mark_;
1737 stream_hi_water_mark_ += 2;
// Drains the session because of |err|, logging |description|. |err| is
// expected to be numerically below ERR_IO_PENDING.
1741 void SpdySession::CloseSessionOnError(Error err,
1742 const std::string& description) {
1743 DCHECK_LT(err, ERR_IO_PENDING);
1744 DoDrainSession(err, description);
// If the session is still available, marks it as going away and tells the
// pool it is no longer available. Does nothing in any other state.
1747 void SpdySession::MakeUnavailable() {
1748 if (availability_state_ == STATE_AVAILABLE) {
1749 availability_state_ = STATE_GOING_AWAY;
1750 pool_->MakeSessionUnavailable(GetWeakPtr());
// Collects a snapshot of the session's state (endpoints, stream counters,
// settings flags, flow-control windows, ...) into a newly allocated
// base::DictionaryValue.
// NOTE(review): the return statement is not visible in this listing;
// presumably |dict| is returned and the caller takes ownership.
1754 base::Value* SpdySession::GetInfoAsValue() const {
1755 base::DictionaryValue* dict = new base::DictionaryValue();
1757 dict->SetInteger("source_id", net_log_.source().id);
1759 dict->SetString("host_port_pair", host_port_pair().ToString());
// "aliases" is only present when the session has pooled aliases.
1760 if (!pooled_aliases_.empty()) {
1761 base::ListValue* alias_list = new base::ListValue();
1762 for (std::set<SpdySessionKey>::const_iterator it =
1763 pooled_aliases_.begin();
1764 it != pooled_aliases_.end(); it++) {
1765 alias_list->Append(new base::StringValue(
1766 it->host_port_pair().ToString()));
1768 dict->Set("aliases", alias_list);
1770 dict->SetString("proxy", host_port_proxy_pair().second.ToURI());
1772 dict->SetInteger("active_streams", active_streams_.size());
1774 dict->SetInteger("unclaimed_pushed_streams",
1775 unclaimed_pushed_streams_.size());
1777 dict->SetBoolean("is_secure", is_secure_);
1779 dict->SetString("protocol_negotiated",
1780 SSLClientSocket::NextProtoToString(
1781 connection_->socket()->GetNegotiatedProtocol()));
1783 dict->SetInteger("error", error_on_close_);
1784 dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
1786 dict->SetInteger("streams_initiated_count", streams_initiated_count_);
1787 dict->SetInteger("streams_pushed_count", streams_pushed_count_);
1788 dict->SetInteger("streams_pushed_and_claimed_count",
1789 streams_pushed_and_claimed_count_);
1790 dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
1791 DCHECK(buffered_spdy_framer_.get());
1792 dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received());
1794 dict->SetBoolean("sent_settings", sent_settings_);
1795 dict->SetBoolean("received_settings", received_settings_);
1797 dict->SetInteger("send_window_size", session_send_window_size_);
1798 dict->SetInteger("recv_window_size", session_recv_window_size_);
1799 dict->SetInteger("unacked_recv_window_bytes",
1800 session_unacked_recv_window_bytes_);
// Returns true if at least one frame has been received on this session, or if
// the underlying connection's reuse type is UNUSED_IDLE.
1804 bool SpdySession::IsReused() const {
1805 return buffered_spdy_framer_->frames_received() > 0 ||
1806 connection_->reuse_type() == ClientSocketHandle::UNUSED_IDLE;
// Forwards to the connection's GetLoadTimingInfo(). The first argument is
// true for every stream except the one with id kFirstStreamId.
// NOTE(review): the remaining argument line is not visible in this listing;
// presumably |load_timing_info| is passed through.
1809 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id,
1810 LoadTimingInfo* load_timing_info) const {
1811 return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId,
// Asks the connection's socket for the peer address and stores it in
// |address|. The result defaults to ERR_SOCKET_NOT_CONNECTED when there is no
// socket. A boolean histogram records whether the result was
// ERR_SOCKET_NOT_CONNECTED.
// NOTE(review): the return statement is not visible in this listing;
// presumably |rv| is returned.
1815 int SpdySession::GetPeerAddress(IPEndPoint* address) const {
1816 int rv = ERR_SOCKET_NOT_CONNECTED;
1817 if (connection_->socket()) {
1818 rv = connection_->socket()->GetPeerAddress(address);
1821 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress",
1822 rv == ERR_SOCKET_NOT_CONNECTED);
// Asks the connection's socket for the local address and stores it in
// |address|. The result defaults to ERR_SOCKET_NOT_CONNECTED when there is no
// socket. A boolean histogram records whether the result was
// ERR_SOCKET_NOT_CONNECTED.
// NOTE(review): the return statement is not visible in this listing;
// presumably |rv| is returned.
1827 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
1828 int rv = ERR_SOCKET_NOT_CONNECTED;
1829 if (connection_->socket()) {
1830 rv = connection_->socket()->GetLocalAddress(address);
1833 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress",
1834 rv == ERR_SOCKET_NOT_CONNECTED);
// Enqueues a frame that belongs to the session rather than to a particular
// stream. |frame_type| must be one of RST_STREAM, SETTINGS, WINDOW_UPDATE,
// PING or GOAWAY. |frame| is wrapped in a SpdyBuffer inside a
// SimpleBufferProducer and passed on together with a null stream pointer.
// NOTE(review): the opening line of the call receiving the arguments below is
// not visible in this listing; presumably EnqueueWrite().
1839 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
1840 SpdyFrameType frame_type,
1841 scoped_ptr<SpdyFrame> frame) {
1842 DCHECK(frame_type == RST_STREAM || frame_type == SETTINGS ||
1843 frame_type == WINDOW_UPDATE || frame_type == PING ||
1844 frame_type == GOAWAY);
1846 priority, frame_type,
1847 scoped_ptr<SpdyBufferProducer>(
1848 new SimpleBufferProducer(
1849 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
1850 base::WeakPtr<SpdyStream>());
// Adds |producer| to the write queue at |priority| on behalf of |stream|
// (which may be null for session-level frames) and makes sure the write loop
// is scheduled.
1853 void SpdySession::EnqueueWrite(RequestPriority priority,
1854 SpdyFrameType frame_type,
1855 scoped_ptr<SpdyBufferProducer> producer,
1856 const base::WeakPtr<SpdyStream>& stream) {
// NOTE(review): the statement under this check is not visible in this
// listing; presumably writes are dropped once the session is draining.
1857 if (availability_state_ == STATE_DRAINING)
1860 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
1861 MaybePostWriteLoop();
// If the write state machine is idle, switches it to WRITE_STATE_DO_WRITE and
// posts a task that runs PumpWriteLoop() through a weak pointer. Does nothing
// when a write loop is already scheduled or running.
1864 void SpdySession::MaybePostWriteLoop() {
1865 if (write_state_ == WRITE_STATE_IDLE) {
1866 CHECK(!in_flight_write_);
1867 write_state_ = WRITE_STATE_DO_WRITE;
1868 base::MessageLoop::current()->PostTask(
1870 base::Bind(&SpdySession::PumpWriteLoop,
1871 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
// Takes ownership of |stream| and stores its raw pointer in
// |created_streams_|. The stream must have id 0 and must not already be in
// the set.
1875 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
1876 CHECK_EQ(stream->stream_id(), 0u);
1877 CHECK(created_streams_.find(stream.get()) == created_streams_.end());
1878 created_streams_.insert(stream.release());
// Assigns a freshly allocated stream id to |stream|, removes it from
// |created_streams_| and returns ownership of it to the caller. |stream| must
// currently be in |created_streams_| with id 0.
1881 scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) {
1882 CHECK_EQ(stream->stream_id(), 0u);
1883 CHECK(created_streams_.find(stream) != created_streams_.end());
1884 stream->set_stream_id(GetNewStreamId());
1885 scoped_ptr<SpdyStream> owned_stream(stream);
1886 created_streams_.erase(stream);
1887 return owned_stream.Pass();
// Registers |stream| in |active_streams_| under its (non-zero) stream id.
// Crashes (CHECK) if an entry with that id already exists.
1890 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) {
1891 SpdyStreamId stream_id = stream->stream_id();
1892 CHECK_NE(stream_id, 0u);
1893 std::pair<ActiveStreamMap::iterator, bool> result =
1894 active_streams_.insert(
1895 std::make_pair(stream_id, ActiveStreamInfo(stream.get())));
1896 CHECK(result.second);
// The map entry now holds the raw pointer, so give up the scoped_ptr's
// ownership without deleting the stream.
1897 ignore_result(stream.release());
// Final teardown for |stream|: detaches it from any in-flight write, removes
// its queued writes, and calls OnClose(|status|) on it. If the session is
// still available, pending stream requests are then processed.
1900 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
1901 if (in_flight_write_stream_.get() == stream.get()) {
1902 // If we're deleting the stream for the in-flight write, we still
1903 // need to let the write complete, so we clear
1904 // |in_flight_write_stream_| and let the write finish on its own
1905 // without notifying |in_flight_write_stream_|.
1906 in_flight_write_stream_.reset();
1909 write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
1910 stream->OnClose(status);
1912 if (availability_state_ == STATE_AVAILABLE) {
1913 ProcessPendingStreamRequests();
// Claims the unclaimed pushed stream for |url|, if any. The entry is removed
// from |unclaimed_pushed_streams_|; a weak pointer to the stream is returned
// when it is still active, otherwise a null weak pointer.
1917 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
1918 base::StatsCounter used_push_streams("spdy.claimed_push_streams");
1920 PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
1921 if (unclaimed_it == unclaimed_pushed_streams_.end())
1922 return base::WeakPtr<SpdyStream>();
// Copy the id out before erasing the entry, which invalidates |unclaimed_it|.
1924 SpdyStreamId stream_id = unclaimed_it->second.stream_id;
1925 unclaimed_pushed_streams_.erase(unclaimed_it);
1927 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
1928 if (active_it == active_streams_.end()) {
1930 return base::WeakPtr<SpdyStream>();
1933 net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM);
1934 used_push_streams.Increment();
1935 return active_it->second.stream->GetWeakPtr();
// Fills |was_npn_negotiated|, |protocol_negotiated| and |ssl_info| from the
// connection's socket and returns the socket's GetSSLInfo() result.
1938 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info,
1939 bool* was_npn_negotiated,
1940 NextProto* protocol_negotiated) {
1941 *was_npn_negotiated = connection_->socket()->WasNpnNegotiated();
1942 *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol();
1943 return connection_->socket()->GetSSLInfo(ssl_info);
// Asks the SSL client socket to fill in |cert_request_info|.
// NOTE(review): the lines before and after the call below (any precondition
// check and the return statement) are not visible in this listing.
1946 bool SpdySession::GetSSLCertRequestInfo(
1947 SSLCertRequestInfo* cert_request_info) {
1950 GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
// Handles a framer error: records the protocol-error histogram sample and
// drains the session with the net error mapped from |error_code|.
// NOTE(review): one argument line of the StringPrintf() call is not visible
// in this listing; presumably it supplies the value for "%d".
1954 void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
1957 RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code));
1958 std::string description =
1959 base::StringPrintf("Framer error: %d (%s).",
1961 SpdyFramer::ErrorCodeToString(error_code));
1962 DoDrainSession(MapFramerErrorToNetError(error_code), description);
// Handles an error on |stream_id|: an active stream is reset with
// RST_STREAM_PROTOCOL_ERROR; for an unknown stream only a RST_STREAM frame is
// enqueued, at IDLE priority.
1965 void SpdySession::OnStreamError(SpdyStreamId stream_id,
1966 const std::string& description) {
1969 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1970 if (it == active_streams_.end()) {
1971 // We still want to send a frame to reset the stream even if we
1972 // don't know anything about it.
1973 EnqueueResetStreamFrame(
1974 stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
// NOTE(review): the end of the branch above is not visible in this listing;
// presumably it returns, since |it| is end() there.
1978 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
// Called when the header of a DATA frame for |stream_id| has been parsed.
// Adds the framer's minimum DATA frame size to the stream's raw received
// byte count, if the stream is still active.
// NOTE(review): the remaining parameter lines are not visible in this
// listing.
1981 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id,
1986 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1988 // By the time data comes in, the stream may already be inactive.
1989 if (it == active_streams_.end())
1992 SpdyStream* stream = it->second.stream;
1993 CHECK_EQ(stream->stream_id(), stream_id);
1995 DCHECK(buffered_spdy_framer_);
1996 size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize();
1997 stream->IncrementRawReceivedBytes(header_len);
// Called with payload bytes of a DATA frame for |stream_id|. The bytes are
// wrapped in a SpdyBuffer (charged against the session receive window when
// session flow control is on) and delivered to the stream. Data that arrives
// before the stream's SYN_REPLY resets the stream with a protocol error.
// NOTE(review): the remaining parameter lines are not visible in this
// listing; the body uses |data|, |len| and |fin|. Some guards and early
// returns are likewise not visible.
2000 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
2006 if (data == NULL && len != 0) {
2007 // This is notification of consumed data padding.
2008 // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames.
2009 // See crbug.com/353012.
2013 DCHECK_LT(len, 1u << 24);
2014 if (net_log().IsLogging()) {
2016 NetLog::TYPE_SPDY_SESSION_RECV_DATA,
2017 base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
2020 // Build the buffer as early as possible so that we go through the
2021 // session flow control checks and update
2022 // |unacked_recv_window_bytes_| properly even when the stream is
2023 // inactive (since the other side has still reduced its session send
2025 scoped_ptr<SpdyBuffer> buffer;
2028 CHECK_LE(len, static_cast<size_t>(kReadBufferSize));
2029 buffer.reset(new SpdyBuffer(data, len));
// With session flow control, shrink the receive window now and register
// OnReadBufferConsumed() as a consume callback on the buffer.
2031 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2032 DecreaseRecvWindowSize(static_cast<int32>(len));
2033 buffer->AddConsumeCallback(
2034 base::Bind(&SpdySession::OnReadBufferConsumed,
2035 weak_factory_.GetWeakPtr()));
2041 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2043 // By the time data comes in, the stream may already be inactive.
2044 if (it == active_streams_.end())
2047 SpdyStream* stream = it->second.stream;
2048 CHECK_EQ(stream->stream_id(), stream_id);
2050 stream->IncrementRawReceivedBytes(len);
// Receiving data while still waiting for the SYN_REPLY is a protocol error.
2052 if (it->second.waiting_for_syn_reply) {
2053 const std::string& error = "Data received before SYN_REPLY.";
2054 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2055 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2059 stream->OnDataReceived(buffer.Pass());
// Called when a SETTINGS frame is received. Clears the persisted SPDY
// settings for this host when |clear_persisted| is set, logs the event, and
// for SPDY4 and later enqueues a SETTINGS acknowledgment.
// NOTE(review): some argument lines of the logging and enqueue calls are not
// visible in this listing.
2062 void SpdySession::OnSettings(bool clear_persisted) {
2065 if (clear_persisted)
2066 http_server_properties_->ClearSpdySettings(host_port_pair());
2068 if (net_log_.IsLogging()) {
2070 NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
2071 base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
2075 if (GetProtocolVersion() >= SPDY4) {
2076 // Send an acknowledgment of the setting.
2077 SpdySettingsIR settings_ir;
2078 settings_ir.set_is_ack(true);
2079 EnqueueSessionWrite(
2082 scoped_ptr<SpdyFrame>(
2083 buffered_spdy_framer_->SerializeFrame(settings_ir)));
// Called for each individual setting in a received SETTINGS frame. Applies
// the setting via HandleSetting(), stores it in |http_server_properties_|,
// marks |received_settings_|, and logs it.
// NOTE(review): the remaining parameter lines and several argument lines are
// not visible in this listing; the body uses |flags| and |value|.
2087 void SpdySession::OnSetting(SpdySettingsIds id,
2092 HandleSetting(id, value);
2093 http_server_properties_->SetSpdySetting(
2096 static_cast<SpdySettingsFlags>(flags),
2098 received_settings_ = true;
2102 NetLog::TYPE_SPDY_SESSION_RECV_SETTING,
2103 base::Bind(&NetLogSpdySettingCallback,
2104 id, static_cast<SpdySettingsFlags>(flags), value));
// Called when a compressed frame is sent. For SYN_STREAM and HEADERS frames
// it computes the header compression percentage for the
// "Net.SpdySynStreamCompressionPercentage" histogram.
// NOTE(review): the remaining parameter lines are not visible in this
// listing; the body uses |type|, |payload_len| and |frame_len|.
2107 void SpdySession::OnSendCompressedFrame(
2108 SpdyStreamId stream_id,
2112 if (type != SYN_STREAM && type != HEADERS)
2115 DCHECK(buffered_spdy_framer_.get());
// The compressed size is the frame length minus the minimum SYN_STREAM frame
// size reported by the framer.
2116 size_t compressed_len =
2117 frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize();
// NOTE(review): the expression below divides by |payload_len|; any guard
// against a zero |payload_len| is not visible here — confirm one exists.
2120 // Make sure we avoid early decimal truncation.
2121 int compression_pct = 100 - (100 * compressed_len) / payload_len;
2122 UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
// Called when a compressed frame is received; remembers its length in
// |last_compressed_frame_len_|.
// NOTE(review): the remaining parameter lines are not visible in this
// listing; the body uses |frame_len|.
2127 void SpdySession::OnReceiveCompressedFrame(
2128 SpdyStreamId stream_id,
2131 last_compressed_frame_len_ = frame_len;
// Delivers the first block of response headers to |stream|. For pushed
// streams the concurrent-push limit is enforced first: when it is reached the
// stream is reset with RST_STREAM_REFUSED_STREAM and
// STATUS_CODE_REFUSED_STREAM is returned; otherwise the stream is counted as
// an active pushed stream.
// NOTE(review): the final return and the guard around the last DCHECK are not
// visible in this listing.
2134 int SpdySession::OnInitialResponseHeadersReceived(
2135 const SpdyHeaderBlock& response_headers,
2136 base::Time response_time,
2137 base::TimeTicks recv_first_byte_time,
2138 SpdyStream* stream) {
// Copy the id up front: |stream| may be invalidated further down.
2140 SpdyStreamId stream_id = stream->stream_id();
2142 if (stream->type() == SPDY_PUSH_STREAM) {
2143 DCHECK(stream->IsReservedRemote());
// A |max_concurrent_pushed_streams_| of 0 disables this limit check.
2144 if (max_concurrent_pushed_streams_ &&
2145 num_active_pushed_streams_ >= max_concurrent_pushed_streams_) {
2146 ResetStream(stream_id,
2147 RST_STREAM_REFUSED_STREAM,
2148 "Stream concurrency limit reached.");
2149 return STATUS_CODE_REFUSED_STREAM;
2153 if (stream->type() == SPDY_PUSH_STREAM) {
2154 // Will be balanced in DeleteStream.
// NOTE(review): the matching decrement visible in this file is in
// CloseActiveStreamIterator(), just before it calls DeleteStream().
2155 num_active_pushed_streams_++;
2158 // May invalidate |stream|.
2159 int rv = stream->OnInitialResponseHeadersReceived(
2160 response_headers, response_time, recv_first_byte_time);
2162 DCHECK_NE(rv, ERR_IO_PENDING);
// NOTE(review): presumably only checked on the failure path (guard not
// visible): after a failure the stream should no longer be active.
2163 DCHECK(active_streams_.find(stream_id) == active_streams_.end());
// Handles a received SYN_STREAM frame (a server push).
//
// For SPDY4 and later the frame is handed to OnHeaders(). Otherwise the
// headers are split into a simulated push-promise (request) part and a
// response part; a push stream is created from the request part via
// TryCreatePushStream() and the response part is delivered to it as initial
// response headers. The "spdy.pushed_streams" counter is incremented at the
// end.
2169 void SpdySession::OnSynStream(SpdyStreamId stream_id,
2170 SpdyStreamId associated_stream_id,
2171 SpdyPriority priority,
2173 bool unidirectional,
2174 const SpdyHeaderBlock& headers) {
2177 if (GetProtocolVersion() >= SPDY4) {
2178 DCHECK_EQ(0u, associated_stream_id);
2179 OnHeaders(stream_id, fin, headers);
// Timestamps taken on receipt; passed on with the response headers below.
2183 base::Time response_time = base::Time::Now();
2184 base::TimeTicks recv_first_byte_time = time_func_();
2186 if (net_log_.IsLogging()) {
2188 NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
2189 base::Bind(&NetLogSpdySynStreamReceivedCallback,
2190 &headers, fin, unidirectional, priority,
2191 stream_id, associated_stream_id));
2194 // Split headers to simulate push promise and response.
2195 SpdyHeaderBlock request_headers;
2196 SpdyHeaderBlock response_headers;
2197 SplitPushedHeadersToRequestAndResponse(
2198 headers, GetProtocolVersion(), &request_headers, &response_headers);
2200 if (!TryCreatePushStream(
2201 stream_id, associated_stream_id, priority, request_headers))
// Look the new stream up again rather than assuming it is still active.
2204 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2205 if (active_it == active_streams_.end()) {
2210 if (OnInitialResponseHeadersReceived(response_headers,
2212 recv_first_byte_time,
2213 active_it->second.stream) != OK)
2216 base::StatsCounter push_requests("spdy.pushed_streams");
2217 push_requests.Increment();
// Resets unclaimed pushed streams that were created more than
// kMinPushedStreamLifetimeSeconds ago. Sweeps are rate-limited by
// |next_unclaimed_push_stream_sweep_time_|, which is advanced by the same
// interval at the end.
2220 void SpdySession::DeleteExpiredPushedStreams() {
2221 if (unclaimed_pushed_streams_.empty())
2224 // Check that adequate time has elapsed since the last sweep.
2225 if (time_func_() < next_unclaimed_push_stream_sweep_time_)
2228 // Gather old streams to delete.
2229 base::TimeTicks minimum_freshness = time_func_() -
2230 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
// Ids are collected first and the streams reset in a second pass, so
// |unclaimed_pushed_streams_| is not modified while it is being iterated.
2231 std::vector<SpdyStreamId> streams_to_close;
2232 for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
2233 it != unclaimed_pushed_streams_.end(); ++it) {
2234 if (minimum_freshness > it->second.creation_time)
2235 streams_to_close.push_back(it->second.stream_id);
2238 for (std::vector<SpdyStreamId>::const_iterator to_close_it =
2239 streams_to_close.begin();
2240 to_close_it != streams_to_close.end(); ++to_close_it) {
2241 ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it);
2242 if (active_it == active_streams_.end())
2245 LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM);
2246 // CloseActiveStreamIterator() will remove the stream from
2247 // |unclaimed_pushed_streams_|.
// NOTE(review): the call below is ResetStreamIterator(); presumably it ends
// up in CloseActiveStreamIterator() -- confirm.
2248 ResetStreamIterator(
2249 active_it, RST_STREAM_REFUSED_STREAM, "Stream not claimed.");
2252 next_unclaimed_push_stream_sweep_time_ = time_func_() +
2253 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
// Handles a received SYN_REPLY frame for |stream_id|.
//
// Credits the last compressed frame length to the stream, then rejects the
// frame with RST_STREAM_PROTOCOL_ERROR if the protocol is SPDY4 or later, or
// if the stream is no longer waiting for a SYN_REPLY. Otherwise clears
// |waiting_for_syn_reply| and delivers |headers| as the initial response
// headers.
2256 void SpdySession::OnSynReply(SpdyStreamId stream_id,
2258 const SpdyHeaderBlock& headers) {
2261 base::Time response_time = base::Time::Now();
2262 base::TimeTicks recv_first_byte_time = time_func_();
2264 if (net_log().IsLogging()) {
2266 NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
2267 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2268 &headers, fin, stream_id));
2271 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2272 if (it == active_streams_.end()) {
2273 // NOTE: it may just be that the stream was cancelled.
2277 SpdyStream* stream = it->second.stream;
2278 CHECK_EQ(stream->stream_id(), stream_id);
// Attribute the compressed bytes of this frame to the stream, then clear the
// pending length so it is not counted twice.
2280 stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2281 last_compressed_frame_len_ = 0;
2283 if (GetProtocolVersion() >= SPDY4) {
2284 const std::string& error =
2285 "SPDY4 wasn't expecting SYN_REPLY.";
2286 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2287 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2290 if (!it->second.waiting_for_syn_reply) {
2291 const std::string& error =
2292 "Received duplicate SYN_REPLY for stream.";
2293 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2294 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2297 it->second.waiting_for_syn_reply = false;
// The result is deliberately discarded.
2299 ignore_result(OnInitialResponseHeadersReceived(
2300 headers, response_time, recv_first_byte_time, stream));
// Handles a received HEADERS frame for |stream_id|.
//
// Three cases are distinguished after the stream lookup:
//  - the stream is still waiting for a SYN_REPLY: before SPDY4 this is a
//    protocol error and the stream is reset; otherwise the headers are
//    treated as the initial response headers;
//  - the stream is reserved (remote): the headers are the initial response
//    headers of a pushed stream;
//  - otherwise: the headers are additional response headers.
2303 void SpdySession::OnHeaders(SpdyStreamId stream_id,
2305 const SpdyHeaderBlock& headers) {
2308 if (net_log().IsLogging()) {
2310 NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
2311 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2312 &headers, fin, stream_id));
2315 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2316 if (it == active_streams_.end()) {
2317 // NOTE: it may just be that the stream was cancelled.
2318 LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
2322 SpdyStream* stream = it->second.stream;
2323 CHECK_EQ(stream->stream_id(), stream_id);
// Attribute the compressed bytes of this frame to the stream, then clear the
// pending length so it is not counted twice.
2325 stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2326 last_compressed_frame_len_ = 0;
2328 base::Time response_time = base::Time::Now();
2329 base::TimeTicks recv_first_byte_time = time_func_();
2331 if (it->second.waiting_for_syn_reply) {
2332 if (GetProtocolVersion() < SPDY4) {
2333 const std::string& error =
2334 "Was expecting SYN_REPLY, not HEADERS.";
2335 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2336 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2340 it->second.waiting_for_syn_reply = false;
2341 ignore_result(OnInitialResponseHeadersReceived(
2342 headers, response_time, recv_first_byte_time, stream));
2343 } else if (it->second.stream->IsReservedRemote()) {
2344 ignore_result(OnInitialResponseHeadersReceived(
2345 headers, response_time, recv_first_byte_time, stream));
2347 int rv = stream->OnAdditionalResponseHeadersReceived(headers);
// NOTE(review): these checks presumably sit on the failure path of |rv|
// (the condition line is not in this listing) -- confirm.
2349 DCHECK_NE(rv, ERR_IO_PENDING);
2350 DCHECK(active_streams_.find(stream_id) == active_streams_.end());
// Handles a received RST_STREAM frame for |stream_id|.
//
// If the stream is not active a warning is logged. For an active stream the
// visible branches are: deliver an empty data buffer to the stream; close it
// with ERR_SPDY_SERVER_REFUSED_STREAM when |status| is
// RST_STREAM_REFUSED_STREAM; otherwise record a protocol error, log the
// status on the stream, and close it with ERR_SPDY_PROTOCOL_ERROR.
2355 void SpdySession::OnRstStream(SpdyStreamId stream_id,
2356 SpdyRstStreamStatus status) {
2359 std::string description;
2361 NetLog::TYPE_SPDY_SESSION_RST_STREAM,
2362 base::Bind(&NetLogSpdyRstCallback,
2363 stream_id, status, &description));
2365 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2366 if (it == active_streams_.end()) {
2367 // NOTE: it may just be that the stream was cancelled.
// The message ends with a space so the id is not glued to "stream", matching
// the equivalent warnings in the other frame handlers.
2368 LOG(WARNING) << "Received RST for invalid stream " << stream_id;
2372 CHECK_EQ(it->second.stream->stream_id(), stream_id);
// An empty buffer is delivered to the stream in this branch.
2375 it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
2376 } else if (status == RST_STREAM_REFUSED_STREAM) {
2377 CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM);
2379 RecordProtocolErrorHistogram(
2380 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
2381 it->second.stream->LogStreamError(
2382 ERR_SPDY_PROTOCOL_ERROR,
2383 base::StringPrintf("SPDY stream closed with status: %d", status));
2384 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
2385 // For now, it doesn't matter much - it is a protocol error.
2386 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
// Handles a received GOAWAY frame: logs the event together with the current
// numbers of active and unclaimed pushed streams, then starts going away with
// ERR_ABORTED for streams beyond |last_accepted_stream_id|.
2390 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
2391 SpdyGoAwayStatus status) {
2394 // TODO(jgraettinger): UMA histogram on |status|.
2396 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
2397 base::Bind(&NetLogSpdyGoAwayCallback,
2398 last_accepted_stream_id,
2399 active_streams_.size(),
2400 unclaimed_pushed_streams_.size(),
2403 StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
2404 // This is to handle the case when we already don't have any active
2405 // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
2406 // active streams and so the last one being closed will finish the
2407 // going away process (see DeleteStream()).
2408 MaybeFinishGoingAway();
// Handles a received PING frame.
//
// A ping that needs a response is echoed back with the ack flag set: for
// kProtoSPDY4 and later that is any ping with |is_ack| false, for earlier
// protocols any ping with an even |unique_id|. Otherwise, if
// |pings_in_flight_| is negative, a protocol error is recorded and the
// session is drained; the RTT histogram is recorded once no pings remain in
// flight.
2411 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) {
2415 NetLog::TYPE_SPDY_SESSION_PING,
2416 base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received"));
2418 // Send response to a PING from server.
2419 if ((protocol_ >= kProtoSPDY4 && !is_ack) ||
2420 (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) {
2421 WritePingFrame(unique_id, true);
// A negative count means more ping replies arrived than pings were sent.
2426 if (pings_in_flight_ < 0) {
2427 RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
2428 DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
2429 pings_in_flight_ = 0;
2433 if (pings_in_flight_ > 0)
2436 // We will record RTT in histogram when there are no more client sent
2437 // pings_in_flight_.
2438 RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
// Handles a received WINDOW_UPDATE frame.
//
// |stream_id| == kSessionFlowControlStreamId addresses the session window;
// any other id addresses a single stream. In both cases the update is ignored
// with a warning when the corresponding flow control level is not enabled,
// and a |delta_window_size| of 0 is treated as an error. Otherwise the
// session's or the stream's send window is increased by |delta_window_size|.
2441 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
2442 uint32 delta_window_size) {
// The casts to int32 below rely on this bound.
2445 DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
2447 NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
2448 base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2449 stream_id, delta_window_size));
2451 if (stream_id == kSessionFlowControlStreamId) {
2452 // WINDOW_UPDATE for the session.
2453 if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
2454 LOG(WARNING) << "Received WINDOW_UPDATE for session when "
2455 << "session flow control is not turned on";
2456 // TODO(akalin): Record an error and close the session.
2460 if (delta_window_size < 1u) {
2461 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2463 ERR_SPDY_PROTOCOL_ERROR,
2464 "Received WINDOW_UPDATE with an invalid delta_window_size " +
2465 base::UintToString(delta_window_size));
2469 IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
2471 // WINDOW_UPDATE for a stream.
2472 if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2473 // TODO(akalin): Record an error and close the session.
2474 LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
2475 << " when flow control is not turned on";
2479 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2481 if (it == active_streams_.end()) {
2482 // NOTE: it may just be that the stream was cancelled.
2483 LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
2487 SpdyStream* stream = it->second.stream;
2488 CHECK_EQ(stream->stream_id(), stream_id);
2490 if (delta_window_size < 1u) {
2491 ResetStreamIterator(it,
2492 RST_STREAM_FLOW_CONTROL_ERROR,
// "%u" is the conversion for an unsigned value; a trailing 'd' after it
// would be emitted as a literal character in the message.
2494 "Received WINDOW_UPDATE with an invalid "
2495 "delta_window_size %u", delta_window_size));
2499 CHECK_EQ(it->second.stream->stream_id(), stream_id);
2500 it->second.stream->IncreaseSendWindowSize(
2501 static_cast<int32>(delta_window_size));
// Validates a server push and, if acceptable, creates the pushed stream.
//
// The push is rejected (a RST_STREAM is enqueued in most cases) when: the
// stream id is odd; the stream is already active; the session is going away;
// the associated stream id is 0; the pushed URL is invalid; the associated
// stream is not active; the origin/scheme check fails; or an unclaimed pushed
// stream with the same URL already exists. On success the new stream is
// recorded in |unclaimed_pushed_streams_|, activated, and given the
// push-promise headers.
// NOTE(review): the return statements are not part of this listing; callers
// in this file treat a false result as "push not created".
// NOTE(review): several messages below format stream ids with "%d"; if
// SpdyStreamId is unsigned, "%u" would be the matching conversion -- confirm.
2505 bool SpdySession::TryCreatePushStream(SpdyStreamId stream_id,
2506 SpdyStreamId associated_stream_id,
2507 SpdyPriority priority,
2508 const SpdyHeaderBlock& headers) {
2509 // Server-initiated streams should have even sequence numbers.
2510 if ((stream_id & 0x1) != 0) {
2511 LOG(WARNING) << "Received invalid push stream id " << stream_id;
2515 if (IsStreamActive(stream_id)) {
2516 LOG(WARNING) << "Received push for active stream " << stream_id;
// Priority used for any RST_STREAM frames enqueued below.
2520 RequestPriority request_priority =
2521 ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion());
2523 if (availability_state_ == STATE_GOING_AWAY) {
2524 // TODO(akalin): This behavior isn't in the SPDY spec, although it
2525 // probably should be.
2526 EnqueueResetStreamFrame(stream_id,
2528 RST_STREAM_REFUSED_STREAM,
2529 "push stream request received when going away");
2533 if (associated_stream_id == 0) {
2534 // In SPDY4 0 stream id in PUSH_PROMISE frame leads to framer error and
2535 // session going away. We should never get here.
2536 CHECK_GT(SPDY4, GetProtocolVersion());
2537 std::string description = base::StringPrintf(
2538 "Received invalid associated stream id %d for pushed stream %d",
2539 associated_stream_id,
2541 EnqueueResetStreamFrame(
2542 stream_id, request_priority, RST_STREAM_REFUSED_STREAM, description);
// Counted before the remaining validation, so rejected pushes past this
// point are included.
2546 streams_pushed_count_++;
2548 // TODO(mbelshe): DCHECK that this is a GET method?
2550 // Verify that the response had a URL for us.
2551 GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true);
2552 if (!gurl.is_valid()) {
2553 EnqueueResetStreamFrame(stream_id,
2555 RST_STREAM_PROTOCOL_ERROR,
2556 "Pushed stream url was invalid: " + gurl.spec());
2560 // Verify we have a valid stream association.
2561 ActiveStreamMap::iterator associated_it =
2562 active_streams_.find(associated_stream_id);
2563 if (associated_it == active_streams_.end()) {
2564 EnqueueResetStreamFrame(
2567 RST_STREAM_INVALID_STREAM,
2568 base::StringPrintf("Received push for inactive associated stream %d",
2569 associated_stream_id));
2573 // Check that the pushed stream advertises the same origin as its associated
2574 // stream. Bypass this check if and only if this session is with a SPDY proxy
2575 // that is trusted explicitly via the --trusted-spdy-proxy switch.
2576 if (trusted_spdy_proxy_.Equals(host_port_pair())) {
2577 // Disallow pushing of HTTPS content.
2578 if (gurl.SchemeIs("https")) {
2579 EnqueueResetStreamFrame(
2582 RST_STREAM_REFUSED_STREAM,
2583 base::StringPrintf("Rejected push of Cross Origin HTTPS content %d",
2584 associated_stream_id));
2587 GURL associated_url(associated_it->second.stream->GetUrlFromHeaders());
2588 if (associated_url.GetOrigin() != gurl.GetOrigin()) {
2589 EnqueueResetStreamFrame(
2592 RST_STREAM_REFUSED_STREAM,
2593 base::StringPrintf("Rejected Cross Origin Push Stream %d",
2594 associated_stream_id));
2599 // There should not be an existing pushed stream with the same path.
// lower_bound() is used so |pushed_it| can be compared with the iterator of
// the newly inserted entry further down.
2600 PushedStreamMap::iterator pushed_it =
2601 unclaimed_pushed_streams_.lower_bound(gurl);
2602 if (pushed_it != unclaimed_pushed_streams_.end() &&
2603 pushed_it->first == gurl) {
2604 EnqueueResetStreamFrame(
2607 RST_STREAM_PROTOCOL_ERROR,
2608 "Received duplicate pushed stream with url: " + gurl.spec());
2612 scoped_ptr<SpdyStream> stream(new SpdyStream(SPDY_PUSH_STREAM,
2616 stream_initial_send_window_size_,
2617 stream_initial_recv_window_size_,
2619 stream->set_stream_id(stream_id);
2621 // In spdy4/http2 PUSH_PROMISE arrives on associated stream.
2622 if (associated_it != active_streams_.end() && GetProtocolVersion() >= SPDY4) {
2623 associated_it->second.stream->IncrementRawReceivedBytes(
2624 last_compressed_frame_len_);
2626 stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2629 last_compressed_frame_len_ = 0;
// Sweep stale unclaimed pushes before recording the new one.
2631 DeleteExpiredPushedStreams();
2632 PushedStreamMap::iterator inserted_pushed_it =
2633 unclaimed_pushed_streams_.insert(
2635 std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_())));
2636 DCHECK(inserted_pushed_it != pushed_it);
// Ownership of the stream is passed on here.
2638 InsertActivatedStream(stream.Pass());
2640 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2641 if (active_it == active_streams_.end()) {
2646 active_it->second.stream->OnPushPromiseHeadersReceived(headers);
2647 DCHECK(active_it->second.stream->IsReservedRemote());
2648 num_pushed_streams_++;
// Handles a received PUSH_PROMISE frame: logs it, tries to create the
// promised push stream (associated with |stream_id|), and on success
// increments the "spdy.pushed_streams" counter.
2652 void SpdySession::OnPushPromise(SpdyStreamId stream_id,
2653 SpdyStreamId promised_stream_id,
2654 const SpdyHeaderBlock& headers) {
2657 if (net_log_.IsLogging()) {
2658 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_PUSH_PROMISE,
2659 base::Bind(&NetLogSpdyPushPromiseReceivedCallback,
2662 promised_stream_id));
2665 // Any priority will do.
2666 // TODO(baranovich): pass parent stream id priority?
// Note the argument order: the promised id is the new stream, |stream_id| is
// its associated stream, and 0 is the placeholder priority.
2667 if (!TryCreatePushStream(promised_stream_id, stream_id, 0, headers))
2670 base::StatsCounter push_requests("spdy.pushed_streams");
2671 push_requests.Increment();
// Sends a WINDOW_UPDATE of |delta_window_size| for the active stream
// |stream_id|, at that stream's priority. CHECK-fails if stream flow control
// is not enabled or the stream is not active.
2674 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id,
2675 uint32 delta_window_size) {
2676 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2677 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2678 CHECK(it != active_streams_.end());
2679 CHECK_EQ(it->second.stream->stream_id(), stream_id);
2680 SendWindowUpdateFrame(
2681 stream_id, delta_window_size, it->second.stream->priority());
// Queues the data sent at the start of a session, in order:
//  1. the HTTP/2 connection header prefix, when
//     |send_connection_header_prefix_| is set;
//  2. a SETTINGS frame with our concurrent-stream limit and, if it differs
//     from the default, our initial stream receive window;
//  3. a session WINDOW_UPDATE raising the receive window to
//     kDefaultInitialRecvWindowSize, when session flow control is on;
//  4. the settings previously stored for this server, which are also applied
//     locally through HandleSetting().
2684 void SpdySession::SendInitialData() {
2685 DCHECK(enable_sending_initial_data_);
2687 if (send_connection_header_prefix_) {
2688 DCHECK_EQ(protocol_, kProtoSPDY4);
// The frame only wraps the constant prefix bytes; it does not own them.
2689 scoped_ptr<SpdyFrame> connection_header_prefix_frame(
2690 new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix),
2691 kHttp2ConnectionHeaderPrefixSize,
2692 false /* take_ownership */));
2693 // Count the prefix as part of the subsequent SETTINGS frame.
2694 EnqueueSessionWrite(HIGHEST, SETTINGS,
2695 connection_header_prefix_frame.Pass());
2698 // First, notify the server about the settings they should use when
2699 // communicating with us.
2700 SettingsMap settings_map;
2701 // Create a new settings frame notifying the server of our
2702 // max concurrent streams and initial window size.
2703 settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] =
2704 SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams);
2705 if (flow_control_state_ >= FLOW_CONTROL_STREAM &&
2706 stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) {
2707 settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
2708 SettingsFlagsAndValue(SETTINGS_FLAG_NONE,
2709 stream_initial_recv_window_size_);
2711 SendSettings(settings_map);
2713 // Next, notify the server about our initial recv window size.
2714 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2715 // Bump up the receive window size to the real initial value. This
2716 // has to go here since the WINDOW_UPDATE frame sent by
2717 // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|.
2718 DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_);
2719 // This condition implies that |kDefaultInitialRecvWindowSize| -
2720 // |session_recv_window_size_| doesn't overflow.
2721 DCHECK_GT(session_recv_window_size_, 0);
2722 IncreaseRecvWindowSize(
2723 kDefaultInitialRecvWindowSize - session_recv_window_size_);
2726 // Finally, notify the server about the settings they have
2727 // previously told us to use when communicating with them (after
2729 const SettingsMap& server_settings_map =
2730 http_server_properties_->GetSpdySettings(host_port_pair());
2731 if (server_settings_map.empty())
// Histogram the stored congestion window; 0 is recorded when none is stored.
2734 SettingsMap::const_iterator it =
2735 server_settings_map.find(SETTINGS_CURRENT_CWND);
2736 uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0;
2737 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100);
// Apply each stored setting to this session before echoing them back.
2739 for (SettingsMap::const_iterator it = server_settings_map.begin();
2740 it != server_settings_map.end(); ++it) {
2741 const SpdySettingsIds new_id = it->first;
2742 const uint32 new_val = it->second.second;
2743 HandleSetting(new_id, new_val);
2746 SendSettings(server_settings_map);
// Builds a SETTINGS frame from |settings|, marks |sent_settings_|, and
// enqueues the frame at HIGHEST priority.
2750 void SpdySession::SendSettings(const SettingsMap& settings) {
2752 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
2753 base::Bind(&NetLogSpdySendSettingsCallback, &settings));
2755 // Create the SETTINGS frame and send it.
2756 DCHECK(buffered_spdy_framer_.get());
2757 scoped_ptr<SpdyFrame> settings_frame(
2758 buffered_spdy_framer_->CreateSettings(settings));
2759 sent_settings_ = true;
2760 EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
// Applies one setting to this session.
//
// SETTINGS_MAX_CONCURRENT_STREAMS: updates |max_concurrent_streams_| (capped
// at kMaxConcurrentStreamLimit) and processes pending stream requests.
// SETTINGS_INITIAL_WINDOW_SIZE: updates |stream_initial_send_window_size_|
// and adjusts the send window of existing streams by the difference; the two
// visible precondition checks cover disabled stream flow control and values
// above kint32max.
2763 void SpdySession::HandleSetting(uint32 id, uint32 value) {
2765 case SETTINGS_MAX_CONCURRENT_STREAMS:
2766 max_concurrent_streams_ = std::min(static_cast<size_t>(value),
2767 kMaxConcurrentStreamLimit);
// A changed limit may allow queued stream requests to proceed.
2768 ProcessPendingStreamRequests();
2770 case SETTINGS_INITIAL_WINDOW_SIZE: {
2771 if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2773 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL);
// The value is cast to int32 below, so it must fit.
2777 if (value > static_cast<uint32>(kint32max)) {
2779 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE,
2780 NetLog::IntegerCallback("initial_window_size", value));
2784 // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only.
// The delta may be negative when the new initial size is smaller.
2785 int32 delta_window_size =
2786 static_cast<int32>(value) - stream_initial_send_window_size_;
2787 stream_initial_send_window_size_ = static_cast<int32>(value);
2788 UpdateStreamsSendWindowSize(delta_window_size);
2790 NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE,
2791 NetLog::IntegerCallback("delta_window_size", delta_window_size));
// Adjusts the send window of every stream known to the session -- both the
// streams in |active_streams_| and those in |created_streams_| -- by
// |delta_window_size|.
2797 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
2798 DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2799 for (ActiveStreamMap::iterator it = active_streams_.begin();
2800 it != active_streams_.end(); ++it) {
2801 it->second.stream->AdjustSendWindowSize(delta_window_size);
2804 for (CreatedStreamSet::const_iterator it = created_streams_.begin();
2805 it != created_streams_.end(); it++) {
2806 (*it)->AdjustSendWindowSize(delta_window_size);
// Considers sending a preface PING. Nothing is done when a ping is already
// in flight or ping-based connection checking is disabled; otherwise the time
// since |last_activity_time_| is compared against
// |connection_at_risk_of_loss_time_|.
2810 void SpdySession::SendPrefacePingIfNoneInFlight() {
2811 if (pings_in_flight_ || !enable_ping_based_connection_checking_)
2814 base::TimeTicks now = time_func_();
2815 // If there is no activity in the session, then send a preface-PING.
2816 if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
// Sends a non-ack PING using |next_ping_id_| as its id.
2820 void SpdySession::SendPrefacePing() {
2821 WritePingFrame(next_ping_id_, false);
// Creates a WINDOW_UPDATE frame for |stream_id| with |delta_window_size| and
// enqueues it at |priority|. |stream_id| must either name an active stream
// or be kSessionFlowControlStreamId with session flow control enabled; both
// conditions are CHECKed.
2824 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
2825 uint32 delta_window_size,
2826 RequestPriority priority) {
2827 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2828 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2829 if (it != active_streams_.end()) {
2830 CHECK_EQ(it->second.stream->stream_id(), stream_id);
// Not an active stream: the update must be for the session itself.
2832 CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2833 CHECK_EQ(stream_id, kSessionFlowControlStreamId);
2837 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
2838 base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2839 stream_id, delta_window_size));
2841 DCHECK(buffered_spdy_framer_.get());
2842 scoped_ptr<SpdyFrame> window_update_frame(
2843 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
2844 EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass());
// Creates a PING frame with |unique_id| / |is_ack| and enqueues it at HIGHEST
// priority, logging it when NetLog is active. The trailing statements
// schedule a ping status check and record the send time.
// NOTE(review): presumably the trailing statements apply only to pings this
// session originates (the guarding lines are not in this listing) -- confirm.
2847 void SpdySession::WritePingFrame(uint32 unique_id, bool is_ack) {
2848 DCHECK(buffered_spdy_framer_.get());
2849 scoped_ptr<SpdyFrame> ping_frame(
2850 buffered_spdy_framer_->CreatePingFrame(unique_id, is_ack));
2851 EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass());
2853 if (net_log().IsLogging()) {
2855 NetLog::TYPE_SPDY_SESSION_PING,
2856 base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "sent"));
2861 PlanToCheckPingStatus();
2862 last_ping_sent_time_ = time_func_();
// Schedules CheckPingStatus() to run after |hung_interval_|, unless a check
// is already pending. The task is bound to a weak pointer and is given the
// current time as its |last_check_time|.
2866 void SpdySession::PlanToCheckPingStatus() {
2867 if (check_ping_status_pending_)
2870 check_ping_status_pending_ = true;
2871 base::MessageLoop::current()->PostDelayedTask(
2873 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2874 time_func_()), hung_interval_);
// Delayed task that checks whether outstanding PINGs have been answered.
//
// With no pings in flight the pending flag is simply cleared. Otherwise, if
// more than |hung_interval_| has elapsed since |last_activity_time_|, or
// there was no activity since |last_check_time|, the ping is considered
// failed and the session is drained with ERR_SPDY_PING_FAILED. If neither
// holds, another check is posted.
2877 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
2878 CHECK(!in_io_loop_);
2880 // Check if we got a response back for all PINGs we had sent.
2881 if (pings_in_flight_ == 0) {
2882 check_ping_status_pending_ = false;
2886 DCHECK(check_ping_status_pending_);
2888 base::TimeTicks now = time_func_();
// Time remaining until the connection counts as hung; negative once
// |hung_interval_| has passed without activity.
2889 base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
2891 if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
2892 // Track all failed PING messages in a separate bucket.
2893 RecordPingRTTHistogram(base::TimeDelta::Max());
2894 DoDrainSession(ERR_SPDY_PING_FAILED, "Failed ping.");
2898 // Check the status of connection after a delay.
2899 base::MessageLoop::current()->PostDelayedTask(
2901 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
// Records |duration| in the "Net.SpdyPing.RTT" histogram.
2906 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) {
2907 UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration);
// Records |details| in "Net.SpdySessionErrorDetails2", and additionally in
// "Net.SpdySessionErrorDetails_Google2" when the session's host name ends
// with "google.com".
2910 void SpdySession::RecordProtocolErrorHistogram(
2911 SpdyProtocolErrorDetails details) {
2912 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details,
2913 NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2914 if (EndsWith(host_port_pair().host(), "google.com", false)) {
2915 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details,
2916 NUM_SPDY_PROTOCOL_ERROR_DETAILS);
// Records per-session histograms: stream counters (initiated, pushed, pushed
// and claimed, abandoned), whether settings were sent/received, and stall
// statistics. When settings were received, the settings stored for this
// server are also histogrammed individually (cwnd, RTT, retransmission rate).
2920 void SpdySession::RecordHistograms() {
2921 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
2922 streams_initiated_count_,
2924 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
2925 streams_pushed_count_,
2927 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
2928 streams_pushed_and_claimed_count_,
2930 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
2931 streams_abandoned_count_,
// Booleans are recorded as two-bucket enumerations (0 or 1).
2933 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
2934 sent_settings_ ? 1 : 0, 2);
2935 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
2936 received_settings_ ? 1 : 0, 2);
2937 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
2940 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
2941 stalled_streams_ > 0 ? 1 : 0, 2);
2943 if (received_settings_) {
2944 // Enumerate the saved settings, and set histograms for it.
2945 const SettingsMap& settings_map =
2946 http_server_properties_->GetSpdySettings(host_port_pair());
2948 SettingsMap::const_iterator it;
2949 for (it = settings_map.begin(); it != settings_map.end(); ++it) {
2950 const SpdySettingsIds id = it->first;
2951 const uint32 val = it->second.second;
2953 case SETTINGS_CURRENT_CWND:
2954 // Record several different histograms to see if cwnd converges
2955 // for larger volumes of data being sent.
2956 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
// Additional buckets keyed on how many bytes this session received.
2958 if (total_bytes_received_ > 10 * 1024) {
2959 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
2961 if (total_bytes_received_ > 25 * 1024) {
2962 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
2964 if (total_bytes_received_ > 50 * 1024) {
2965 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
2967 if (total_bytes_received_ > 100 * 1024) {
2968 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
2975 case SETTINGS_ROUND_TRIP_TIME:
2976 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
2979 case SETTINGS_DOWNLOAD_RETRANS_RATE:
2980 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
// Completes a queued stream request: tries to create the stream and reports
// success to |pending_request|, or reports failure for any result other than
// ERR_IO_PENDING. Does nothing if the request has already gone away (the
// weak pointer is null).
2990 void SpdySession::CompleteStreamRequest(
2991 const base::WeakPtr<SpdyStreamRequest>& pending_request) {
2992 // Abort if the request has already been cancelled.
2993 if (!pending_request)
2996 base::WeakPtr<SpdyStream> stream;
2997 int rv = TryCreateStream(pending_request, &stream);
3001 pending_request->OnRequestCompleteSuccess(stream);
// ERR_IO_PENDING is not reported as a failure.
3006 if (rv != ERR_IO_PENDING) {
3007 pending_request->OnRequestCompleteFailure(rv);
// Obtains the connection's socket as an SSLClientSocket*.
// NOTE(review): reinterpret_cast performs no type check and no pointer
// adjustment; if SSLClientSocket derives from the static type returned by
// socket(), static_cast would be the safer cast -- confirm and consider
// changing.
3011 SSLClientSocket* SpdySession::GetSSLClientSocket() const {
3014 SSLClientSocket* ssl_socket =
3015 reinterpret_cast<SSLClientSocket*>(connection_->socket());
// Callback invoked as bytes of a write SpdyBuffer are consumed. Only
// discarded bytes matter here: they are given back to the session send
// window, limited to |frame_payload_size|.
3020 void SpdySession::OnWriteBufferConsumed(
3021 size_t frame_payload_size,
3022 size_t consume_size,
3023 SpdyBuffer::ConsumeSource consume_source) {
3024 // We can be called with |in_io_loop_| set if a write SpdyBuffer is
3025 // deleted (e.g., a stream is closed due to incoming data).
3027 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3029 if (consume_source == SpdyBuffer::DISCARD) {
3030 // If we're discarding a frame or part of it, increase the send
3031 // window by the number of discarded bytes. (Although if we're
3032 // discarding part of a frame, it's probably because of a write
3033 // error and we'll be tearing down the session soon.)
// Never credit more than the frame's payload size.
3034 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
3035 DCHECK_GT(remaining_payload_bytes, 0u);
3036 IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
3038 // For consumed bytes, the send window is increased when we receive
3039 // a WINDOW_UPDATE frame.
// Increases |session_send_window_size_| by |delta_window_size| (which must be
// at least 1). An increase that would overflow int32 is treated as a protocol
// error. After a successful increase the change is logged and send-stalled
// streams are resumed.
3042 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
3043 // We can be called with |in_io_loop_| set if a SpdyBuffer is
3044 // deleted (e.g., a stream is closed due to incoming data).
3046 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3047 DCHECK_GE(delta_window_size, 1);
3049 // Check for overflow.
// Compare against the remaining headroom instead of adding first, so the
// check itself cannot overflow.
3050 int32 max_delta_window_size = kint32max - session_send_window_size_;
3051 if (delta_window_size > max_delta_window_size) {
3052 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
3054 ERR_SPDY_PROTOCOL_ERROR,
3055 "Received WINDOW_UPDATE [delta: " +
3056 base::IntToString(delta_window_size) +
3057 "] for session overflows session_send_window_size_ [current: " +
3058 base::IntToString(session_send_window_size_) + "]");
3062 session_send_window_size_ += delta_window_size;
3065 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
3066 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3067 delta_window_size, session_send_window_size_));
3069 DCHECK(!IsSendStalled());
3070 ResumeSendStalledStreams();
// Decreases |session_send_window_size_| by |delta_window_size| and logs the
// change. |delta_window_size| is DCHECKed to lie in
// [1, kMaxSpdyFrameChunkSize] and not to exceed the current window.
3073 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
3074 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3076 // We only call this method when sending a frame. Therefore,
3077 // |delta_window_size| should be within the valid frame size range.
3078 DCHECK_GE(delta_window_size, 1);
3079 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
3081 // |send_window_size_| should have been at least |delta_window_size| for
3082 // this call to happen.
3083 DCHECK_GE(session_send_window_size_, delta_window_size);
3085 session_send_window_size_ -= delta_window_size;
// The delta is logged negated, since the window shrank.
3088 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
3089 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3090 -delta_window_size, session_send_window_size_));
// Callback invoked as bytes of a read SpdyBuffer are consumed: the session
// receive window is increased by |consume_size|, which is DCHECKed to lie in
// [1, kint32max].
3093 void SpdySession::OnReadBufferConsumed(
3094 size_t consume_size,
3095 SpdyBuffer::ConsumeSource consume_source) {
3096 // We can be called with |in_io_loop_| set if a read SpdyBuffer is
3097 // deleted (e.g., discarded by a SpdyReadQueue).
3099 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3100 DCHECK_GE(consume_size, 1u);
3101 DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
3103 IncreaseRecvWindowSize(static_cast<int32>(consume_size));
// Increases |session_recv_window_size_| by |delta_window_size| (at least 1)
// and logs the change. The increase is also accumulated in
// |session_unacked_recv_window_bytes_|; once that exceeds half of
// kSpdySessionInitialWindowSize, a session-level WINDOW_UPDATE is sent for
// the accumulated amount and the accumulator is reset.
3106 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
3107 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3108 DCHECK_GE(session_unacked_recv_window_bytes_, 0);
3109 DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
3110 DCHECK_GE(delta_window_size, 1);
3111 // Check for overflow.
3112 DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
3114 session_recv_window_size_ += delta_window_size;
// This is the session receive window, so the session event type is used,
// matching DecreaseRecvWindowSize().
3116 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
3117 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3118 delta_window_size, session_recv_window_size_));
// Batch small increases so a WINDOW_UPDATE is not sent for each one.
3120 session_unacked_recv_window_bytes_ += delta_window_size;
3121 if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) {
3122 SendWindowUpdateFrame(kSessionFlowControlStreamId,
3123 session_unacked_recv_window_bytes_,
3125 session_unacked_recv_window_bytes_ = 0;
// Decreases |session_recv_window_size_| by |delta_window_size| (at least 1)
// and logs the change. A decrease larger than the current window means the
// peer did not respect the receive window; that is recorded as a protocol
// error and reported with ERR_SPDY_FLOW_CONTROL_ERROR.
3129 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
3131 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3132 DCHECK_GE(delta_window_size, 1);
3134 // Since we never decrease the initial receive window size,
3135 // |delta_window_size| should never cause |recv_window_size_| to go
3136 // negative. If we do, the receive window isn't being respected.
3137 if (delta_window_size > session_recv_window_size_) {
3138 RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
3140 ERR_SPDY_FLOW_CONTROL_ERROR,
3141 "delta_window_size is " + base::IntToString(delta_window_size) +
3142 " in DecreaseRecvWindowSize, which is larger than the receive " +
3143 "window size of " + base::IntToString(session_recv_window_size_));
3147 session_recv_window_size_ -= delta_window_size;
// The delta is logged negated, since the window shrank.
3149 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
3150 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3151 -delta_window_size, session_recv_window_size_));
// Appends |stream|'s id to the unstall queue for its priority. The stream
// must be send-stalled by flow control, and its priority must lie within
// [MINIMUM_PRIORITY, MAXIMUM_PRIORITY] because it indexes
// |stream_send_unstall_queue_|.
3154 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
3155 DCHECK(stream.send_stalled_by_flow_control());
3156 RequestPriority priority = stream.priority();
3157 CHECK_GE(priority, MINIMUM_PRIORITY);
3158 CHECK_LE(priority, MAXIMUM_PRIORITY);
3159 stream_send_unstall_queue_[priority].push_back(stream.stream_id());
// Resumes queued send-stalled streams, one at a time, for as long as the
// session itself is not send-stalled. Ids whose stream is no longer active
// are skipped.
3162 void SpdySession::ResumeSendStalledStreams() {
3163 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3165 // We don't have to worry about new streams being queued, since
3166 // doing so would cause IsSendStalled() to return true. But we do
3167 // have to worry about streams being closed, as well as ourselves
3170 while (!IsSendStalled()) {
// |old_size| is the total queue size before the pop; it feeds the progress
// check at the bottom of the loop.
3171 size_t old_size = 0;
3173 old_size = GetTotalSize(stream_send_unstall_queue_);
3176 SpdyStreamId stream_id = PopStreamToPossiblyResume();
3179 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
3180 // The stream may actually still be send-stalled after this (due
3181 // to its own send window) but that's okay -- it'll then be
3182 // resumed once its send window increases.
3183 if (it != active_streams_.end())
3184 it->second.stream->PossiblyResumeIfSendStalled();
3186 // The size should decrease unless we got send-stalled again.
3187 if (!IsSendStalled())
3188 DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
3192 SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
3193 for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
3194 std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
3195 if (!queue->empty()) {
3196 SpdyStreamId stream_id = queue->front();