1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/spdy/spdy_session.h"
10 #include "base/basictypes.h"
11 #include "base/bind.h"
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/metrics/field_trial.h"
16 #include "base/metrics/histogram.h"
17 #include "base/metrics/sparse_histogram.h"
18 #include "base/metrics/stats_counters.h"
19 #include "base/profiler/scoped_tracker.h"
20 #include "base/stl_util.h"
21 #include "base/strings/string_number_conversions.h"
22 #include "base/strings/string_util.h"
23 #include "base/strings/stringprintf.h"
24 #include "base/strings/utf_string_conversions.h"
25 #include "base/time/time.h"
26 #include "base/values.h"
27 #include "crypto/ec_private_key.h"
28 #include "crypto/ec_signature_creator.h"
29 #include "net/base/connection_type_histograms.h"
30 #include "net/base/net_log.h"
31 #include "net/base/net_util.h"
32 #include "net/cert/asn1_util.h"
33 #include "net/cert/cert_verify_result.h"
34 #include "net/http/http_log_util.h"
35 #include "net/http/http_network_session.h"
36 #include "net/http/http_server_properties.h"
37 #include "net/http/http_util.h"
38 #include "net/http/transport_security_state.h"
39 #include "net/socket/ssl_client_socket.h"
40 #include "net/spdy/spdy_buffer_producer.h"
41 #include "net/spdy/spdy_frame_builder.h"
42 #include "net/spdy/spdy_http_utils.h"
43 #include "net/spdy/spdy_protocol.h"
44 #include "net/spdy/spdy_session_pool.h"
45 #include "net/spdy/spdy_stream.h"
46 #include "net/ssl/channel_id_service.h"
47 #include "net/ssl/ssl_cipher_suite_names.h"
48 #include "net/ssl/ssl_connection_status_flags.h"
54 const int kReadBufferSize = 8 * 1024;
55 const int kDefaultConnectionAtRiskOfLossSeconds = 10;
56 const int kHungIntervalSeconds = 10;
58 // Minimum seconds that unclaimed pushed streams will be kept in memory.
59 const int kMinPushedStreamLifetimeSeconds = 300;
61 scoped_ptr<base::ListValue> SpdyHeaderBlockToListValue(
62 const SpdyHeaderBlock& headers,
63 net::NetLog::LogLevel log_level) {
64 scoped_ptr<base::ListValue> headers_list(new base::ListValue());
65 for (SpdyHeaderBlock::const_iterator it = headers.begin();
66 it != headers.end(); ++it) {
67 headers_list->AppendString(
69 ElideHeaderValueForNetLog(log_level, it->first, it->second));
71 return headers_list.Pass();
74 base::Value* NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock* headers,
77 SpdyPriority spdy_priority,
78 SpdyStreamId stream_id,
79 NetLog::LogLevel log_level) {
80 base::DictionaryValue* dict = new base::DictionaryValue();
82 SpdyHeaderBlockToListValue(*headers, log_level).release());
83 dict->SetBoolean("fin", fin);
84 dict->SetBoolean("unidirectional", unidirectional);
85 dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
86 dict->SetInteger("stream_id", stream_id);
90 base::Value* NetLogSpdySynStreamReceivedCallback(
91 const SpdyHeaderBlock* headers,
94 SpdyPriority spdy_priority,
95 SpdyStreamId stream_id,
96 SpdyStreamId associated_stream,
97 NetLog::LogLevel log_level) {
98 base::DictionaryValue* dict = new base::DictionaryValue();
100 SpdyHeaderBlockToListValue(*headers, log_level).release());
101 dict->SetBoolean("fin", fin);
102 dict->SetBoolean("unidirectional", unidirectional);
103 dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
104 dict->SetInteger("stream_id", stream_id);
105 dict->SetInteger("associated_stream", associated_stream);
109 base::Value* NetLogSpdySynReplyOrHeadersReceivedCallback(
110 const SpdyHeaderBlock* headers,
112 SpdyStreamId stream_id,
113 NetLog::LogLevel log_level) {
114 base::DictionaryValue* dict = new base::DictionaryValue();
116 SpdyHeaderBlockToListValue(*headers, log_level).release());
117 dict->SetBoolean("fin", fin);
118 dict->SetInteger("stream_id", stream_id);
122 base::Value* NetLogSpdySessionCloseCallback(int net_error,
123 const std::string* description,
124 NetLog::LogLevel /* log_level */) {
125 base::DictionaryValue* dict = new base::DictionaryValue();
126 dict->SetInteger("net_error", net_error);
127 dict->SetString("description", *description);
131 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair,
132 NetLog::LogLevel /* log_level */) {
133 base::DictionaryValue* dict = new base::DictionaryValue();
134 dict->SetString("host", host_pair->first.ToString());
135 dict->SetString("proxy", host_pair->second.ToPacString());
139 base::Value* NetLogSpdyInitializedCallback(NetLog::Source source,
140 const NextProto protocol_version,
141 NetLog::LogLevel /* log_level */) {
142 base::DictionaryValue* dict = new base::DictionaryValue();
143 if (source.IsValid()) {
144 source.AddToEventParameters(dict);
146 dict->SetString("protocol",
147 SSLClientSocket::NextProtoToString(protocol_version));
151 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair,
152 bool clear_persisted,
153 NetLog::LogLevel /* log_level */) {
154 base::DictionaryValue* dict = new base::DictionaryValue();
155 dict->SetString("host", host_port_pair.ToString());
156 dict->SetBoolean("clear_persisted", clear_persisted);
160 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id,
161 const SpdyMajorVersion protocol_version,
162 SpdySettingsFlags flags,
164 NetLog::LogLevel /* log_level */) {
165 base::DictionaryValue* dict = new base::DictionaryValue();
166 dict->SetInteger("id",
167 SpdyConstants::SerializeSettingId(protocol_version, id));
168 dict->SetInteger("flags", flags);
169 dict->SetInteger("value", value);
173 base::Value* NetLogSpdySendSettingsCallback(
174 const SettingsMap* settings,
175 const SpdyMajorVersion protocol_version,
176 NetLog::LogLevel /* log_level */) {
177 base::DictionaryValue* dict = new base::DictionaryValue();
178 base::ListValue* settings_list = new base::ListValue();
179 for (SettingsMap::const_iterator it = settings->begin();
180 it != settings->end(); ++it) {
181 const SpdySettingsIds id = it->first;
182 const SpdySettingsFlags flags = it->second.first;
183 const uint32 value = it->second.second;
184 settings_list->Append(new base::StringValue(base::StringPrintf(
185 "[id:%u flags:%u value:%u]",
186 SpdyConstants::SerializeSettingId(protocol_version, id),
190 dict->Set("settings", settings_list);
194 base::Value* NetLogSpdyWindowUpdateFrameCallback(
195 SpdyStreamId stream_id,
197 NetLog::LogLevel /* log_level */) {
198 base::DictionaryValue* dict = new base::DictionaryValue();
199 dict->SetInteger("stream_id", static_cast<int>(stream_id));
200 dict->SetInteger("delta", delta);
204 base::Value* NetLogSpdySessionWindowUpdateCallback(
207 NetLog::LogLevel /* log_level */) {
208 base::DictionaryValue* dict = new base::DictionaryValue();
209 dict->SetInteger("delta", delta);
210 dict->SetInteger("window_size", window_size);
214 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id,
217 NetLog::LogLevel /* log_level */) {
218 base::DictionaryValue* dict = new base::DictionaryValue();
219 dict->SetInteger("stream_id", static_cast<int>(stream_id));
220 dict->SetInteger("size", size);
221 dict->SetBoolean("fin", fin);
225 base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id,
227 const std::string* description,
228 NetLog::LogLevel /* log_level */) {
229 base::DictionaryValue* dict = new base::DictionaryValue();
230 dict->SetInteger("stream_id", static_cast<int>(stream_id));
231 dict->SetInteger("status", status);
232 dict->SetString("description", *description);
236 base::Value* NetLogSpdyPingCallback(SpdyPingId unique_id,
239 NetLog::LogLevel /* log_level */) {
240 base::DictionaryValue* dict = new base::DictionaryValue();
241 dict->SetInteger("unique_id", unique_id);
242 dict->SetString("type", type);
243 dict->SetBoolean("is_ack", is_ack);
247 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,
249 int unclaimed_streams,
250 SpdyGoAwayStatus status,
251 NetLog::LogLevel /* log_level */) {
252 base::DictionaryValue* dict = new base::DictionaryValue();
253 dict->SetInteger("last_accepted_stream_id",
254 static_cast<int>(last_stream_id));
255 dict->SetInteger("active_streams", active_streams);
256 dict->SetInteger("unclaimed_streams", unclaimed_streams);
257 dict->SetInteger("status", static_cast<int>(status));
261 base::Value* NetLogSpdyPushPromiseReceivedCallback(
262 const SpdyHeaderBlock* headers,
263 SpdyStreamId stream_id,
264 SpdyStreamId promised_stream_id,
265 NetLog::LogLevel log_level) {
266 base::DictionaryValue* dict = new base::DictionaryValue();
268 SpdyHeaderBlockToListValue(*headers, log_level).release());
269 dict->SetInteger("id", stream_id);
270 dict->SetInteger("promised_stream_id", promised_stream_id);
274 // Helper function to return the total size of an array of objects
275 // with .size() member functions.
276 template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
277 size_t total_size = 0;
278 for (size_t i = 0; i < N; ++i) {
279 total_size += arr[i].size();
284 // Helper class for std:find_if on STL container containing
285 // SpdyStreamRequest weak pointers.
286 class RequestEquals {
288 RequestEquals(const base::WeakPtr<SpdyStreamRequest>& request)
289 : request_(request) {}
291 bool operator()(const base::WeakPtr<SpdyStreamRequest>& request) const {
292 return request_.get() == request.get();
296 const base::WeakPtr<SpdyStreamRequest> request_;
299 // The maximum number of concurrent streams we will ever create. Even if
300 // the server permits more, we will never exceed this limit.
301 const size_t kMaxConcurrentStreamLimit = 256;
305 SpdyProtocolErrorDetails MapFramerErrorToProtocolError(
306 SpdyFramer::SpdyError err) {
308 case SpdyFramer::SPDY_NO_ERROR:
309 return SPDY_ERROR_NO_ERROR;
310 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
311 return SPDY_ERROR_INVALID_CONTROL_FRAME;
312 case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
313 return SPDY_ERROR_CONTROL_PAYLOAD_TOO_LARGE;
314 case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
315 return SPDY_ERROR_ZLIB_INIT_FAILURE;
316 case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
317 return SPDY_ERROR_UNSUPPORTED_VERSION;
318 case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
319 return SPDY_ERROR_DECOMPRESS_FAILURE;
320 case SpdyFramer::SPDY_COMPRESS_FAILURE:
321 return SPDY_ERROR_COMPRESS_FAILURE;
322 case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
323 return SPDY_ERROR_GOAWAY_FRAME_CORRUPT;
324 case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
325 return SPDY_ERROR_RST_STREAM_FRAME_CORRUPT;
326 case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
327 return SPDY_ERROR_INVALID_DATA_FRAME_FLAGS;
328 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
329 return SPDY_ERROR_INVALID_CONTROL_FRAME_FLAGS;
330 case SpdyFramer::SPDY_UNEXPECTED_FRAME:
331 return SPDY_ERROR_UNEXPECTED_FRAME;
334 return static_cast<SpdyProtocolErrorDetails>(-1);
338 Error MapFramerErrorToNetError(SpdyFramer::SpdyError err) {
340 case SpdyFramer::SPDY_NO_ERROR:
342 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
343 return ERR_SPDY_PROTOCOL_ERROR;
344 case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
345 return ERR_SPDY_FRAME_SIZE_ERROR;
346 case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
347 return ERR_SPDY_COMPRESSION_ERROR;
348 case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
349 return ERR_SPDY_PROTOCOL_ERROR;
350 case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
351 return ERR_SPDY_COMPRESSION_ERROR;
352 case SpdyFramer::SPDY_COMPRESS_FAILURE:
353 return ERR_SPDY_COMPRESSION_ERROR;
354 case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
355 return ERR_SPDY_PROTOCOL_ERROR;
356 case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
357 return ERR_SPDY_PROTOCOL_ERROR;
358 case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
359 return ERR_SPDY_PROTOCOL_ERROR;
360 case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
361 return ERR_SPDY_PROTOCOL_ERROR;
362 case SpdyFramer::SPDY_UNEXPECTED_FRAME:
363 return ERR_SPDY_PROTOCOL_ERROR;
366 return ERR_SPDY_PROTOCOL_ERROR;
370 SpdyProtocolErrorDetails MapRstStreamStatusToProtocolError(
371 SpdyRstStreamStatus status) {
373 case RST_STREAM_PROTOCOL_ERROR:
374 return STATUS_CODE_PROTOCOL_ERROR;
375 case RST_STREAM_INVALID_STREAM:
376 return STATUS_CODE_INVALID_STREAM;
377 case RST_STREAM_REFUSED_STREAM:
378 return STATUS_CODE_REFUSED_STREAM;
379 case RST_STREAM_UNSUPPORTED_VERSION:
380 return STATUS_CODE_UNSUPPORTED_VERSION;
381 case RST_STREAM_CANCEL:
382 return STATUS_CODE_CANCEL;
383 case RST_STREAM_INTERNAL_ERROR:
384 return STATUS_CODE_INTERNAL_ERROR;
385 case RST_STREAM_FLOW_CONTROL_ERROR:
386 return STATUS_CODE_FLOW_CONTROL_ERROR;
387 case RST_STREAM_STREAM_IN_USE:
388 return STATUS_CODE_STREAM_IN_USE;
389 case RST_STREAM_STREAM_ALREADY_CLOSED:
390 return STATUS_CODE_STREAM_ALREADY_CLOSED;
391 case RST_STREAM_INVALID_CREDENTIALS:
392 return STATUS_CODE_INVALID_CREDENTIALS;
393 case RST_STREAM_FRAME_SIZE_ERROR:
394 return STATUS_CODE_FRAME_SIZE_ERROR;
395 case RST_STREAM_SETTINGS_TIMEOUT:
396 return STATUS_CODE_SETTINGS_TIMEOUT;
397 case RST_STREAM_CONNECT_ERROR:
398 return STATUS_CODE_CONNECT_ERROR;
399 case RST_STREAM_ENHANCE_YOUR_CALM:
400 return STATUS_CODE_ENHANCE_YOUR_CALM;
403 return static_cast<SpdyProtocolErrorDetails>(-1);
407 SpdyGoAwayStatus MapNetErrorToGoAwayStatus(Error err) {
410 return GOAWAY_NO_ERROR;
411 case ERR_SPDY_PROTOCOL_ERROR:
412 return GOAWAY_PROTOCOL_ERROR;
413 case ERR_SPDY_FLOW_CONTROL_ERROR:
414 return GOAWAY_FLOW_CONTROL_ERROR;
415 case ERR_SPDY_FRAME_SIZE_ERROR:
416 return GOAWAY_FRAME_SIZE_ERROR;
417 case ERR_SPDY_COMPRESSION_ERROR:
418 return GOAWAY_COMPRESSION_ERROR;
419 case ERR_SPDY_INADEQUATE_TRANSPORT_SECURITY:
420 return GOAWAY_INADEQUATE_SECURITY;
422 return GOAWAY_PROTOCOL_ERROR;
426 void SplitPushedHeadersToRequestAndResponse(const SpdyHeaderBlock& headers,
427 SpdyMajorVersion protocol_version,
428 SpdyHeaderBlock* request_headers,
429 SpdyHeaderBlock* response_headers) {
430 DCHECK(response_headers);
431 DCHECK(request_headers);
432 for (SpdyHeaderBlock::const_iterator it = headers.begin();
435 SpdyHeaderBlock* to_insert = response_headers;
436 if (protocol_version == SPDY2) {
437 if (it->first == "url")
438 to_insert = request_headers;
440 const char* host = protocol_version >= SPDY4 ? ":authority" : ":host";
441 static const char* scheme = ":scheme";
442 static const char* path = ":path";
443 if (it->first == host || it->first == scheme || it->first == path)
444 to_insert = request_headers;
446 to_insert->insert(*it);
450 SpdyStreamRequest::SpdyStreamRequest() : weak_ptr_factory_(this) {
454 SpdyStreamRequest::~SpdyStreamRequest() {
458 int SpdyStreamRequest::StartRequest(
460 const base::WeakPtr<SpdySession>& session,
462 RequestPriority priority,
463 const BoundNetLog& net_log,
464 const CompletionCallback& callback) {
468 DCHECK(callback_.is_null());
473 priority_ = priority;
475 callback_ = callback;
477 base::WeakPtr<SpdyStream> stream;
478 int rv = session->TryCreateStream(weak_ptr_factory_.GetWeakPtr(), &stream);
486 void SpdyStreamRequest::CancelRequest() {
488 session_->CancelStreamRequest(weak_ptr_factory_.GetWeakPtr());
490 // Do this to cancel any pending CompleteStreamRequest() tasks.
491 weak_ptr_factory_.InvalidateWeakPtrs();
494 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() {
496 base::WeakPtr<SpdyStream> stream = stream_;
502 void SpdyStreamRequest::OnRequestCompleteSuccess(
503 const base::WeakPtr<SpdyStream>& stream) {
506 DCHECK(!callback_.is_null());
507 CompletionCallback callback = callback_;
514 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) {
517 DCHECK(!callback_.is_null());
518 CompletionCallback callback = callback_;
524 void SpdyStreamRequest::Reset() {
525 type_ = SPDY_BIDIRECTIONAL_STREAM;
529 priority_ = MINIMUM_PRIORITY;
530 net_log_ = BoundNetLog();
534 SpdySession::ActiveStreamInfo::ActiveStreamInfo()
536 waiting_for_syn_reply(false) {}
538 SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream)
540 waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {
543 SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {}
545 SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {}
547 SpdySession::PushedStreamInfo::PushedStreamInfo(
548 SpdyStreamId stream_id,
549 base::TimeTicks creation_time)
550 : stream_id(stream_id),
551 creation_time(creation_time) {}
553 SpdySession::PushedStreamInfo::~PushedStreamInfo() {}
556 bool SpdySession::CanPool(TransportSecurityState* transport_security_state,
557 const SSLInfo& ssl_info,
558 const std::string& old_hostname,
559 const std::string& new_hostname) {
560 // Pooling is prohibited if the server cert is not valid for the new domain,
561 // and for connections on which client certs were sent. It is also prohibited
562 // when channel ID was sent if the hosts are from different eTLDs+1.
563 if (IsCertStatusError(ssl_info.cert_status))
566 if (ssl_info.client_cert_sent)
569 if (ssl_info.channel_id_sent &&
570 ChannelIDService::GetDomainForHost(new_hostname) !=
571 ChannelIDService::GetDomainForHost(old_hostname)) {
576 if (!ssl_info.cert->VerifyNameMatch(new_hostname, &unused))
579 std::string pinning_failure_log;
580 if (!transport_security_state->CheckPublicKeyPins(
582 ssl_info.is_issued_by_known_root,
583 ssl_info.public_key_hashes,
584 &pinning_failure_log)) {
591 SpdySession::SpdySession(
592 const SpdySessionKey& spdy_session_key,
593 const base::WeakPtr<HttpServerProperties>& http_server_properties,
594 TransportSecurityState* transport_security_state,
595 bool verify_domain_authentication,
596 bool enable_sending_initial_data,
597 bool enable_compression,
598 bool enable_ping_based_connection_checking,
599 NextProto default_protocol,
600 size_t stream_initial_recv_window_size,
601 size_t initial_max_concurrent_streams,
602 size_t max_concurrent_streams_limit,
604 const HostPortPair& trusted_spdy_proxy,
606 : in_io_loop_(false),
607 spdy_session_key_(spdy_session_key),
609 http_server_properties_(http_server_properties),
610 transport_security_state_(transport_security_state),
611 read_buffer_(new IOBuffer(kReadBufferSize)),
612 stream_hi_water_mark_(kFirstStreamId),
613 last_accepted_push_stream_id_(0),
614 num_pushed_streams_(0u),
615 num_active_pushed_streams_(0u),
616 in_flight_write_frame_type_(DATA),
617 in_flight_write_frame_size_(0),
619 certificate_error_code_(OK),
620 availability_state_(STATE_AVAILABLE),
621 read_state_(READ_STATE_DO_READ),
622 write_state_(WRITE_STATE_IDLE),
624 max_concurrent_streams_(initial_max_concurrent_streams == 0
625 ? kInitialMaxConcurrentStreams
626 : initial_max_concurrent_streams),
627 max_concurrent_streams_limit_(max_concurrent_streams_limit == 0
628 ? kMaxConcurrentStreamLimit
629 : max_concurrent_streams_limit),
630 max_concurrent_pushed_streams_(kMaxConcurrentPushedStreams),
631 streams_initiated_count_(0),
632 streams_pushed_count_(0),
633 streams_pushed_and_claimed_count_(0),
634 streams_abandoned_count_(0),
635 total_bytes_received_(0),
636 sent_settings_(false),
637 received_settings_(false),
641 last_activity_time_(time_func()),
642 last_compressed_frame_len_(0),
643 check_ping_status_pending_(false),
644 send_connection_header_prefix_(false),
645 flow_control_state_(FLOW_CONTROL_NONE),
646 stream_initial_send_window_size_(kSpdyStreamInitialWindowSize),
647 stream_initial_recv_window_size_(stream_initial_recv_window_size == 0
648 ? kDefaultInitialRecvWindowSize
649 : stream_initial_recv_window_size),
650 session_send_window_size_(0),
651 session_recv_window_size_(0),
652 session_unacked_recv_window_bytes_(0),
653 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
654 verify_domain_authentication_(verify_domain_authentication),
655 enable_sending_initial_data_(enable_sending_initial_data),
656 enable_compression_(enable_compression),
657 enable_ping_based_connection_checking_(
658 enable_ping_based_connection_checking),
659 protocol_(default_protocol),
660 connection_at_risk_of_loss_time_(
661 base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
662 hung_interval_(base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
663 trusted_spdy_proxy_(trusted_spdy_proxy),
664 time_func_(time_func),
665 weak_factory_(this) {
666 DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
667 DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
668 DCHECK(HttpStreamFactory::spdy_enabled());
670 NetLog::TYPE_SPDY_SESSION,
671 base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
672 next_unclaimed_push_stream_sweep_time_ = time_func_() +
673 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
674 // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
677 SpdySession::~SpdySession() {
681 // TODO(akalin): Check connection->is_initialized() instead. This
682 // requires re-working CreateFakeSpdySession(), though.
683 DCHECK(connection_->socket());
684 // With SPDY we can't recycle sockets.
685 connection_->socket()->Disconnect();
689 net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
692 void SpdySession::InitializeWithSocket(
693 scoped_ptr<ClientSocketHandle> connection,
694 SpdySessionPool* pool,
696 int certificate_error_code) {
698 DCHECK_EQ(availability_state_, STATE_AVAILABLE);
699 DCHECK_EQ(read_state_, READ_STATE_DO_READ);
700 DCHECK_EQ(write_state_, WRITE_STATE_IDLE);
701 DCHECK(!connection_);
703 DCHECK(certificate_error_code == OK ||
704 certificate_error_code < ERR_IO_PENDING);
705 // TODO(akalin): Check connection->is_initialized() instead. This
706 // requires re-working CreateFakeSpdySession(), though.
707 DCHECK(connection->socket());
709 base::StatsCounter spdy_sessions("spdy.sessions");
710 spdy_sessions.Increment();
712 connection_ = connection.Pass();
713 is_secure_ = is_secure;
714 certificate_error_code_ = certificate_error_code;
716 NextProto protocol_negotiated =
717 connection_->socket()->GetNegotiatedProtocol();
718 if (protocol_negotiated != kProtoUnknown) {
719 protocol_ = protocol_negotiated;
721 DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
722 DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
724 if (protocol_ == kProtoSPDY4)
725 send_connection_header_prefix_ = true;
727 if (protocol_ >= kProtoSPDY31) {
728 flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION;
729 session_send_window_size_ = kSpdySessionInitialWindowSize;
730 session_recv_window_size_ = kSpdySessionInitialWindowSize;
731 } else if (protocol_ >= kProtoSPDY3) {
732 flow_control_state_ = FLOW_CONTROL_STREAM;
734 flow_control_state_ = FLOW_CONTROL_NONE;
737 buffered_spdy_framer_.reset(
738 new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_),
739 enable_compression_));
740 buffered_spdy_framer_->set_visitor(this);
741 buffered_spdy_framer_->set_debug_visitor(this);
742 UMA_HISTOGRAM_ENUMERATION(
744 protocol_ - kProtoSPDYMinimumVersion,
745 kProtoSPDYMaximumVersion - kProtoSPDYMinimumVersion + 1);
747 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_INITIALIZED,
748 base::Bind(&NetLogSpdyInitializedCallback,
749 connection_->socket()->NetLog().source(),
752 DCHECK_EQ(availability_state_, STATE_AVAILABLE);
753 connection_->AddHigherLayeredPool(this);
754 if (enable_sending_initial_data_)
758 // Bootstrap the read loop.
759 base::MessageLoop::current()->PostTask(
761 base::Bind(&SpdySession::PumpReadLoop,
762 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
765 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
766 if (!verify_domain_authentication_)
769 if (availability_state_ == STATE_DRAINING)
773 bool was_npn_negotiated;
774 NextProto protocol_negotiated = kProtoUnknown;
775 if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
776 return true; // This is not a secure session, so all domains are okay.
778 return CanPool(transport_security_state_, ssl_info,
779 host_port_pair().host(), domain);
782 int SpdySession::GetPushStream(
784 base::WeakPtr<SpdyStream>* stream,
785 const BoundNetLog& stream_net_log) {
790 if (availability_state_ == STATE_DRAINING)
791 return ERR_CONNECTION_CLOSED;
793 Error err = TryAccessStream(url);
797 *stream = GetActivePushStream(url);
799 DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
800 streams_pushed_and_claimed_count_++;
805 // {,Try}CreateStream() and TryAccessStream() can be called with
806 // |in_io_loop_| set if a stream is being created in response to
807 // another being closed due to received data.
809 Error SpdySession::TryAccessStream(const GURL& url) {
810 if (is_secure_ && certificate_error_code_ != OK &&
811 (url.SchemeIs("https") || url.SchemeIs("wss"))) {
812 RecordProtocolErrorHistogram(
813 PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
815 static_cast<Error>(certificate_error_code_),
816 "Tried to get SPDY stream for secure content over an unauthenticated "
818 return ERR_SPDY_PROTOCOL_ERROR;
823 int SpdySession::TryCreateStream(
824 const base::WeakPtr<SpdyStreamRequest>& request,
825 base::WeakPtr<SpdyStream>* stream) {
828 if (availability_state_ == STATE_GOING_AWAY)
831 if (availability_state_ == STATE_DRAINING)
832 return ERR_CONNECTION_CLOSED;
834 Error err = TryAccessStream(request->url());
838 if (!max_concurrent_streams_ ||
839 (active_streams_.size() + created_streams_.size() - num_pushed_streams_ <
840 max_concurrent_streams_)) {
841 return CreateStream(*request, stream);
845 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
846 RequestPriority priority = request->priority();
847 CHECK_GE(priority, MINIMUM_PRIORITY);
848 CHECK_LE(priority, MAXIMUM_PRIORITY);
849 pending_create_stream_queues_[priority].push_back(request);
850 return ERR_IO_PENDING;
853 int SpdySession::CreateStream(const SpdyStreamRequest& request,
854 base::WeakPtr<SpdyStream>* stream) {
855 DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
856 DCHECK_LE(request.priority(), MAXIMUM_PRIORITY);
858 if (availability_state_ == STATE_GOING_AWAY)
861 if (availability_state_ == STATE_DRAINING)
862 return ERR_CONNECTION_CLOSED;
864 Error err = TryAccessStream(request.url());
866 // This should have been caught in TryCreateStream().
871 DCHECK(connection_->socket());
872 DCHECK(connection_->socket()->IsConnected());
873 if (connection_->socket()) {
874 UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
875 connection_->socket()->IsConnected());
876 if (!connection_->socket()->IsConnected()) {
878 ERR_CONNECTION_CLOSED,
879 "Tried to create SPDY stream for a closed socket connection.");
880 return ERR_CONNECTION_CLOSED;
884 scoped_ptr<SpdyStream> new_stream(
885 new SpdyStream(request.type(), GetWeakPtr(), request.url(),
887 stream_initial_send_window_size_,
888 stream_initial_recv_window_size_,
890 *stream = new_stream->GetWeakPtr();
891 InsertCreatedStream(new_stream.Pass());
893 UMA_HISTOGRAM_CUSTOM_COUNTS(
894 "Net.SpdyPriorityCount",
895 static_cast<int>(request.priority()), 0, 10, 11);
900 void SpdySession::CancelStreamRequest(
901 const base::WeakPtr<SpdyStreamRequest>& request) {
903 RequestPriority priority = request->priority();
904 CHECK_GE(priority, MINIMUM_PRIORITY);
905 CHECK_LE(priority, MAXIMUM_PRIORITY);
908 // |request| should not be in a queue not matching its priority.
909 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
912 PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
913 DCHECK(std::find_if(queue->begin(),
915 RequestEquals(request)) == queue->end());
919 PendingStreamRequestQueue* queue =
920 &pending_create_stream_queues_[priority];
921 // Remove |request| from |queue| while preserving the order of the
923 PendingStreamRequestQueue::iterator it =
924 std::find_if(queue->begin(), queue->end(), RequestEquals(request));
925 // The request may already be removed if there's a
926 // CompleteStreamRequest() in flight.
927 if (it != queue->end()) {
928 it = queue->erase(it);
929 // |request| should be in the queue at most once, and if it is
930 // present, should not be pending completion.
931 DCHECK(std::find_if(it, queue->end(), RequestEquals(request)) ==
936 base::WeakPtr<SpdyStreamRequest> SpdySession::GetNextPendingStreamRequest() {
937 for (int j = MAXIMUM_PRIORITY; j >= MINIMUM_PRIORITY; --j) {
938 if (pending_create_stream_queues_[j].empty())
941 base::WeakPtr<SpdyStreamRequest> pending_request =
942 pending_create_stream_queues_[j].front();
943 DCHECK(pending_request);
944 pending_create_stream_queues_[j].pop_front();
945 return pending_request;
947 return base::WeakPtr<SpdyStreamRequest>();
950 void SpdySession::ProcessPendingStreamRequests() {
951 // Like |max_concurrent_streams_|, 0 means infinite for
952 // |max_requests_to_process|.
953 size_t max_requests_to_process = 0;
954 if (max_concurrent_streams_ != 0) {
955 max_requests_to_process =
956 max_concurrent_streams_ -
957 (active_streams_.size() + created_streams_.size());
960 max_requests_to_process == 0 || i < max_requests_to_process; ++i) {
961 base::WeakPtr<SpdyStreamRequest> pending_request =
962 GetNextPendingStreamRequest();
963 if (!pending_request)
966 // Note that this post can race with other stream creations, and it's
967 // possible that the un-stalled stream will be stalled again if it loses.
968 // TODO(jgraettinger): Provide stronger ordering guarantees.
969 base::MessageLoop::current()->PostTask(
971 base::Bind(&SpdySession::CompleteStreamRequest,
972 weak_factory_.GetWeakPtr(),
977 void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) {
978 pooled_aliases_.insert(alias_key);
981 SpdyMajorVersion SpdySession::GetProtocolVersion() const {
982 DCHECK(buffered_spdy_framer_.get());
983 return buffered_spdy_framer_->protocol_version();
986 bool SpdySession::HasAcceptableTransportSecurity() const {
987 // If we're not even using TLS, we have no standards to meet.
992 // We don't enforce transport security standards for older SPDY versions.
993 if (GetProtocolVersion() < SPDY4) {
998 CHECK(connection_->socket()->GetSSLInfo(&ssl_info));
1000 // HTTP/2 requires TLS 1.2+
1001 if (SSLConnectionStatusToVersion(ssl_info.connection_status) <
1002 SSL_CONNECTION_VERSION_TLS1_2) {
1006 if (!IsSecureTLSCipherSuite(
1007 SSLConnectionStatusToCipherSuite(ssl_info.connection_status))) {
1014 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
1015 return weak_factory_.GetWeakPtr();
1018 bool SpdySession::CloseOneIdleConnection() {
1019 CHECK(!in_io_loop_);
1021 if (active_streams_.empty()) {
1022 DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1024 // Return false as the socket wasn't immediately closed.
1028 void SpdySession::EnqueueStreamWrite(
1029 const base::WeakPtr<SpdyStream>& stream,
1030 SpdyFrameType frame_type,
1031 scoped_ptr<SpdyBufferProducer> producer) {
1032 DCHECK(frame_type == HEADERS ||
1033 frame_type == DATA ||
1034 frame_type == CREDENTIAL ||
1035 frame_type == SYN_STREAM);
1036 EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
1039 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
1040 SpdyStreamId stream_id,
1041 RequestPriority priority,
1042 SpdyControlFlags flags,
1043 const SpdyHeaderBlock& block) {
1044 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
1045 CHECK(it != active_streams_.end());
1046 CHECK_EQ(it->second.stream->stream_id(), stream_id);
1048 SendPrefacePingIfNoneInFlight();
1050 DCHECK(buffered_spdy_framer_.get());
1051 SpdyPriority spdy_priority =
1052 ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion());
1054 scoped_ptr<SpdyFrame> syn_frame;
1055 // TODO(hkhalil): Avoid copy of |block|.
1056 if (GetProtocolVersion() <= SPDY3) {
1057 SpdySynStreamIR syn_stream(stream_id);
1058 syn_stream.set_associated_to_stream_id(0);
1059 syn_stream.set_priority(spdy_priority);
1060 syn_stream.set_fin((flags & CONTROL_FLAG_FIN) != 0);
1061 syn_stream.set_unidirectional((flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0);
1062 syn_stream.set_name_value_block(block);
1063 syn_frame.reset(buffered_spdy_framer_->SerializeFrame(syn_stream));
1065 SpdyHeadersIR headers(stream_id);
1066 headers.set_priority(spdy_priority);
1067 headers.set_has_priority(true);
1068 headers.set_fin((flags & CONTROL_FLAG_FIN) != 0);
1069 headers.set_name_value_block(block);
1070 syn_frame.reset(buffered_spdy_framer_->SerializeFrame(headers));
1073 base::StatsCounter spdy_requests("spdy.requests");
1074 spdy_requests.Increment();
1075 streams_initiated_count_++;
1077 if (net_log().IsLogging()) {
1078 net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
1079 base::Bind(&NetLogSpdySynStreamSentCallback,
1081 (flags & CONTROL_FLAG_FIN) != 0,
1082 (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
1087 return syn_frame.Pass();
1090 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
1093 SpdyDataFlags flags) {
1094 if (availability_state_ == STATE_DRAINING) {
1095 return scoped_ptr<SpdyBuffer>();
1098 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
1099 CHECK(it != active_streams_.end());
1100 SpdyStream* stream = it->second.stream;
1101 CHECK_EQ(stream->stream_id(), stream_id);
1105 return scoped_ptr<SpdyBuffer>();
1108 int effective_len = std::min(len, kMaxSpdyFrameChunkSize);
1110 bool send_stalled_by_stream =
1111 (flow_control_state_ >= FLOW_CONTROL_STREAM) &&
1112 (stream->send_window_size() <= 0);
1113 bool send_stalled_by_session = IsSendStalled();
1115 // NOTE: There's an enum of the same name in histograms.xml.
1116 enum SpdyFrameFlowControlState {
1118 SEND_STALLED_BY_STREAM,
1119 SEND_STALLED_BY_SESSION,
1120 SEND_STALLED_BY_STREAM_AND_SESSION,
1123 SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED;
1124 if (send_stalled_by_stream) {
1125 if (send_stalled_by_session) {
1126 frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION;
1128 frame_flow_control_state = SEND_STALLED_BY_STREAM;
1130 } else if (send_stalled_by_session) {
1131 frame_flow_control_state = SEND_STALLED_BY_SESSION;
1134 if (flow_control_state_ == FLOW_CONTROL_STREAM) {
1135 UMA_HISTOGRAM_ENUMERATION(
1136 "Net.SpdyFrameStreamFlowControlState",
1137 frame_flow_control_state,
1138 SEND_STALLED_BY_STREAM + 1);
1139 } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1140 UMA_HISTOGRAM_ENUMERATION(
1141 "Net.SpdyFrameStreamAndSessionFlowControlState",
1142 frame_flow_control_state,
1143 SEND_STALLED_BY_STREAM_AND_SESSION + 1);
1146 // Obey send window size of the stream if stream flow control is
1148 if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
1149 if (send_stalled_by_stream) {
1150 stream->set_send_stalled_by_flow_control(true);
1151 // Even though we're currently stalled only by the stream, we
1152 // might end up being stalled by the session also.
1153 QueueSendStalledStream(*stream);
1155 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW,
1156 NetLog::IntegerCallback("stream_id", stream_id));
1157 return scoped_ptr<SpdyBuffer>();
1160 effective_len = std::min(effective_len, stream->send_window_size());
1163 // Obey send window size of the session if session flow control is
1165 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1166 if (send_stalled_by_session) {
1167 stream->set_send_stalled_by_flow_control(true);
1168 QueueSendStalledStream(*stream);
1170 NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW,
1171 NetLog::IntegerCallback("stream_id", stream_id));
1172 return scoped_ptr<SpdyBuffer>();
1175 effective_len = std::min(effective_len, session_send_window_size_);
1178 DCHECK_GE(effective_len, 0);
1180 // Clear FIN flag if only some of the data will be in the data
1182 if (effective_len < len)
1183 flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
1185 if (net_log().IsLogging()) {
1187 NetLog::TYPE_SPDY_SESSION_SEND_DATA,
1188 base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len,
1189 (flags & DATA_FLAG_FIN) != 0));
1192 // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
1193 if (effective_len > 0)
1194 SendPrefacePingIfNoneInFlight();
1196 // TODO(mbelshe): reduce memory copies here.
1197 DCHECK(buffered_spdy_framer_.get());
1198 scoped_ptr<SpdyFrame> frame(
1199 buffered_spdy_framer_->CreateDataFrame(
1200 stream_id, data->data(),
1201 static_cast<uint32>(effective_len), flags));
1203 scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));
1205 // Send window size is based on payload size, so nothing to do if this is
1206 // just a FIN with no payload.
1207 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION &&
1208 effective_len != 0) {
1209 DecreaseSendWindowSize(static_cast<int32>(effective_len));
1210 data_buffer->AddConsumeCallback(
1211 base::Bind(&SpdySession::OnWriteBufferConsumed,
1212 weak_factory_.GetWeakPtr(),
1213 static_cast<size_t>(effective_len)));
1216 return data_buffer.Pass();
1219 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) {
1220 DCHECK_NE(stream_id, 0u);
1222 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1223 if (it == active_streams_.end()) {
1228 CloseActiveStreamIterator(it, status);
1231 void SpdySession::CloseCreatedStream(
1232 const base::WeakPtr<SpdyStream>& stream, int status) {
1233 DCHECK_EQ(stream->stream_id(), 0u);
1235 CreatedStreamSet::iterator it = created_streams_.find(stream.get());
1236 if (it == created_streams_.end()) {
1241 CloseCreatedStreamIterator(it, status);
1244 void SpdySession::ResetStream(SpdyStreamId stream_id,
1245 SpdyRstStreamStatus status,
1246 const std::string& description) {
1247 DCHECK_NE(stream_id, 0u);
1249 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1250 if (it == active_streams_.end()) {
1255 ResetStreamIterator(it, status, description);
1258 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
1259 return ContainsKey(active_streams_, stream_id);
1262 LoadState SpdySession::GetLoadState() const {
1263 // Just report that we're idle since the session could be doing
1264 // many things concurrently.
1265 return LOAD_STATE_IDLE;
1268 void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
1270 // TODO(mbelshe): We should send a RST_STREAM control frame here
1271 // so that the server can cancel a large send.
1273 scoped_ptr<SpdyStream> owned_stream(it->second.stream);
1274 active_streams_.erase(it);
1276 // TODO(akalin): When SpdyStream was ref-counted (and
1277 // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
1278 // was only done when status was not OK. This meant that pushed
1279 // streams can still be claimed after they're closed. This is
1280 // probably something that we still want to support, although server
1281 // push is hardly used. Write tests for this and fix this. (See
1282 // http://crbug.com/261712 .)
1283 if (owned_stream->type() == SPDY_PUSH_STREAM) {
1284 unclaimed_pushed_streams_.erase(owned_stream->url());
1285 num_pushed_streams_--;
1286 if (!owned_stream->IsReservedRemote())
1287 num_active_pushed_streams_--;
1290 DeleteStream(owned_stream.Pass(), status);
1291 MaybeFinishGoingAway();
1293 // If there are no active streams and the socket pool is stalled, close the
1294 // session to free up a socket slot.
1295 if (active_streams_.empty() && connection_->IsPoolStalled()) {
1296 DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1300 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
1302 scoped_ptr<SpdyStream> owned_stream(*it);
1303 created_streams_.erase(it);
1304 DeleteStream(owned_stream.Pass(), status);
1307 void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it,
1308 SpdyRstStreamStatus status,
1309 const std::string& description) {
1310 // Send the RST_STREAM frame first as CloseActiveStreamIterator()
1312 SpdyStreamId stream_id = it->first;
1313 RequestPriority priority = it->second.stream->priority();
1314 EnqueueResetStreamFrame(stream_id, priority, status, description);
1316 // Removes any pending writes for the stream except for possibly an
1318 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
1321 void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id,
1322 RequestPriority priority,
1323 SpdyRstStreamStatus status,
1324 const std::string& description) {
1325 DCHECK_NE(stream_id, 0u);
1328 NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
1329 base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));
1331 DCHECK(buffered_spdy_framer_.get());
1332 scoped_ptr<SpdyFrame> rst_frame(
1333 buffered_spdy_framer_->CreateRstStream(stream_id, status));
1335 EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
1336 RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status));
1339 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
1340 // TODO(vadimt): Remove ScopedTracker below once crbug.com/418183 is fixed.
1341 tracked_objects::ScopedTracker tracking_profile(
1342 FROM_HERE_WITH_EXPLICIT_FUNCTION(
1343 "418183 DoReadCallback => SpdySession::PumpReadLoop"));
1345 CHECK(!in_io_loop_);
1346 if (availability_state_ == STATE_DRAINING) {
1349 ignore_result(DoReadLoop(expected_read_state, result));
1352 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
1353 CHECK(!in_io_loop_);
1354 CHECK_EQ(read_state_, expected_read_state);
1358 int bytes_read_without_yielding = 0;
1360 // Loop until the session is draining, the read becomes blocked, or
1361 // the read limit is exceeded.
1363 switch (read_state_) {
1364 case READ_STATE_DO_READ:
1365 CHECK_EQ(result, OK);
1368 case READ_STATE_DO_READ_COMPLETE:
1370 bytes_read_without_yielding += result;
1371 result = DoReadComplete(result);
1374 NOTREACHED() << "read_state_: " << read_state_;
1378 if (availability_state_ == STATE_DRAINING)
1381 if (result == ERR_IO_PENDING)
1384 if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
1385 read_state_ = READ_STATE_DO_READ;
1386 base::MessageLoop::current()->PostTask(
1388 base::Bind(&SpdySession::PumpReadLoop,
1389 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
1390 result = ERR_IO_PENDING;
1396 in_io_loop_ = false;
1401 int SpdySession::DoRead() {
1405 CHECK(connection_->socket());
1406 read_state_ = READ_STATE_DO_READ_COMPLETE;
1407 return connection_->socket()->Read(
1410 base::Bind(&SpdySession::PumpReadLoop,
1411 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
1414 int SpdySession::DoReadComplete(int result) {
1417 // Parse a frame. For now this code requires that the frame fit into our
1418 // buffer (kReadBufferSize).
1419 // TODO(mbelshe): support arbitrarily large frames!
1422 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
1423 total_bytes_received_, 1, 100000000, 50);
1424 DoDrainSession(ERR_CONNECTION_CLOSED, "Connection closed");
1426 return ERR_CONNECTION_CLOSED;
1430 DoDrainSession(static_cast<Error>(result), "result is < 0.");
1433 CHECK_LE(result, kReadBufferSize);
1434 total_bytes_received_ += result;
1436 last_activity_time_ = time_func_();
1438 DCHECK(buffered_spdy_framer_.get());
1439 char* data = read_buffer_->data();
1440 while (result > 0) {
1441 uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
1442 result -= bytes_processed;
1443 data += bytes_processed;
1445 if (availability_state_ == STATE_DRAINING) {
1446 return ERR_CONNECTION_CLOSED;
1449 DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
1452 read_state_ = READ_STATE_DO_READ;
1456 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
1457 CHECK(!in_io_loop_);
1458 DCHECK_EQ(write_state_, expected_write_state);
1460 DoWriteLoop(expected_write_state, result);
1462 if (availability_state_ == STATE_DRAINING && !in_flight_write_ &&
1463 write_queue_.IsEmpty()) {
1464 pool_->RemoveUnavailableSession(GetWeakPtr()); // Destroys |this|.
1469 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
1470 CHECK(!in_io_loop_);
1471 DCHECK_NE(write_state_, WRITE_STATE_IDLE);
1472 DCHECK_EQ(write_state_, expected_write_state);
1476 // Loop until the session is closed or the write becomes blocked.
1478 switch (write_state_) {
1479 case WRITE_STATE_DO_WRITE:
1480 DCHECK_EQ(result, OK);
1483 case WRITE_STATE_DO_WRITE_COMPLETE:
1484 result = DoWriteComplete(result);
1486 case WRITE_STATE_IDLE:
1488 NOTREACHED() << "write_state_: " << write_state_;
1492 if (write_state_ == WRITE_STATE_IDLE) {
1493 DCHECK_EQ(result, ERR_IO_PENDING);
1497 if (result == ERR_IO_PENDING)
1502 in_io_loop_ = false;
1507 int SpdySession::DoWrite() {
1510 DCHECK(buffered_spdy_framer_);
1511 if (in_flight_write_) {
1512 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1514 // Grab the next frame to send.
1515 SpdyFrameType frame_type = DATA;
1516 scoped_ptr<SpdyBufferProducer> producer;
1517 base::WeakPtr<SpdyStream> stream;
1518 if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
1519 write_state_ = WRITE_STATE_IDLE;
1520 return ERR_IO_PENDING;
1524 CHECK(!stream->IsClosed());
1526 // Activate the stream only when sending the SYN_STREAM frame to
1527 // guarantee monotonically-increasing stream IDs.
1528 if (frame_type == SYN_STREAM) {
1529 CHECK(stream.get());
1530 CHECK_EQ(stream->stream_id(), 0u);
1531 scoped_ptr<SpdyStream> owned_stream =
1532 ActivateCreatedStream(stream.get());
1533 InsertActivatedStream(owned_stream.Pass());
1535 if (stream_hi_water_mark_ > kLastStreamId) {
1536 CHECK_EQ(stream->stream_id(), kLastStreamId);
1537 // We've exhausted the stream ID space, and no new streams may be
1538 // created after this one.
1540 StartGoingAway(kLastStreamId, ERR_ABORTED);
1544 in_flight_write_ = producer->ProduceBuffer();
1545 if (!in_flight_write_) {
1547 return ERR_UNEXPECTED;
1549 in_flight_write_frame_type_ = frame_type;
1550 in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
1551 DCHECK_GE(in_flight_write_frame_size_,
1552 buffered_spdy_framer_->GetFrameMinimumSize());
1553 in_flight_write_stream_ = stream;
1556 write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;
1558 // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
1559 // with Socket implementations that don't store their IOBuffer
1560 // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
1561 scoped_refptr<IOBuffer> write_io_buffer =
1562 in_flight_write_->GetIOBufferForRemainingData();
1563 return connection_->socket()->Write(
1564 write_io_buffer.get(),
1565 in_flight_write_->GetRemainingSize(),
1566 base::Bind(&SpdySession::PumpWriteLoop,
1567 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
1570 int SpdySession::DoWriteComplete(int result) {
1572 DCHECK_NE(result, ERR_IO_PENDING);
1573 DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1575 last_activity_time_ = time_func_();
1578 DCHECK_NE(result, ERR_IO_PENDING);
1579 in_flight_write_.reset();
1580 in_flight_write_frame_type_ = DATA;
1581 in_flight_write_frame_size_ = 0;
1582 in_flight_write_stream_.reset();
1583 write_state_ = WRITE_STATE_DO_WRITE;
1584 DoDrainSession(static_cast<Error>(result), "Write error");
1588 // It should not be possible to have written more bytes than our
1589 // in_flight_write_.
1590 DCHECK_LE(static_cast<size_t>(result),
1591 in_flight_write_->GetRemainingSize());
1594 in_flight_write_->Consume(static_cast<size_t>(result));
1596 // We only notify the stream when we've fully written the pending frame.
1597 if (in_flight_write_->GetRemainingSize() == 0) {
1598 // It is possible that the stream was cancelled while we were
1599 // writing to the socket.
1600 if (in_flight_write_stream_.get()) {
1601 DCHECK_GT(in_flight_write_frame_size_, 0u);
1602 in_flight_write_stream_->OnFrameWriteComplete(
1603 in_flight_write_frame_type_,
1604 in_flight_write_frame_size_);
1607 // Cleanup the write which just completed.
1608 in_flight_write_.reset();
1609 in_flight_write_frame_type_ = DATA;
1610 in_flight_write_frame_size_ = 0;
1611 in_flight_write_stream_.reset();
1615 write_state_ = WRITE_STATE_DO_WRITE;
1619 void SpdySession::DcheckGoingAway() const {
1621 DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1622 for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
1623 DCHECK(pending_create_stream_queues_[i].empty());
1625 DCHECK(created_streams_.empty());
1629 void SpdySession::DcheckDraining() const {
1631 DCHECK_EQ(availability_state_, STATE_DRAINING);
1632 DCHECK(active_streams_.empty());
1633 DCHECK(unclaimed_pushed_streams_.empty());
1636 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
1638 DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1640 // The loops below are carefully written to avoid reentrancy problems.
1643 size_t old_size = GetTotalSize(pending_create_stream_queues_);
1644 base::WeakPtr<SpdyStreamRequest> pending_request =
1645 GetNextPendingStreamRequest();
1646 if (!pending_request)
1648 // No new stream requests should be added while the session is
1650 DCHECK_GT(old_size, GetTotalSize(pending_create_stream_queues_));
1651 pending_request->OnRequestCompleteFailure(ERR_ABORTED);
1655 size_t old_size = active_streams_.size();
1656 ActiveStreamMap::iterator it =
1657 active_streams_.lower_bound(last_good_stream_id + 1);
1658 if (it == active_streams_.end())
1660 LogAbandonedActiveStream(it, status);
1661 CloseActiveStreamIterator(it, status);
1662 // No new streams should be activated while the session is going
1664 DCHECK_GT(old_size, active_streams_.size());
1667 while (!created_streams_.empty()) {
1668 size_t old_size = created_streams_.size();
1669 CreatedStreamSet::iterator it = created_streams_.begin();
1670 LogAbandonedStream(*it, status);
1671 CloseCreatedStreamIterator(it, status);
1672 // No new streams should be created while the session is going
1674 DCHECK_GT(old_size, created_streams_.size());
1677 write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);
1682 void SpdySession::MaybeFinishGoingAway() {
1683 if (active_streams_.empty() && availability_state_ == STATE_GOING_AWAY) {
1684 DoDrainSession(OK, "Finished going away");
1688 void SpdySession::DoDrainSession(Error err, const std::string& description) {
1689 if (availability_state_ == STATE_DRAINING) {
1694 // If |err| indicates an error occurred, inform the peer that we're closing
1695 // and why. Don't GOAWAY on a graceful or idle close, as that may
1696 // unnecessarily wake the radio. We could technically GOAWAY on network errors
1697 // (we'll probably fail to actually write it, but that's okay), however many
1698 // unit-tests would need to be updated.
1700 err != ERR_ABORTED && // Used by SpdySessionPool to close idle sessions.
1701 err != ERR_NETWORK_CHANGED && // Used to deprecate sessions on IP change.
1702 err != ERR_SOCKET_NOT_CONNECTED &&
1703 err != ERR_CONNECTION_CLOSED && err != ERR_CONNECTION_RESET) {
1704 // Enqueue a GOAWAY to inform the peer of why we're closing the connection.
1705 SpdyGoAwayIR goaway_ir(last_accepted_push_stream_id_,
1706 MapNetErrorToGoAwayStatus(err),
1708 EnqueueSessionWrite(HIGHEST,
1710 scoped_ptr<SpdyFrame>(
1711 buffered_spdy_framer_->SerializeFrame(goaway_ir)));
1714 availability_state_ = STATE_DRAINING;
1715 error_on_close_ = err;
1718 NetLog::TYPE_SPDY_SESSION_CLOSE,
1719 base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
1721 UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
1722 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
1723 total_bytes_received_, 1, 100000000, 50);
1726 // We ought to be going away already, as this is a graceful close.
1729 StartGoingAway(0, err);
1732 MaybePostWriteLoop();
1735 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
1737 std::string description = base::StringPrintf(
1738 "ABANDONED (stream_id=%d): ", stream->stream_id()) +
1739 stream->url().spec();
1740 stream->LogStreamError(status, description);
1741 // We don't increment the streams abandoned counter here. If the
1742 // stream isn't active (i.e., it hasn't written anything to the wire
1743 // yet) then it's as if it never existed. If it is active, then
1744 // LogAbandonedActiveStream() will increment the counters.
1747 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it,
1749 DCHECK_GT(it->first, 0u);
1750 LogAbandonedStream(it->second.stream, status);
1751 ++streams_abandoned_count_;
1752 base::StatsCounter abandoned_streams("spdy.abandoned_streams");
1753 abandoned_streams.Increment();
1754 if (it->second.stream->type() == SPDY_PUSH_STREAM &&
1755 unclaimed_pushed_streams_.find(it->second.stream->url()) !=
1756 unclaimed_pushed_streams_.end()) {
1757 base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams");
1758 abandoned_push_streams.Increment();
1762 SpdyStreamId SpdySession::GetNewStreamId() {
1763 CHECK_LE(stream_hi_water_mark_, kLastStreamId);
1764 SpdyStreamId id = stream_hi_water_mark_;
1765 stream_hi_water_mark_ += 2;
1769 void SpdySession::CloseSessionOnError(Error err,
1770 const std::string& description) {
1771 DCHECK_LT(err, ERR_IO_PENDING);
1772 DoDrainSession(err, description);
1775 void SpdySession::MakeUnavailable() {
1776 if (availability_state_ == STATE_AVAILABLE) {
1777 availability_state_ = STATE_GOING_AWAY;
1778 pool_->MakeSessionUnavailable(GetWeakPtr());
1782 base::Value* SpdySession::GetInfoAsValue() const {
1783 base::DictionaryValue* dict = new base::DictionaryValue();
1785 dict->SetInteger("source_id", net_log_.source().id);
1787 dict->SetString("host_port_pair", host_port_pair().ToString());
1788 if (!pooled_aliases_.empty()) {
1789 base::ListValue* alias_list = new base::ListValue();
1790 for (std::set<SpdySessionKey>::const_iterator it =
1791 pooled_aliases_.begin();
1792 it != pooled_aliases_.end(); it++) {
1793 alias_list->Append(new base::StringValue(
1794 it->host_port_pair().ToString()));
1796 dict->Set("aliases", alias_list);
1798 dict->SetString("proxy", host_port_proxy_pair().second.ToURI());
1800 dict->SetInteger("active_streams", active_streams_.size());
1802 dict->SetInteger("unclaimed_pushed_streams",
1803 unclaimed_pushed_streams_.size());
1805 dict->SetBoolean("is_secure", is_secure_);
1807 dict->SetString("protocol_negotiated",
1808 SSLClientSocket::NextProtoToString(
1809 connection_->socket()->GetNegotiatedProtocol()));
1811 dict->SetInteger("error", error_on_close_);
1812 dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
1814 dict->SetInteger("streams_initiated_count", streams_initiated_count_);
1815 dict->SetInteger("streams_pushed_count", streams_pushed_count_);
1816 dict->SetInteger("streams_pushed_and_claimed_count",
1817 streams_pushed_and_claimed_count_);
1818 dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
1819 DCHECK(buffered_spdy_framer_.get());
1820 dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received());
1822 dict->SetBoolean("sent_settings", sent_settings_);
1823 dict->SetBoolean("received_settings", received_settings_);
1825 dict->SetInteger("send_window_size", session_send_window_size_);
1826 dict->SetInteger("recv_window_size", session_recv_window_size_);
1827 dict->SetInteger("unacked_recv_window_bytes",
1828 session_unacked_recv_window_bytes_);
1832 bool SpdySession::IsReused() const {
1833 return buffered_spdy_framer_->frames_received() > 0 ||
1834 connection_->reuse_type() == ClientSocketHandle::UNUSED_IDLE;
1837 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id,
1838 LoadTimingInfo* load_timing_info) const {
1839 return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId,
1843 int SpdySession::GetPeerAddress(IPEndPoint* address) const {
1844 int rv = ERR_SOCKET_NOT_CONNECTED;
1845 if (connection_->socket()) {
1846 rv = connection_->socket()->GetPeerAddress(address);
1849 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress",
1850 rv == ERR_SOCKET_NOT_CONNECTED);
1855 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
1856 int rv = ERR_SOCKET_NOT_CONNECTED;
1857 if (connection_->socket()) {
1858 rv = connection_->socket()->GetLocalAddress(address);
1861 UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress",
1862 rv == ERR_SOCKET_NOT_CONNECTED);
1867 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
1868 SpdyFrameType frame_type,
1869 scoped_ptr<SpdyFrame> frame) {
1870 DCHECK(frame_type == RST_STREAM || frame_type == SETTINGS ||
1871 frame_type == WINDOW_UPDATE || frame_type == PING ||
1872 frame_type == GOAWAY);
1874 priority, frame_type,
1875 scoped_ptr<SpdyBufferProducer>(
1876 new SimpleBufferProducer(
1877 scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
1878 base::WeakPtr<SpdyStream>());
1881 void SpdySession::EnqueueWrite(RequestPriority priority,
1882 SpdyFrameType frame_type,
1883 scoped_ptr<SpdyBufferProducer> producer,
1884 const base::WeakPtr<SpdyStream>& stream) {
1885 if (availability_state_ == STATE_DRAINING)
1888 write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
1889 MaybePostWriteLoop();
1892 void SpdySession::MaybePostWriteLoop() {
1893 if (write_state_ == WRITE_STATE_IDLE) {
1894 CHECK(!in_flight_write_);
1895 write_state_ = WRITE_STATE_DO_WRITE;
1896 base::MessageLoop::current()->PostTask(
1898 base::Bind(&SpdySession::PumpWriteLoop,
1899 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
1903 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
1904 CHECK_EQ(stream->stream_id(), 0u);
1905 CHECK(created_streams_.find(stream.get()) == created_streams_.end());
1906 created_streams_.insert(stream.release());
1909 scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) {
1910 CHECK_EQ(stream->stream_id(), 0u);
1911 CHECK(created_streams_.find(stream) != created_streams_.end());
1912 stream->set_stream_id(GetNewStreamId());
1913 scoped_ptr<SpdyStream> owned_stream(stream);
1914 created_streams_.erase(stream);
1915 return owned_stream.Pass();
1918 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) {
1919 SpdyStreamId stream_id = stream->stream_id();
1920 CHECK_NE(stream_id, 0u);
1921 std::pair<ActiveStreamMap::iterator, bool> result =
1922 active_streams_.insert(
1923 std::make_pair(stream_id, ActiveStreamInfo(stream.get())));
1924 CHECK(result.second);
1925 ignore_result(stream.release());
1928 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
1929 if (in_flight_write_stream_.get() == stream.get()) {
1930 // If we're deleting the stream for the in-flight write, we still
1931 // need to let the write complete, so we clear
1932 // |in_flight_write_stream_| and let the write finish on its own
1933 // without notifying |in_flight_write_stream_|.
1934 in_flight_write_stream_.reset();
1937 write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
1938 stream->OnClose(status);
1940 if (availability_state_ == STATE_AVAILABLE) {
1941 ProcessPendingStreamRequests();
1945 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
1946 base::StatsCounter used_push_streams("spdy.claimed_push_streams");
1948 PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
1949 if (unclaimed_it == unclaimed_pushed_streams_.end())
1950 return base::WeakPtr<SpdyStream>();
1952 SpdyStreamId stream_id = unclaimed_it->second.stream_id;
1953 unclaimed_pushed_streams_.erase(unclaimed_it);
1955 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
1956 if (active_it == active_streams_.end()) {
1958 return base::WeakPtr<SpdyStream>();
1961 net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM);
1962 used_push_streams.Increment();
1963 return active_it->second.stream->GetWeakPtr();
1966 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info,
1967 bool* was_npn_negotiated,
1968 NextProto* protocol_negotiated) {
1969 *was_npn_negotiated = connection_->socket()->WasNpnNegotiated();
1970 *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol();
1971 return connection_->socket()->GetSSLInfo(ssl_info);
1974 bool SpdySession::GetSSLCertRequestInfo(
1975 SSLCertRequestInfo* cert_request_info) {
1978 GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
1982 void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
1985 RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code));
1986 std::string description =
1987 base::StringPrintf("Framer error: %d (%s).",
1989 SpdyFramer::ErrorCodeToString(error_code));
1990 DoDrainSession(MapFramerErrorToNetError(error_code), description);
1993 void SpdySession::OnStreamError(SpdyStreamId stream_id,
1994 const std::string& description) {
1997 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1998 if (it == active_streams_.end()) {
1999 // We still want to send a frame to reset the stream even if we
2000 // don't know anything about it.
2001 EnqueueResetStreamFrame(
2002 stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
2006 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
2009 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id,
2014 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2016 // By the time data comes in, the stream may already be inactive.
2017 if (it == active_streams_.end())
2020 SpdyStream* stream = it->second.stream;
2021 CHECK_EQ(stream->stream_id(), stream_id);
2023 DCHECK(buffered_spdy_framer_);
2024 size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize();
2025 stream->IncrementRawReceivedBytes(header_len);
2028 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
2034 if (data == NULL && len != 0) {
2035 // This is notification of consumed data padding.
2036 // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames.
2037 // See crbug.com/353012.
2041 DCHECK_LT(len, 1u << 24);
2042 if (net_log().IsLogging()) {
2044 NetLog::TYPE_SPDY_SESSION_RECV_DATA,
2045 base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
2048 // Build the buffer as early as possible so that we go through the
2049 // session flow control checks and update
2050 // |unacked_recv_window_bytes_| properly even when the stream is
2051 // inactive (since the other side has still reduced its session send
2053 scoped_ptr<SpdyBuffer> buffer;
2056 CHECK_LE(len, static_cast<size_t>(kReadBufferSize));
2057 buffer.reset(new SpdyBuffer(data, len));
2059 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2060 DecreaseRecvWindowSize(static_cast<int32>(len));
2061 buffer->AddConsumeCallback(
2062 base::Bind(&SpdySession::OnReadBufferConsumed,
2063 weak_factory_.GetWeakPtr()));
2069 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2071 // By the time data comes in, the stream may already be inactive.
2072 if (it == active_streams_.end())
2075 SpdyStream* stream = it->second.stream;
2076 CHECK_EQ(stream->stream_id(), stream_id);
2078 stream->IncrementRawReceivedBytes(len);
2080 if (it->second.waiting_for_syn_reply) {
2081 const std::string& error = "Data received before SYN_REPLY.";
2082 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2083 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2087 stream->OnDataReceived(buffer.Pass());
2090 void SpdySession::OnSettings(bool clear_persisted) {
2093 if (clear_persisted)
2094 http_server_properties_->ClearSpdySettings(host_port_pair());
2096 if (net_log_.IsLogging()) {
2098 NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
2099 base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
2103 if (GetProtocolVersion() >= SPDY4) {
2104 // Send an acknowledgment of the setting.
2105 SpdySettingsIR settings_ir;
2106 settings_ir.set_is_ack(true);
2107 EnqueueSessionWrite(
2110 scoped_ptr<SpdyFrame>(
2111 buffered_spdy_framer_->SerializeFrame(settings_ir)));
2115 void SpdySession::OnSetting(SpdySettingsIds id,
2120 HandleSetting(id, value);
2121 http_server_properties_->SetSpdySetting(
2124 static_cast<SpdySettingsFlags>(flags),
2126 received_settings_ = true;
2129 const SpdyMajorVersion protocol_version = GetProtocolVersion();
2130 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_SETTING,
2131 base::Bind(&NetLogSpdySettingCallback,
2134 static_cast<SpdySettingsFlags>(flags),
2138 void SpdySession::OnSendCompressedFrame(
2139 SpdyStreamId stream_id,
2143 if (type != SYN_STREAM && type != HEADERS)
2146 DCHECK(buffered_spdy_framer_.get());
2147 size_t compressed_len =
2148 frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize();
2151 // Make sure we avoid early decimal truncation.
2152 int compression_pct = 100 - (100 * compressed_len) / payload_len;
2153 UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
2158 void SpdySession::OnReceiveCompressedFrame(
2159 SpdyStreamId stream_id,
2162 last_compressed_frame_len_ = frame_len;
2165 int SpdySession::OnInitialResponseHeadersReceived(
2166 const SpdyHeaderBlock& response_headers,
2167 base::Time response_time,
2168 base::TimeTicks recv_first_byte_time,
2169 SpdyStream* stream) {
2171 SpdyStreamId stream_id = stream->stream_id();
2173 if (stream->type() == SPDY_PUSH_STREAM) {
2174 DCHECK(stream->IsReservedRemote());
2175 if (max_concurrent_pushed_streams_ &&
2176 num_active_pushed_streams_ >= max_concurrent_pushed_streams_) {
2177 ResetStream(stream_id,
2178 RST_STREAM_REFUSED_STREAM,
2179 "Stream concurrency limit reached.");
2180 return STATUS_CODE_REFUSED_STREAM;
2184 if (stream->type() == SPDY_PUSH_STREAM) {
2185 // Will be balanced in DeleteStream.
2186 num_active_pushed_streams_++;
2189 // May invalidate |stream|.
2190 int rv = stream->OnInitialResponseHeadersReceived(
2191 response_headers, response_time, recv_first_byte_time);
2193 DCHECK_NE(rv, ERR_IO_PENDING);
2194 DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2200 void SpdySession::OnSynStream(SpdyStreamId stream_id,
2201 SpdyStreamId associated_stream_id,
2202 SpdyPriority priority,
2204 bool unidirectional,
2205 const SpdyHeaderBlock& headers) {
2208 DCHECK_LE(GetProtocolVersion(), SPDY3);
2210 base::Time response_time = base::Time::Now();
2211 base::TimeTicks recv_first_byte_time = time_func_();
2213 if (net_log_.IsLogging()) {
2215 NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
2216 base::Bind(&NetLogSpdySynStreamReceivedCallback,
2217 &headers, fin, unidirectional, priority,
2218 stream_id, associated_stream_id));
2221 // Split headers to simulate push promise and response.
2222 SpdyHeaderBlock request_headers;
2223 SpdyHeaderBlock response_headers;
2224 SplitPushedHeadersToRequestAndResponse(
2225 headers, GetProtocolVersion(), &request_headers, &response_headers);
2227 if (!TryCreatePushStream(
2228 stream_id, associated_stream_id, priority, request_headers))
2231 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2232 if (active_it == active_streams_.end()) {
2237 if (OnInitialResponseHeadersReceived(response_headers,
2239 recv_first_byte_time,
2240 active_it->second.stream) != OK)
2243 base::StatsCounter push_requests("spdy.pushed_streams");
2244 push_requests.Increment();
2247 void SpdySession::DeleteExpiredPushedStreams() {
2248 if (unclaimed_pushed_streams_.empty())
2251 // Check that adequate time has elapsed since the last sweep.
2252 if (time_func_() < next_unclaimed_push_stream_sweep_time_)
2255 // Gather old streams to delete.
2256 base::TimeTicks minimum_freshness = time_func_() -
2257 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2258 std::vector<SpdyStreamId> streams_to_close;
2259 for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
2260 it != unclaimed_pushed_streams_.end(); ++it) {
2261 if (minimum_freshness > it->second.creation_time)
2262 streams_to_close.push_back(it->second.stream_id);
2265 for (std::vector<SpdyStreamId>::const_iterator to_close_it =
2266 streams_to_close.begin();
2267 to_close_it != streams_to_close.end(); ++to_close_it) {
2268 ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it);
2269 if (active_it == active_streams_.end())
2272 LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM);
2273 // CloseActiveStreamIterator() will remove the stream from
2274 // |unclaimed_pushed_streams_|.
2275 ResetStreamIterator(
2276 active_it, RST_STREAM_REFUSED_STREAM, "Stream not claimed.");
2279 next_unclaimed_push_stream_sweep_time_ = time_func_() +
2280 base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2283 void SpdySession::OnSynReply(SpdyStreamId stream_id,
2285 const SpdyHeaderBlock& headers) {
2288 base::Time response_time = base::Time::Now();
2289 base::TimeTicks recv_first_byte_time = time_func_();
2291 if (net_log().IsLogging()) {
2293 NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
2294 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2295 &headers, fin, stream_id));
2298 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2299 if (it == active_streams_.end()) {
2300 // NOTE: it may just be that the stream was cancelled.
2304 SpdyStream* stream = it->second.stream;
2305 CHECK_EQ(stream->stream_id(), stream_id);
2307 stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2308 last_compressed_frame_len_ = 0;
2310 if (GetProtocolVersion() >= SPDY4) {
2311 const std::string& error =
2312 "SPDY4 wasn't expecting SYN_REPLY.";
2313 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2314 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2317 if (!it->second.waiting_for_syn_reply) {
2318 const std::string& error =
2319 "Received duplicate SYN_REPLY for stream.";
2320 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2321 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2324 it->second.waiting_for_syn_reply = false;
2326 ignore_result(OnInitialResponseHeadersReceived(
2327 headers, response_time, recv_first_byte_time, stream));
2330 void SpdySession::OnHeaders(SpdyStreamId stream_id,
2332 SpdyPriority priority,
2334 const SpdyHeaderBlock& headers) {
2337 if (net_log().IsLogging()) {
2339 NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
2340 base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2341 &headers, fin, stream_id));
2344 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2345 if (it == active_streams_.end()) {
2346 // NOTE: it may just be that the stream was cancelled.
2347 LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
2351 SpdyStream* stream = it->second.stream;
2352 CHECK_EQ(stream->stream_id(), stream_id);
2354 stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2355 last_compressed_frame_len_ = 0;
2357 base::Time response_time = base::Time::Now();
2358 base::TimeTicks recv_first_byte_time = time_func_();
2360 if (it->second.waiting_for_syn_reply) {
2361 if (GetProtocolVersion() < SPDY4) {
2362 const std::string& error =
2363 "Was expecting SYN_REPLY, not HEADERS.";
2364 stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2365 ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2369 it->second.waiting_for_syn_reply = false;
2370 ignore_result(OnInitialResponseHeadersReceived(
2371 headers, response_time, recv_first_byte_time, stream));
2372 } else if (it->second.stream->IsReservedRemote()) {
2373 ignore_result(OnInitialResponseHeadersReceived(
2374 headers, response_time, recv_first_byte_time, stream));
2376 int rv = stream->OnAdditionalResponseHeadersReceived(headers);
2378 DCHECK_NE(rv, ERR_IO_PENDING);
2379 DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2384 bool SpdySession::OnUnknownFrame(SpdyStreamId stream_id, int frame_type) {
2385 // Validate stream id.
2386 // Was the frame sent on a stream id that has not been used in this session?
2387 if (stream_id % 2 == 1 && stream_id > stream_hi_water_mark_)
2390 if (stream_id % 2 == 0 && stream_id > last_accepted_push_stream_id_)
2396 void SpdySession::OnRstStream(SpdyStreamId stream_id,
2397 SpdyRstStreamStatus status) {
2400 std::string description;
2402 NetLog::TYPE_SPDY_SESSION_RST_STREAM,
2403 base::Bind(&NetLogSpdyRstCallback,
2404 stream_id, status, &description));
2406 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2407 if (it == active_streams_.end()) {
2408 // NOTE: it may just be that the stream was cancelled.
2409 LOG(WARNING) << "Received RST for invalid stream" << stream_id;
2413 CHECK_EQ(it->second.stream->stream_id(), stream_id);
2416 it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
2417 } else if (status == RST_STREAM_REFUSED_STREAM) {
2418 CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM);
2420 RecordProtocolErrorHistogram(
2421 PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
2422 it->second.stream->LogStreamError(
2423 ERR_SPDY_PROTOCOL_ERROR,
2424 base::StringPrintf("SPDY stream closed with status: %d", status));
2425 // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
2426 // For now, it doesn't matter much - it is a protocol error.
2427 CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
2431 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
2432 SpdyGoAwayStatus status) {
2435 // TODO(jgraettinger): UMA histogram on |status|.
2437 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
2438 base::Bind(&NetLogSpdyGoAwayCallback,
2439 last_accepted_stream_id,
2440 active_streams_.size(),
2441 unclaimed_pushed_streams_.size(),
2444 StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
2445 // This is to handle the case when we already don't have any active
2446 // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
2447 // active streams and so the last one being closed will finish the
2448 // going away process (see DeleteStream()).
2449 MaybeFinishGoingAway();
2452 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) {
2456 NetLog::TYPE_SPDY_SESSION_PING,
2457 base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received"));
2459 // Send response to a PING from server.
2460 if ((protocol_ >= kProtoSPDY4 && !is_ack) ||
2461 (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) {
2462 WritePingFrame(unique_id, true);
2467 if (pings_in_flight_ < 0) {
2468 RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
2469 DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
2470 pings_in_flight_ = 0;
2474 if (pings_in_flight_ > 0)
2477 // We will record RTT in histogram when there are no more client sent
2478 // pings_in_flight_.
2479 RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
2482 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
2483 uint32 delta_window_size) {
2486 DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
2488 NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
2489 base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2490 stream_id, delta_window_size));
2492 if (stream_id == kSessionFlowControlStreamId) {
2493 // WINDOW_UPDATE for the session.
2494 if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
2495 LOG(WARNING) << "Received WINDOW_UPDATE for session when "
2496 << "session flow control is not turned on";
2497 // TODO(akalin): Record an error and close the session.
2501 if (delta_window_size < 1u) {
2502 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2504 ERR_SPDY_PROTOCOL_ERROR,
2505 "Received WINDOW_UPDATE with an invalid delta_window_size " +
2506 base::UintToString(delta_window_size));
2510 IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
2512 // WINDOW_UPDATE for a stream.
2513 if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2514 // TODO(akalin): Record an error and close the session.
2515 LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
2516 << " when flow control is not turned on";
2520 ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2522 if (it == active_streams_.end()) {
2523 // NOTE: it may just be that the stream was cancelled.
2524 LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
2528 SpdyStream* stream = it->second.stream;
2529 CHECK_EQ(stream->stream_id(), stream_id);
2531 if (delta_window_size < 1u) {
2532 ResetStreamIterator(it,
2533 RST_STREAM_FLOW_CONTROL_ERROR,
2535 "Received WINDOW_UPDATE with an invalid "
2536 "delta_window_size %ud", delta_window_size));
2540 CHECK_EQ(it->second.stream->stream_id(), stream_id);
2541 it->second.stream->IncreaseSendWindowSize(
2542 static_cast<int32>(delta_window_size));
2546 bool SpdySession::TryCreatePushStream(SpdyStreamId stream_id,
2547 SpdyStreamId associated_stream_id,
2548 SpdyPriority priority,
2549 const SpdyHeaderBlock& headers) {
2550 // Server-initiated streams should have even sequence numbers.
2551 if ((stream_id & 0x1) != 0) {
2552 LOG(WARNING) << "Received invalid push stream id " << stream_id;
2553 if (GetProtocolVersion() > SPDY2)
2554 CloseSessionOnError(ERR_SPDY_PROTOCOL_ERROR, "Odd push stream id.");
2558 if (GetProtocolVersion() > SPDY2) {
2559 if (stream_id <= last_accepted_push_stream_id_) {
2560 LOG(WARNING) << "Received push stream id lesser or equal to the last "
2561 << "accepted before " << stream_id;
2562 CloseSessionOnError(
2563 ERR_SPDY_PROTOCOL_ERROR,
2564 "New push stream id must be greater than the last accepted.");
2569 if (IsStreamActive(stream_id)) {
2570 // For SPDY3 and higher we should not get here, we'll start going away
2571 // earlier on |last_seen_push_stream_id_| check.
2572 CHECK_GT(SPDY3, GetProtocolVersion());
2573 LOG(WARNING) << "Received push for active stream " << stream_id;
2577 last_accepted_push_stream_id_ = stream_id;
2579 RequestPriority request_priority =
2580 ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion());
2582 if (availability_state_ == STATE_GOING_AWAY) {
2583 // TODO(akalin): This behavior isn't in the SPDY spec, although it
2584 // probably should be.
2585 EnqueueResetStreamFrame(stream_id,
2587 RST_STREAM_REFUSED_STREAM,
2588 "push stream request received when going away");
2592 if (associated_stream_id == 0) {
2593 // In SPDY4 0 stream id in PUSH_PROMISE frame leads to framer error and
2594 // session going away. We should never get here.
2595 CHECK_GT(SPDY4, GetProtocolVersion());
2596 std::string description = base::StringPrintf(
2597 "Received invalid associated stream id %d for pushed stream %d",
2598 associated_stream_id,
2600 EnqueueResetStreamFrame(
2601 stream_id, request_priority, RST_STREAM_REFUSED_STREAM, description);
2605 streams_pushed_count_++;
2607 // TODO(mbelshe): DCHECK that this is a GET method?
2609 // Verify that the response had a URL for us.
2610 GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true);
2611 if (!gurl.is_valid()) {
2612 EnqueueResetStreamFrame(stream_id,
2614 RST_STREAM_PROTOCOL_ERROR,
2615 "Pushed stream url was invalid: " + gurl.spec());
2619 // Verify we have a valid stream association.
2620 ActiveStreamMap::iterator associated_it =
2621 active_streams_.find(associated_stream_id);
2622 if (associated_it == active_streams_.end()) {
2623 EnqueueResetStreamFrame(
2626 RST_STREAM_INVALID_STREAM,
2627 base::StringPrintf("Received push for inactive associated stream %d",
2628 associated_stream_id));
2632 // Check that the pushed stream advertises the same origin as its associated
2633 // stream. Bypass this check if and only if this session is with a SPDY proxy
2634 // that is trusted explicitly via the --trusted-spdy-proxy switch.
2635 if (trusted_spdy_proxy_.Equals(host_port_pair())) {
2636 // Disallow pushing of HTTPS content.
2637 if (gurl.SchemeIs("https")) {
2638 EnqueueResetStreamFrame(
2641 RST_STREAM_REFUSED_STREAM,
2642 base::StringPrintf("Rejected push of Cross Origin HTTPS content %d",
2643 associated_stream_id));
2646 GURL associated_url(associated_it->second.stream->GetUrlFromHeaders());
2647 if (associated_url.GetOrigin() != gurl.GetOrigin()) {
2648 EnqueueResetStreamFrame(
2651 RST_STREAM_REFUSED_STREAM,
2652 base::StringPrintf("Rejected Cross Origin Push Stream %d",
2653 associated_stream_id));
2658 // There should not be an existing pushed stream with the same path.
2659 PushedStreamMap::iterator pushed_it =
2660 unclaimed_pushed_streams_.lower_bound(gurl);
2661 if (pushed_it != unclaimed_pushed_streams_.end() &&
2662 pushed_it->first == gurl) {
2663 EnqueueResetStreamFrame(
2666 RST_STREAM_PROTOCOL_ERROR,
2667 "Received duplicate pushed stream with url: " + gurl.spec());
2671 scoped_ptr<SpdyStream> stream(new SpdyStream(SPDY_PUSH_STREAM,
2675 stream_initial_send_window_size_,
2676 stream_initial_recv_window_size_,
2678 stream->set_stream_id(stream_id);
2680 // In spdy4/http2 PUSH_PROMISE arrives on associated stream.
2681 if (associated_it != active_streams_.end() && GetProtocolVersion() >= SPDY4) {
2682 associated_it->second.stream->IncrementRawReceivedBytes(
2683 last_compressed_frame_len_);
2685 stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2688 last_compressed_frame_len_ = 0;
2690 DeleteExpiredPushedStreams();
2691 PushedStreamMap::iterator inserted_pushed_it =
2692 unclaimed_pushed_streams_.insert(
2694 std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_())));
2695 DCHECK(inserted_pushed_it != pushed_it);
2697 InsertActivatedStream(stream.Pass());
2699 ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2700 if (active_it == active_streams_.end()) {
2705 active_it->second.stream->OnPushPromiseHeadersReceived(headers);
2706 DCHECK(active_it->second.stream->IsReservedRemote());
2707 num_pushed_streams_++;
2711 void SpdySession::OnPushPromise(SpdyStreamId stream_id,
2712 SpdyStreamId promised_stream_id,
2713 const SpdyHeaderBlock& headers) {
2716 if (net_log_.IsLogging()) {
2717 net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_PUSH_PROMISE,
2718 base::Bind(&NetLogSpdyPushPromiseReceivedCallback,
2721 promised_stream_id));
2724 // Any priority will do.
2725 // TODO(baranovich): pass parent stream id priority?
2726 if (!TryCreatePushStream(promised_stream_id, stream_id, 0, headers))
2729 base::StatsCounter push_requests("spdy.pushed_streams");
2730 push_requests.Increment();
2733 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id,
2734 uint32 delta_window_size) {
2735 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2736 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2737 CHECK(it != active_streams_.end());
2738 CHECK_EQ(it->second.stream->stream_id(), stream_id);
2739 SendWindowUpdateFrame(
2740 stream_id, delta_window_size, it->second.stream->priority());
2743 void SpdySession::SendInitialData() {
2744 DCHECK(enable_sending_initial_data_);
2746 if (send_connection_header_prefix_) {
2747 DCHECK_EQ(protocol_, kProtoSPDY4);
2748 scoped_ptr<SpdyFrame> connection_header_prefix_frame(
2749 new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix),
2750 kHttp2ConnectionHeaderPrefixSize,
2751 false /* take_ownership */));
2752 // Count the prefix as part of the subsequent SETTINGS frame.
2753 EnqueueSessionWrite(HIGHEST, SETTINGS,
2754 connection_header_prefix_frame.Pass());
2757 // First, notify the server about the settings they should use when
2758 // communicating with us.
2759 SettingsMap settings_map;
2760 // Create a new settings frame notifying the server of our
2761 // max concurrent streams and initial window size.
2762 settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] =
2763 SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams);
2764 if (flow_control_state_ >= FLOW_CONTROL_STREAM &&
2765 stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) {
2766 settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
2767 SettingsFlagsAndValue(SETTINGS_FLAG_NONE,
2768 stream_initial_recv_window_size_);
2770 SendSettings(settings_map);
2772 // Next, notify the server about our initial recv window size.
2773 if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2774 // Bump up the receive window size to the real initial value. This
2775 // has to go here since the WINDOW_UPDATE frame sent by
2776 // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|.
2777 DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_);
2778 // This condition implies that |kDefaultInitialRecvWindowSize| -
2779 // |session_recv_window_size_| doesn't overflow.
2780 DCHECK_GT(session_recv_window_size_, 0);
2781 IncreaseRecvWindowSize(
2782 kDefaultInitialRecvWindowSize - session_recv_window_size_);
2785 if (protocol_ <= kProtoSPDY31) {
2786 // Finally, notify the server about the settings they have
2787 // previously told us to use when communicating with them (after
2789 const SettingsMap& server_settings_map =
2790 http_server_properties_->GetSpdySettings(host_port_pair());
2791 if (server_settings_map.empty())
2794 SettingsMap::const_iterator it =
2795 server_settings_map.find(SETTINGS_CURRENT_CWND);
2796 uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0;
2797 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100);
2799 for (SettingsMap::const_iterator it = server_settings_map.begin();
2800 it != server_settings_map.end(); ++it) {
2801 const SpdySettingsIds new_id = it->first;
2802 const uint32 new_val = it->second.second;
2803 HandleSetting(new_id, new_val);
2806 SendSettings(server_settings_map);
2811 void SpdySession::SendSettings(const SettingsMap& settings) {
2812 const SpdyMajorVersion protocol_version = GetProtocolVersion();
2814 NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
2815 base::Bind(&NetLogSpdySendSettingsCallback, &settings, protocol_version));
2816 // Create the SETTINGS frame and send it.
2817 DCHECK(buffered_spdy_framer_.get());
2818 scoped_ptr<SpdyFrame> settings_frame(
2819 buffered_spdy_framer_->CreateSettings(settings));
2820 sent_settings_ = true;
2821 EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
2824 void SpdySession::HandleSetting(uint32 id, uint32 value) {
2826 case SETTINGS_MAX_CONCURRENT_STREAMS:
2827 max_concurrent_streams_ = std::min(static_cast<size_t>(value),
2828 kMaxConcurrentStreamLimit);
2829 ProcessPendingStreamRequests();
2831 case SETTINGS_INITIAL_WINDOW_SIZE: {
2832 if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2834 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL);
2838 if (value > static_cast<uint32>(kint32max)) {
2840 NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE,
2841 NetLog::IntegerCallback("initial_window_size", value));
2845 // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only.
2846 int32 delta_window_size =
2847 static_cast<int32>(value) - stream_initial_send_window_size_;
2848 stream_initial_send_window_size_ = static_cast<int32>(value);
2849 UpdateStreamsSendWindowSize(delta_window_size);
2851 NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE,
2852 NetLog::IntegerCallback("delta_window_size", delta_window_size));
2858 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
2859 DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2860 for (ActiveStreamMap::iterator it = active_streams_.begin();
2861 it != active_streams_.end(); ++it) {
2862 it->second.stream->AdjustSendWindowSize(delta_window_size);
2865 for (CreatedStreamSet::const_iterator it = created_streams_.begin();
2866 it != created_streams_.end(); it++) {
2867 (*it)->AdjustSendWindowSize(delta_window_size);
2871 void SpdySession::SendPrefacePingIfNoneInFlight() {
2872 if (pings_in_flight_ || !enable_ping_based_connection_checking_)
2875 base::TimeTicks now = time_func_();
2876 // If there is no activity in the session, then send a preface-PING.
2877 if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
2881 void SpdySession::SendPrefacePing() {
2882 WritePingFrame(next_ping_id_, false);
2885 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
2886 uint32 delta_window_size,
2887 RequestPriority priority) {
2888 CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2889 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2890 if (it != active_streams_.end()) {
2891 CHECK_EQ(it->second.stream->stream_id(), stream_id);
2893 CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2894 CHECK_EQ(stream_id, kSessionFlowControlStreamId);
2898 NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
2899 base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2900 stream_id, delta_window_size));
2902 DCHECK(buffered_spdy_framer_.get());
2903 scoped_ptr<SpdyFrame> window_update_frame(
2904 buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
2905 EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass());
2908 void SpdySession::WritePingFrame(uint32 unique_id, bool is_ack) {
2909 DCHECK(buffered_spdy_framer_.get());
2910 scoped_ptr<SpdyFrame> ping_frame(
2911 buffered_spdy_framer_->CreatePingFrame(unique_id, is_ack));
2912 EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass());
2914 if (net_log().IsLogging()) {
2916 NetLog::TYPE_SPDY_SESSION_PING,
2917 base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "sent"));
2922 PlanToCheckPingStatus();
2923 last_ping_sent_time_ = time_func_();
2927 void SpdySession::PlanToCheckPingStatus() {
2928 if (check_ping_status_pending_)
2931 check_ping_status_pending_ = true;
2932 base::MessageLoop::current()->PostDelayedTask(
2934 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2935 time_func_()), hung_interval_);
2938 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
2939 CHECK(!in_io_loop_);
2941 // Check if we got a response back for all PINGs we had sent.
2942 if (pings_in_flight_ == 0) {
2943 check_ping_status_pending_ = false;
2947 DCHECK(check_ping_status_pending_);
2949 base::TimeTicks now = time_func_();
2950 base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
2952 if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
2953 // Track all failed PING messages in a separate bucket.
2954 RecordPingRTTHistogram(base::TimeDelta::Max());
2955 DoDrainSession(ERR_SPDY_PING_FAILED, "Failed ping.");
2959 // Check the status of connection after a delay.
2960 base::MessageLoop::current()->PostDelayedTask(
2962 base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2967 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) {
2968 UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration);
2971 void SpdySession::RecordProtocolErrorHistogram(
2972 SpdyProtocolErrorDetails details) {
2973 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details,
2974 NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2975 if (EndsWith(host_port_pair().host(), "google.com", false)) {
2976 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details,
2977 NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2981 void SpdySession::RecordHistograms() {
2982 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
2983 streams_initiated_count_,
2985 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
2986 streams_pushed_count_,
2988 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
2989 streams_pushed_and_claimed_count_,
2991 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
2992 streams_abandoned_count_,
2994 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
2995 sent_settings_ ? 1 : 0, 2);
2996 UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
2997 received_settings_ ? 1 : 0, 2);
2998 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
3001 UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
3002 stalled_streams_ > 0 ? 1 : 0, 2);
3004 if (received_settings_) {
3005 // Enumerate the saved settings, and set histograms for it.
3006 const SettingsMap& settings_map =
3007 http_server_properties_->GetSpdySettings(host_port_pair());
3009 SettingsMap::const_iterator it;
3010 for (it = settings_map.begin(); it != settings_map.end(); ++it) {
3011 const SpdySettingsIds id = it->first;
3012 const uint32 val = it->second.second;
3014 case SETTINGS_CURRENT_CWND:
3015 // Record several different histograms to see if cwnd converges
3016 // for larger volumes of data being sent.
3017 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
3019 if (total_bytes_received_ > 10 * 1024) {
3020 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
3022 if (total_bytes_received_ > 25 * 1024) {
3023 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
3025 if (total_bytes_received_ > 50 * 1024) {
3026 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
3028 if (total_bytes_received_ > 100 * 1024) {
3029 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
3036 case SETTINGS_ROUND_TRIP_TIME:
3037 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
3040 case SETTINGS_DOWNLOAD_RETRANS_RATE:
3041 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
3051 void SpdySession::CompleteStreamRequest(
3052 const base::WeakPtr<SpdyStreamRequest>& pending_request) {
3053 // Abort if the request has already been cancelled.
3054 if (!pending_request)
3057 base::WeakPtr<SpdyStream> stream;
3058 int rv = TryCreateStream(pending_request, &stream);
3062 pending_request->OnRequestCompleteSuccess(stream);
3067 if (rv != ERR_IO_PENDING) {
3068 pending_request->OnRequestCompleteFailure(rv);
3072 SSLClientSocket* SpdySession::GetSSLClientSocket() const {
3075 SSLClientSocket* ssl_socket =
3076 reinterpret_cast<SSLClientSocket*>(connection_->socket());
3081 void SpdySession::OnWriteBufferConsumed(
3082 size_t frame_payload_size,
3083 size_t consume_size,
3084 SpdyBuffer::ConsumeSource consume_source) {
3085 // We can be called with |in_io_loop_| set if a write SpdyBuffer is
3086 // deleted (e.g., a stream is closed due to incoming data).
3088 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3090 if (consume_source == SpdyBuffer::DISCARD) {
3091 // If we're discarding a frame or part of it, increase the send
3092 // window by the number of discarded bytes. (Although if we're
3093 // discarding part of a frame, it's probably because of a write
3094 // error and we'll be tearing down the session soon.)
3095 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
3096 DCHECK_GT(remaining_payload_bytes, 0u);
3097 IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
3099 // For consumed bytes, the send window is increased when we receive
3100 // a WINDOW_UPDATE frame.
3103 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
3104 // We can be called with |in_io_loop_| set if a SpdyBuffer is
3105 // deleted (e.g., a stream is closed due to incoming data).
3107 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3108 DCHECK_GE(delta_window_size, 1);
3110 // Check for overflow.
3111 int32 max_delta_window_size = kint32max - session_send_window_size_;
3112 if (delta_window_size > max_delta_window_size) {
3113 RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
3115 ERR_SPDY_PROTOCOL_ERROR,
3116 "Received WINDOW_UPDATE [delta: " +
3117 base::IntToString(delta_window_size) +
3118 "] for session overflows session_send_window_size_ [current: " +
3119 base::IntToString(session_send_window_size_) + "]");
3123 session_send_window_size_ += delta_window_size;
3126 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
3127 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3128 delta_window_size, session_send_window_size_));
3130 DCHECK(!IsSendStalled());
3131 ResumeSendStalledStreams();
3134 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
3135 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3137 // We only call this method when sending a frame. Therefore,
3138 // |delta_window_size| should be within the valid frame size range.
3139 DCHECK_GE(delta_window_size, 1);
3140 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
3142 // |send_window_size_| should have been at least |delta_window_size| for
3143 // this call to happen.
3144 DCHECK_GE(session_send_window_size_, delta_window_size);
3146 session_send_window_size_ -= delta_window_size;
3149 NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
3150 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3151 -delta_window_size, session_send_window_size_));
3154 void SpdySession::OnReadBufferConsumed(
3155 size_t consume_size,
3156 SpdyBuffer::ConsumeSource consume_source) {
3157 // We can be called with |in_io_loop_| set if a read SpdyBuffer is
3158 // deleted (e.g., discarded by a SpdyReadQueue).
3160 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3161 DCHECK_GE(consume_size, 1u);
3162 DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
3164 IncreaseRecvWindowSize(static_cast<int32>(consume_size));
3167 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
3168 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3169 DCHECK_GE(session_unacked_recv_window_bytes_, 0);
3170 DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
3171 DCHECK_GE(delta_window_size, 1);
3172 // Check for overflow.
3173 DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
3175 session_recv_window_size_ += delta_window_size;
3177 NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
3178 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3179 delta_window_size, session_recv_window_size_));
3181 session_unacked_recv_window_bytes_ += delta_window_size;
3182 if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) {
3183 SendWindowUpdateFrame(kSessionFlowControlStreamId,
3184 session_unacked_recv_window_bytes_,
3186 session_unacked_recv_window_bytes_ = 0;
3190 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
3192 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3193 DCHECK_GE(delta_window_size, 1);
3195 // Since we never decrease the initial receive window size,
3196 // |delta_window_size| should never cause |recv_window_size_| to go
3197 // negative. If we do, the receive window isn't being respected.
3198 if (delta_window_size > session_recv_window_size_) {
3199 RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
3201 ERR_SPDY_FLOW_CONTROL_ERROR,
3202 "delta_window_size is " + base::IntToString(delta_window_size) +
3203 " in DecreaseRecvWindowSize, which is larger than the receive " +
3204 "window size of " + base::IntToString(session_recv_window_size_));
3208 session_recv_window_size_ -= delta_window_size;
3210 NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
3211 base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3212 -delta_window_size, session_recv_window_size_));
3215 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
3216 DCHECK(stream.send_stalled_by_flow_control());
3217 RequestPriority priority = stream.priority();
3218 CHECK_GE(priority, MINIMUM_PRIORITY);
3219 CHECK_LE(priority, MAXIMUM_PRIORITY);
3220 stream_send_unstall_queue_[priority].push_back(stream.stream_id());
3223 void SpdySession::ResumeSendStalledStreams() {
3224 DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3226 // We don't have to worry about new streams being queued, since
3227 // doing so would cause IsSendStalled() to return true. But we do
3228 // have to worry about streams being closed, as well as ourselves
3231 while (!IsSendStalled()) {
3232 size_t old_size = 0;
3234 old_size = GetTotalSize(stream_send_unstall_queue_);
3237 SpdyStreamId stream_id = PopStreamToPossiblyResume();
3240 ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
3241 // The stream may actually still be send-stalled after this (due
3242 // to its own send window) but that's okay -- it'll then be
3243 // resumed once its send window increases.
3244 if (it != active_streams_.end())
3245 it->second.stream->PossiblyResumeIfSendStalled();
3247 // The size should decrease unless we got send-stalled again.
3248 if (!IsSendStalled())
3249 DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
3253 SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
3254 for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
3255 std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
3256 if (!queue->empty()) {
3257 SpdyStreamId stream_id = queue->front();