51c6ee7919db120bb9c610c0028ff3daf12a1255
[platform/framework/web/crosswalk.git] / src / net / spdy / spdy_session.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/spdy_session.h"
6
7 #include <algorithm>
8 #include <map>
9
10 #include "base/basictypes.h"
11 #include "base/bind.h"
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/metrics/field_trial.h"
16 #include "base/metrics/histogram.h"
17 #include "base/metrics/sparse_histogram.h"
18 #include "base/metrics/stats_counters.h"
19 #include "base/stl_util.h"
20 #include "base/strings/string_number_conversions.h"
21 #include "base/strings/string_util.h"
22 #include "base/strings/stringprintf.h"
23 #include "base/strings/utf_string_conversions.h"
24 #include "base/time/time.h"
25 #include "base/values.h"
26 #include "crypto/ec_private_key.h"
27 #include "crypto/ec_signature_creator.h"
28 #include "net/base/connection_type_histograms.h"
29 #include "net/base/net_log.h"
30 #include "net/base/net_util.h"
31 #include "net/cert/asn1_util.h"
32 #include "net/cert/cert_verify_result.h"
33 #include "net/http/http_log_util.h"
34 #include "net/http/http_network_session.h"
35 #include "net/http/http_server_properties.h"
36 #include "net/http/http_util.h"
37 #include "net/http/transport_security_state.h"
38 #include "net/spdy/spdy_buffer_producer.h"
39 #include "net/spdy/spdy_frame_builder.h"
40 #include "net/spdy/spdy_http_utils.h"
41 #include "net/spdy/spdy_protocol.h"
42 #include "net/spdy/spdy_session_pool.h"
43 #include "net/spdy/spdy_stream.h"
44 #include "net/ssl/channel_id_service.h"
45 #include "net/ssl/ssl_cipher_suite_names.h"
46 #include "net/ssl/ssl_connection_status_flags.h"
47
48 namespace net {
49
50 namespace {
51
// Size passed to the IOBuffer constructor when the session creates
// |read_buffer_|.
const int kReadBufferSize = 8 * 1024;
// Number of seconds used to initialize |connection_at_risk_of_loss_time_|.
const int kDefaultConnectionAtRiskOfLossSeconds = 10;
// Number of seconds used to initialize |hung_interval_|.
const int kHungIntervalSeconds = 10;

// Minimum seconds that unclaimed pushed streams will be kept in memory.
// The session constructor adds this to the current time to schedule the
// first unclaimed-push-stream sweep.
const int kMinPushedStreamLifetimeSeconds = 300;
58
59 scoped_ptr<base::ListValue> SpdyHeaderBlockToListValue(
60     const SpdyHeaderBlock& headers,
61     net::NetLog::LogLevel log_level) {
62   scoped_ptr<base::ListValue> headers_list(new base::ListValue());
63   for (SpdyHeaderBlock::const_iterator it = headers.begin();
64        it != headers.end(); ++it) {
65     headers_list->AppendString(
66         it->first + ": " +
67         ElideHeaderValueForNetLog(log_level, it->first, it->second));
68   }
69   return headers_list.Pass();
70 }
71
72 base::Value* NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock* headers,
73                                              bool fin,
74                                              bool unidirectional,
75                                              SpdyPriority spdy_priority,
76                                              SpdyStreamId stream_id,
77                                              NetLog::LogLevel log_level) {
78   base::DictionaryValue* dict = new base::DictionaryValue();
79   dict->Set("headers",
80             SpdyHeaderBlockToListValue(*headers, log_level).release());
81   dict->SetBoolean("fin", fin);
82   dict->SetBoolean("unidirectional", unidirectional);
83   dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
84   dict->SetInteger("stream_id", stream_id);
85   return dict;
86 }
87
88 base::Value* NetLogSpdySynStreamReceivedCallback(
89     const SpdyHeaderBlock* headers,
90     bool fin,
91     bool unidirectional,
92     SpdyPriority spdy_priority,
93     SpdyStreamId stream_id,
94     SpdyStreamId associated_stream,
95     NetLog::LogLevel log_level) {
96   base::DictionaryValue* dict = new base::DictionaryValue();
97   dict->Set("headers",
98             SpdyHeaderBlockToListValue(*headers, log_level).release());
99   dict->SetBoolean("fin", fin);
100   dict->SetBoolean("unidirectional", unidirectional);
101   dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
102   dict->SetInteger("stream_id", stream_id);
103   dict->SetInteger("associated_stream", associated_stream);
104   return dict;
105 }
106
107 base::Value* NetLogSpdySynReplyOrHeadersReceivedCallback(
108     const SpdyHeaderBlock* headers,
109     bool fin,
110     SpdyStreamId stream_id,
111     NetLog::LogLevel log_level) {
112   base::DictionaryValue* dict = new base::DictionaryValue();
113   dict->Set("headers",
114             SpdyHeaderBlockToListValue(*headers, log_level).release());
115   dict->SetBoolean("fin", fin);
116   dict->SetInteger("stream_id", stream_id);
117   return dict;
118 }
119
120 base::Value* NetLogSpdySessionCloseCallback(int net_error,
121                                             const std::string* description,
122                                             NetLog::LogLevel /* log_level */) {
123   base::DictionaryValue* dict = new base::DictionaryValue();
124   dict->SetInteger("net_error", net_error);
125   dict->SetString("description", *description);
126   return dict;
127 }
128
129 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair,
130                                        NetLog::LogLevel /* log_level */) {
131   base::DictionaryValue* dict = new base::DictionaryValue();
132   dict->SetString("host", host_pair->first.ToString());
133   dict->SetString("proxy", host_pair->second.ToPacString());
134   return dict;
135 }
136
137 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair,
138                                         bool clear_persisted,
139                                         NetLog::LogLevel /* log_level */) {
140   base::DictionaryValue* dict = new base::DictionaryValue();
141   dict->SetString("host", host_port_pair.ToString());
142   dict->SetBoolean("clear_persisted", clear_persisted);
143   return dict;
144 }
145
146 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id,
147                                        SpdySettingsFlags flags,
148                                        uint32 value,
149                                        NetLog::LogLevel /* log_level */) {
150   base::DictionaryValue* dict = new base::DictionaryValue();
151   dict->SetInteger("id", id);
152   dict->SetInteger("flags", flags);
153   dict->SetInteger("value", value);
154   return dict;
155 }
156
157 base::Value* NetLogSpdySendSettingsCallback(const SettingsMap* settings,
158                                             NetLog::LogLevel /* log_level */) {
159   base::DictionaryValue* dict = new base::DictionaryValue();
160   base::ListValue* settings_list = new base::ListValue();
161   for (SettingsMap::const_iterator it = settings->begin();
162        it != settings->end(); ++it) {
163     const SpdySettingsIds id = it->first;
164     const SpdySettingsFlags flags = it->second.first;
165     const uint32 value = it->second.second;
166     settings_list->Append(new base::StringValue(
167         base::StringPrintf("[id:%u flags:%u value:%u]", id, flags, value)));
168   }
169   dict->Set("settings", settings_list);
170   return dict;
171 }
172
173 base::Value* NetLogSpdyWindowUpdateFrameCallback(
174     SpdyStreamId stream_id,
175     uint32 delta,
176     NetLog::LogLevel /* log_level */) {
177   base::DictionaryValue* dict = new base::DictionaryValue();
178   dict->SetInteger("stream_id", static_cast<int>(stream_id));
179   dict->SetInteger("delta", delta);
180   return dict;
181 }
182
183 base::Value* NetLogSpdySessionWindowUpdateCallback(
184     int32 delta,
185     int32 window_size,
186     NetLog::LogLevel /* log_level */) {
187   base::DictionaryValue* dict = new base::DictionaryValue();
188   dict->SetInteger("delta", delta);
189   dict->SetInteger("window_size", window_size);
190   return dict;
191 }
192
193 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id,
194                                     int size,
195                                     bool fin,
196                                     NetLog::LogLevel /* log_level */) {
197   base::DictionaryValue* dict = new base::DictionaryValue();
198   dict->SetInteger("stream_id", static_cast<int>(stream_id));
199   dict->SetInteger("size", size);
200   dict->SetBoolean("fin", fin);
201   return dict;
202 }
203
204 base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id,
205                                    int status,
206                                    const std::string* description,
207                                    NetLog::LogLevel /* log_level */) {
208   base::DictionaryValue* dict = new base::DictionaryValue();
209   dict->SetInteger("stream_id", static_cast<int>(stream_id));
210   dict->SetInteger("status", status);
211   dict->SetString("description", *description);
212   return dict;
213 }
214
215 base::Value* NetLogSpdyPingCallback(SpdyPingId unique_id,
216                                     bool is_ack,
217                                     const char* type,
218                                     NetLog::LogLevel /* log_level */) {
219   base::DictionaryValue* dict = new base::DictionaryValue();
220   dict->SetInteger("unique_id", unique_id);
221   dict->SetString("type", type);
222   dict->SetBoolean("is_ack", is_ack);
223   return dict;
224 }
225
226 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,
227                                       int active_streams,
228                                       int unclaimed_streams,
229                                       SpdyGoAwayStatus status,
230                                       NetLog::LogLevel /* log_level */) {
231   base::DictionaryValue* dict = new base::DictionaryValue();
232   dict->SetInteger("last_accepted_stream_id",
233                    static_cast<int>(last_stream_id));
234   dict->SetInteger("active_streams", active_streams);
235   dict->SetInteger("unclaimed_streams", unclaimed_streams);
236   dict->SetInteger("status", static_cast<int>(status));
237   return dict;
238 }
239
240 base::Value* NetLogSpdyPushPromiseReceivedCallback(
241     const SpdyHeaderBlock* headers,
242     SpdyStreamId stream_id,
243     SpdyStreamId promised_stream_id,
244     NetLog::LogLevel log_level) {
245   base::DictionaryValue* dict = new base::DictionaryValue();
246   dict->Set("headers",
247             SpdyHeaderBlockToListValue(*headers, log_level).release());
248   dict->SetInteger("id", stream_id);
249   dict->SetInteger("promised_stream_id", promised_stream_id);
250   return dict;
251 }
252
// Sums the results of calling .size() on every element of the fixed-size
// array |arr| and returns that sum.
template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
  size_t sum = 0;
  const T* const end = arr + N;
  for (const T* item = arr; item != end; ++item)
    sum += item->size();
  return sum;
}
262
263 // Helper class for std:find_if on STL container containing
264 // SpdyStreamRequest weak pointers.
265 class RequestEquals {
266  public:
267   RequestEquals(const base::WeakPtr<SpdyStreamRequest>& request)
268       : request_(request) {}
269
270   bool operator()(const base::WeakPtr<SpdyStreamRequest>& request) const {
271     return request_.get() == request.get();
272   }
273
274  private:
275   const base::WeakPtr<SpdyStreamRequest> request_;
276 };
277
// The maximum number of concurrent streams we will ever create.  Even if
// the server permits more, we will never exceed this limit.
// The session constructor falls back to this value when it is given a
// |max_concurrent_streams_limit| of 0.
const size_t kMaxConcurrentStreamLimit = 256;
281
282 }  // namespace
283
284 SpdyProtocolErrorDetails MapFramerErrorToProtocolError(
285     SpdyFramer::SpdyError err) {
286   switch(err) {
287     case SpdyFramer::SPDY_NO_ERROR:
288       return SPDY_ERROR_NO_ERROR;
289     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
290       return SPDY_ERROR_INVALID_CONTROL_FRAME;
291     case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
292       return SPDY_ERROR_CONTROL_PAYLOAD_TOO_LARGE;
293     case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
294       return SPDY_ERROR_ZLIB_INIT_FAILURE;
295     case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
296       return SPDY_ERROR_UNSUPPORTED_VERSION;
297     case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
298       return SPDY_ERROR_DECOMPRESS_FAILURE;
299     case SpdyFramer::SPDY_COMPRESS_FAILURE:
300       return SPDY_ERROR_COMPRESS_FAILURE;
301     case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
302       return SPDY_ERROR_GOAWAY_FRAME_CORRUPT;
303     case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
304       return SPDY_ERROR_RST_STREAM_FRAME_CORRUPT;
305     case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
306       return SPDY_ERROR_INVALID_DATA_FRAME_FLAGS;
307     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
308       return SPDY_ERROR_INVALID_CONTROL_FRAME_FLAGS;
309     case SpdyFramer::SPDY_UNEXPECTED_FRAME:
310       return SPDY_ERROR_UNEXPECTED_FRAME;
311     default:
312       NOTREACHED();
313       return static_cast<SpdyProtocolErrorDetails>(-1);
314   }
315 }
316
317 Error MapFramerErrorToNetError(SpdyFramer::SpdyError err) {
318   switch (err) {
319     case SpdyFramer::SPDY_NO_ERROR:
320       return OK;
321     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
322       return ERR_SPDY_PROTOCOL_ERROR;
323     case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
324       return ERR_SPDY_FRAME_SIZE_ERROR;
325     case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
326       return ERR_SPDY_COMPRESSION_ERROR;
327     case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
328       return ERR_SPDY_PROTOCOL_ERROR;
329     case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
330       return ERR_SPDY_COMPRESSION_ERROR;
331     case SpdyFramer::SPDY_COMPRESS_FAILURE:
332       return ERR_SPDY_COMPRESSION_ERROR;
333     case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
334       return ERR_SPDY_PROTOCOL_ERROR;
335     case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
336       return ERR_SPDY_PROTOCOL_ERROR;
337     case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
338       return ERR_SPDY_PROTOCOL_ERROR;
339     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
340       return ERR_SPDY_PROTOCOL_ERROR;
341     case SpdyFramer::SPDY_UNEXPECTED_FRAME:
342       return ERR_SPDY_PROTOCOL_ERROR;
343     default:
344       NOTREACHED();
345       return ERR_SPDY_PROTOCOL_ERROR;
346   }
347 }
348
349 SpdyProtocolErrorDetails MapRstStreamStatusToProtocolError(
350     SpdyRstStreamStatus status) {
351   switch(status) {
352     case RST_STREAM_PROTOCOL_ERROR:
353       return STATUS_CODE_PROTOCOL_ERROR;
354     case RST_STREAM_INVALID_STREAM:
355       return STATUS_CODE_INVALID_STREAM;
356     case RST_STREAM_REFUSED_STREAM:
357       return STATUS_CODE_REFUSED_STREAM;
358     case RST_STREAM_UNSUPPORTED_VERSION:
359       return STATUS_CODE_UNSUPPORTED_VERSION;
360     case RST_STREAM_CANCEL:
361       return STATUS_CODE_CANCEL;
362     case RST_STREAM_INTERNAL_ERROR:
363       return STATUS_CODE_INTERNAL_ERROR;
364     case RST_STREAM_FLOW_CONTROL_ERROR:
365       return STATUS_CODE_FLOW_CONTROL_ERROR;
366     case RST_STREAM_STREAM_IN_USE:
367       return STATUS_CODE_STREAM_IN_USE;
368     case RST_STREAM_STREAM_ALREADY_CLOSED:
369       return STATUS_CODE_STREAM_ALREADY_CLOSED;
370     case RST_STREAM_INVALID_CREDENTIALS:
371       return STATUS_CODE_INVALID_CREDENTIALS;
372     case RST_STREAM_FRAME_SIZE_ERROR:
373       return STATUS_CODE_FRAME_SIZE_ERROR;
374     case RST_STREAM_SETTINGS_TIMEOUT:
375       return STATUS_CODE_SETTINGS_TIMEOUT;
376     case RST_STREAM_CONNECT_ERROR:
377       return STATUS_CODE_CONNECT_ERROR;
378     case RST_STREAM_ENHANCE_YOUR_CALM:
379       return STATUS_CODE_ENHANCE_YOUR_CALM;
380     default:
381       NOTREACHED();
382       return static_cast<SpdyProtocolErrorDetails>(-1);
383   }
384 }
385
386 SpdyGoAwayStatus MapNetErrorToGoAwayStatus(Error err) {
387   switch (err) {
388     case OK:
389       return GOAWAY_NO_ERROR;
390     case ERR_SPDY_PROTOCOL_ERROR:
391       return GOAWAY_PROTOCOL_ERROR;
392     case ERR_SPDY_FLOW_CONTROL_ERROR:
393       return GOAWAY_FLOW_CONTROL_ERROR;
394     case ERR_SPDY_FRAME_SIZE_ERROR:
395       return GOAWAY_FRAME_SIZE_ERROR;
396     case ERR_SPDY_COMPRESSION_ERROR:
397       return GOAWAY_COMPRESSION_ERROR;
398     case ERR_SPDY_INADEQUATE_TRANSPORT_SECURITY:
399       return GOAWAY_INADEQUATE_SECURITY;
400     default:
401       return GOAWAY_PROTOCOL_ERROR;
402   }
403 }
404
405 void SplitPushedHeadersToRequestAndResponse(const SpdyHeaderBlock& headers,
406                                             SpdyMajorVersion protocol_version,
407                                             SpdyHeaderBlock* request_headers,
408                                             SpdyHeaderBlock* response_headers) {
409   DCHECK(response_headers);
410   DCHECK(request_headers);
411   for (SpdyHeaderBlock::const_iterator it = headers.begin();
412        it != headers.end();
413        ++it) {
414     SpdyHeaderBlock* to_insert = response_headers;
415     if (protocol_version == SPDY2) {
416       if (it->first == "url")
417         to_insert = request_headers;
418     } else {
419       const char* host = protocol_version >= SPDY4 ? ":authority" : ":host";
420       static const char* scheme = ":scheme";
421       static const char* path = ":path";
422       if (it->first == host || it->first == scheme || it->first == path)
423         to_insert = request_headers;
424     }
425     to_insert->insert(*it);
426   }
427 }
428
429 SpdyStreamRequest::SpdyStreamRequest() : weak_ptr_factory_(this) {
430   Reset();
431 }
432
433 SpdyStreamRequest::~SpdyStreamRequest() {
434   CancelRequest();
435 }
436
437 int SpdyStreamRequest::StartRequest(
438     SpdyStreamType type,
439     const base::WeakPtr<SpdySession>& session,
440     const GURL& url,
441     RequestPriority priority,
442     const BoundNetLog& net_log,
443     const CompletionCallback& callback) {
444   DCHECK(session);
445   DCHECK(!session_);
446   DCHECK(!stream_);
447   DCHECK(callback_.is_null());
448
449   type_ = type;
450   session_ = session;
451   url_ = url;
452   priority_ = priority;
453   net_log_ = net_log;
454   callback_ = callback;
455
456   base::WeakPtr<SpdyStream> stream;
457   int rv = session->TryCreateStream(weak_ptr_factory_.GetWeakPtr(), &stream);
458   if (rv == OK) {
459     Reset();
460     stream_ = stream;
461   }
462   return rv;
463 }
464
465 void SpdyStreamRequest::CancelRequest() {
466   if (session_)
467     session_->CancelStreamRequest(weak_ptr_factory_.GetWeakPtr());
468   Reset();
469   // Do this to cancel any pending CompleteStreamRequest() tasks.
470   weak_ptr_factory_.InvalidateWeakPtrs();
471 }
472
473 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() {
474   DCHECK(!session_);
475   base::WeakPtr<SpdyStream> stream = stream_;
476   DCHECK(stream);
477   Reset();
478   return stream;
479 }
480
481 void SpdyStreamRequest::OnRequestCompleteSuccess(
482     const base::WeakPtr<SpdyStream>& stream) {
483   DCHECK(session_);
484   DCHECK(!stream_);
485   DCHECK(!callback_.is_null());
486   CompletionCallback callback = callback_;
487   Reset();
488   DCHECK(stream);
489   stream_ = stream;
490   callback.Run(OK);
491 }
492
493 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) {
494   DCHECK(session_);
495   DCHECK(!stream_);
496   DCHECK(!callback_.is_null());
497   CompletionCallback callback = callback_;
498   Reset();
499   DCHECK_NE(rv, OK);
500   callback.Run(rv);
501 }
502
503 void SpdyStreamRequest::Reset() {
504   type_ = SPDY_BIDIRECTIONAL_STREAM;
505   session_.reset();
506   stream_.reset();
507   url_ = GURL();
508   priority_ = MINIMUM_PRIORITY;
509   net_log_ = BoundNetLog();
510   callback_.Reset();
511 }
512
513 SpdySession::ActiveStreamInfo::ActiveStreamInfo()
514     : stream(NULL),
515       waiting_for_syn_reply(false) {}
516
517 SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream)
518     : stream(stream),
519       waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {
520 }
521
522 SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {}
523
524 SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {}
525
526 SpdySession::PushedStreamInfo::PushedStreamInfo(
527     SpdyStreamId stream_id,
528     base::TimeTicks creation_time)
529     : stream_id(stream_id),
530       creation_time(creation_time) {}
531
532 SpdySession::PushedStreamInfo::~PushedStreamInfo() {}
533
534 // static
535 bool SpdySession::CanPool(TransportSecurityState* transport_security_state,
536                           const SSLInfo& ssl_info,
537                           const std::string& old_hostname,
538                           const std::string& new_hostname) {
539   // Pooling is prohibited if the server cert is not valid for the new domain,
540   // and for connections on which client certs were sent. It is also prohibited
541   // when channel ID was sent if the hosts are from different eTLDs+1.
542   if (IsCertStatusError(ssl_info.cert_status))
543     return false;
544
545   if (ssl_info.client_cert_sent)
546     return false;
547
548   if (ssl_info.channel_id_sent &&
549       ChannelIDService::GetDomainForHost(new_hostname) !=
550       ChannelIDService::GetDomainForHost(old_hostname)) {
551     return false;
552   }
553
554   bool unused = false;
555   if (!ssl_info.cert->VerifyNameMatch(new_hostname, &unused))
556     return false;
557
558   std::string pinning_failure_log;
559   if (!transport_security_state->CheckPublicKeyPins(
560           new_hostname,
561           true, /* sni_available */
562           ssl_info.is_issued_by_known_root,
563           ssl_info.public_key_hashes,
564           &pinning_failure_log)) {
565     return false;
566   }
567
568   return true;
569 }
570
571 SpdySession::SpdySession(
572     const SpdySessionKey& spdy_session_key,
573     const base::WeakPtr<HttpServerProperties>& http_server_properties,
574     TransportSecurityState* transport_security_state,
575     bool verify_domain_authentication,
576     bool enable_sending_initial_data,
577     bool enable_compression,
578     bool enable_ping_based_connection_checking,
579     NextProto default_protocol,
580     size_t stream_initial_recv_window_size,
581     size_t initial_max_concurrent_streams,
582     size_t max_concurrent_streams_limit,
583     TimeFunc time_func,
584     const HostPortPair& trusted_spdy_proxy,
585     NetLog* net_log)
586     : in_io_loop_(false),
587       spdy_session_key_(spdy_session_key),
588       pool_(NULL),
589       http_server_properties_(http_server_properties),
590       transport_security_state_(transport_security_state),
591       read_buffer_(new IOBuffer(kReadBufferSize)),
592       stream_hi_water_mark_(kFirstStreamId),
593       num_pushed_streams_(0u),
594       num_active_pushed_streams_(0u),
595       in_flight_write_frame_type_(DATA),
596       in_flight_write_frame_size_(0),
597       is_secure_(false),
598       certificate_error_code_(OK),
599       availability_state_(STATE_AVAILABLE),
600       read_state_(READ_STATE_DO_READ),
601       write_state_(WRITE_STATE_IDLE),
602       error_on_close_(OK),
603       max_concurrent_streams_(initial_max_concurrent_streams == 0
604                                   ? kInitialMaxConcurrentStreams
605                                   : initial_max_concurrent_streams),
606       max_concurrent_streams_limit_(max_concurrent_streams_limit == 0
607                                         ? kMaxConcurrentStreamLimit
608                                         : max_concurrent_streams_limit),
609       max_concurrent_pushed_streams_(kMaxConcurrentPushedStreams),
610       streams_initiated_count_(0),
611       streams_pushed_count_(0),
612       streams_pushed_and_claimed_count_(0),
613       streams_abandoned_count_(0),
614       total_bytes_received_(0),
615       sent_settings_(false),
616       received_settings_(false),
617       stalled_streams_(0),
618       pings_in_flight_(0),
619       next_ping_id_(1),
620       last_activity_time_(time_func()),
621       last_compressed_frame_len_(0),
622       check_ping_status_pending_(false),
623       send_connection_header_prefix_(false),
624       flow_control_state_(FLOW_CONTROL_NONE),
625       stream_initial_send_window_size_(kSpdyStreamInitialWindowSize),
626       stream_initial_recv_window_size_(stream_initial_recv_window_size == 0
627                                            ? kDefaultInitialRecvWindowSize
628                                            : stream_initial_recv_window_size),
629       session_send_window_size_(0),
630       session_recv_window_size_(0),
631       session_unacked_recv_window_bytes_(0),
632       net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
633       verify_domain_authentication_(verify_domain_authentication),
634       enable_sending_initial_data_(enable_sending_initial_data),
635       enable_compression_(enable_compression),
636       enable_ping_based_connection_checking_(
637           enable_ping_based_connection_checking),
638       protocol_(default_protocol),
639       connection_at_risk_of_loss_time_(
640           base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
641       hung_interval_(base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
642       trusted_spdy_proxy_(trusted_spdy_proxy),
643       time_func_(time_func),
644       weak_factory_(this) {
645   DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
646   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
647   DCHECK(HttpStreamFactory::spdy_enabled());
648   net_log_.BeginEvent(
649       NetLog::TYPE_SPDY_SESSION,
650       base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
651   next_unclaimed_push_stream_sweep_time_ = time_func_() +
652       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
653   // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
654 }
655
656 SpdySession::~SpdySession() {
657   CHECK(!in_io_loop_);
658   DcheckDraining();
659
660   // TODO(akalin): Check connection->is_initialized() instead. This
661   // requires re-working CreateFakeSpdySession(), though.
662   DCHECK(connection_->socket());
663   // With SPDY we can't recycle sockets.
664   connection_->socket()->Disconnect();
665
666   RecordHistograms();
667
668   net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
669 }
670
671 void SpdySession::InitializeWithSocket(
672     scoped_ptr<ClientSocketHandle> connection,
673     SpdySessionPool* pool,
674     bool is_secure,
675     int certificate_error_code) {
676   CHECK(!in_io_loop_);
677   DCHECK_EQ(availability_state_, STATE_AVAILABLE);
678   DCHECK_EQ(read_state_, READ_STATE_DO_READ);
679   DCHECK_EQ(write_state_, WRITE_STATE_IDLE);
680   DCHECK(!connection_);
681
682   DCHECK(certificate_error_code == OK ||
683          certificate_error_code < ERR_IO_PENDING);
684   // TODO(akalin): Check connection->is_initialized() instead. This
685   // requires re-working CreateFakeSpdySession(), though.
686   DCHECK(connection->socket());
687
688   base::StatsCounter spdy_sessions("spdy.sessions");
689   spdy_sessions.Increment();
690
691   connection_ = connection.Pass();
692   is_secure_ = is_secure;
693   certificate_error_code_ = certificate_error_code;
694
695   NextProto protocol_negotiated =
696       connection_->socket()->GetNegotiatedProtocol();
697   if (protocol_negotiated != kProtoUnknown) {
698     protocol_ = protocol_negotiated;
699   }
700   DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
701   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
702
703   if (protocol_ == kProtoSPDY4)
704     send_connection_header_prefix_ = true;
705
706   if (protocol_ >= kProtoSPDY31) {
707     flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION;
708     session_send_window_size_ = kSpdySessionInitialWindowSize;
709     session_recv_window_size_ = kSpdySessionInitialWindowSize;
710   } else if (protocol_ >= kProtoSPDY3) {
711     flow_control_state_ = FLOW_CONTROL_STREAM;
712   } else {
713     flow_control_state_ = FLOW_CONTROL_NONE;
714   }
715
716   buffered_spdy_framer_.reset(
717       new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_),
718                              enable_compression_));
719   buffered_spdy_framer_->set_visitor(this);
720   buffered_spdy_framer_->set_debug_visitor(this);
721   UMA_HISTOGRAM_ENUMERATION("Net.SpdyVersion", protocol_, kProtoMaximumVersion);
722 #if defined(SPDY_PROXY_AUTH_ORIGIN)
723   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessions_DataReductionProxy",
724                         host_port_pair().Equals(HostPortPair::FromURL(
725                             GURL(SPDY_PROXY_AUTH_ORIGIN))));
726 #endif
727
728   net_log_.AddEvent(
729       NetLog::TYPE_SPDY_SESSION_INITIALIZED,
730       connection_->socket()->NetLog().source().ToEventParametersCallback());
731
732   DCHECK_EQ(availability_state_, STATE_AVAILABLE);
733   connection_->AddHigherLayeredPool(this);
734   if (enable_sending_initial_data_)
735     SendInitialData();
736   pool_ = pool;
737
738   // Bootstrap the read loop.
739   base::MessageLoop::current()->PostTask(
740       FROM_HERE,
741       base::Bind(&SpdySession::PumpReadLoop,
742                  weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
743 }
744
745 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
746   if (!verify_domain_authentication_)
747     return true;
748
749   if (availability_state_ == STATE_DRAINING)
750     return false;
751
752   SSLInfo ssl_info;
753   bool was_npn_negotiated;
754   NextProto protocol_negotiated = kProtoUnknown;
755   if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
756     return true;   // This is not a secure session, so all domains are okay.
757
758   return CanPool(transport_security_state_, ssl_info,
759                  host_port_pair().host(), domain);
760 }
761
762 int SpdySession::GetPushStream(
763     const GURL& url,
764     base::WeakPtr<SpdyStream>* stream,
765     const BoundNetLog& stream_net_log) {
766   CHECK(!in_io_loop_);
767
768   stream->reset();
769
770   if (availability_state_ == STATE_DRAINING)
771     return ERR_CONNECTION_CLOSED;
772
773   Error err = TryAccessStream(url);
774   if (err != OK)
775     return err;
776
777   *stream = GetActivePushStream(url);
778   if (*stream) {
779     DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
780     streams_pushed_and_claimed_count_++;
781   }
782   return OK;
783 }
784
785 // {,Try}CreateStream() and TryAccessStream() can be called with
786 // |in_io_loop_| set if a stream is being created in response to
787 // another being closed due to received data.
788
789 Error SpdySession::TryAccessStream(const GURL& url) {
790   if (is_secure_ && certificate_error_code_ != OK &&
791       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
792     RecordProtocolErrorHistogram(
793         PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
794     DoDrainSession(
795         static_cast<Error>(certificate_error_code_),
796         "Tried to get SPDY stream for secure content over an unauthenticated "
797         "session.");
798     return ERR_SPDY_PROTOCOL_ERROR;
799   }
800   return OK;
801 }
802
803 int SpdySession::TryCreateStream(
804     const base::WeakPtr<SpdyStreamRequest>& request,
805     base::WeakPtr<SpdyStream>* stream) {
806   DCHECK(request);
807
808   if (availability_state_ == STATE_GOING_AWAY)
809     return ERR_FAILED;
810
811   if (availability_state_ == STATE_DRAINING)
812     return ERR_CONNECTION_CLOSED;
813
814   Error err = TryAccessStream(request->url());
815   if (err != OK)
816     return err;
817
818   if (!max_concurrent_streams_ ||
819       (active_streams_.size() + created_streams_.size() - num_pushed_streams_ <
820        max_concurrent_streams_)) {
821     return CreateStream(*request, stream);
822   }
823
824   stalled_streams_++;
825   net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
826   RequestPriority priority = request->priority();
827   CHECK_GE(priority, MINIMUM_PRIORITY);
828   CHECK_LE(priority, MAXIMUM_PRIORITY);
829   pending_create_stream_queues_[priority].push_back(request);
830   return ERR_IO_PENDING;
831 }
832
833 int SpdySession::CreateStream(const SpdyStreamRequest& request,
834                               base::WeakPtr<SpdyStream>* stream) {
835   DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
836   DCHECK_LE(request.priority(), MAXIMUM_PRIORITY);
837
838   if (availability_state_ == STATE_GOING_AWAY)
839     return ERR_FAILED;
840
841   if (availability_state_ == STATE_DRAINING)
842     return ERR_CONNECTION_CLOSED;
843
844   Error err = TryAccessStream(request.url());
845   if (err != OK) {
846     // This should have been caught in TryCreateStream().
847     NOTREACHED();
848     return err;
849   }
850
851   DCHECK(connection_->socket());
852   DCHECK(connection_->socket()->IsConnected());
853   if (connection_->socket()) {
854     UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
855                           connection_->socket()->IsConnected());
856     if (!connection_->socket()->IsConnected()) {
857       DoDrainSession(
858           ERR_CONNECTION_CLOSED,
859           "Tried to create SPDY stream for a closed socket connection.");
860       return ERR_CONNECTION_CLOSED;
861     }
862   }
863
864   scoped_ptr<SpdyStream> new_stream(
865       new SpdyStream(request.type(), GetWeakPtr(), request.url(),
866                      request.priority(),
867                      stream_initial_send_window_size_,
868                      stream_initial_recv_window_size_,
869                      request.net_log()));
870   *stream = new_stream->GetWeakPtr();
871   InsertCreatedStream(new_stream.Pass());
872
873   UMA_HISTOGRAM_CUSTOM_COUNTS(
874       "Net.SpdyPriorityCount",
875       static_cast<int>(request.priority()), 0, 10, 11);
876
877   return OK;
878 }
879
880 void SpdySession::CancelStreamRequest(
881     const base::WeakPtr<SpdyStreamRequest>& request) {
882   DCHECK(request);
883   RequestPriority priority = request->priority();
884   CHECK_GE(priority, MINIMUM_PRIORITY);
885   CHECK_LE(priority, MAXIMUM_PRIORITY);
886
887 #if DCHECK_IS_ON
888   // |request| should not be in a queue not matching its priority.
889   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
890     if (priority == i)
891       continue;
892     PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
893     DCHECK(std::find_if(queue->begin(),
894                         queue->end(),
895                         RequestEquals(request)) == queue->end());
896   }
897 #endif
898
899   PendingStreamRequestQueue* queue =
900       &pending_create_stream_queues_[priority];
901   // Remove |request| from |queue| while preserving the order of the
902   // other elements.
903   PendingStreamRequestQueue::iterator it =
904       std::find_if(queue->begin(), queue->end(), RequestEquals(request));
905   // The request may already be removed if there's a
906   // CompleteStreamRequest() in flight.
907   if (it != queue->end()) {
908     it = queue->erase(it);
909     // |request| should be in the queue at most once, and if it is
910     // present, should not be pending completion.
911     DCHECK(std::find_if(it, queue->end(), RequestEquals(request)) ==
912            queue->end());
913   }
914 }
915
916 base::WeakPtr<SpdyStreamRequest> SpdySession::GetNextPendingStreamRequest() {
917   for (int j = MAXIMUM_PRIORITY; j >= MINIMUM_PRIORITY; --j) {
918     if (pending_create_stream_queues_[j].empty())
919       continue;
920
921     base::WeakPtr<SpdyStreamRequest> pending_request =
922         pending_create_stream_queues_[j].front();
923     DCHECK(pending_request);
924     pending_create_stream_queues_[j].pop_front();
925     return pending_request;
926   }
927   return base::WeakPtr<SpdyStreamRequest>();
928 }
929
930 void SpdySession::ProcessPendingStreamRequests() {
931   // Like |max_concurrent_streams_|, 0 means infinite for
932   // |max_requests_to_process|.
933   size_t max_requests_to_process = 0;
934   if (max_concurrent_streams_ != 0) {
935     max_requests_to_process =
936         max_concurrent_streams_ -
937         (active_streams_.size() + created_streams_.size());
938   }
939   for (size_t i = 0;
940        max_requests_to_process == 0 || i < max_requests_to_process; ++i) {
941     base::WeakPtr<SpdyStreamRequest> pending_request =
942         GetNextPendingStreamRequest();
943     if (!pending_request)
944       break;
945
946     // Note that this post can race with other stream creations, and it's
947     // possible that the un-stalled stream will be stalled again if it loses.
948     // TODO(jgraettinger): Provide stronger ordering guarantees.
949     base::MessageLoop::current()->PostTask(
950         FROM_HERE,
951         base::Bind(&SpdySession::CompleteStreamRequest,
952                    weak_factory_.GetWeakPtr(),
953                    pending_request));
954   }
955 }
956
// Records |alias_key| in |pooled_aliases_|.
void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) {
  pooled_aliases_.insert(alias_key);
}
960
// Returns the protocol version reported by |buffered_spdy_framer_|, which
// must already exist (DCHECKed).
SpdyMajorVersion SpdySession::GetProtocolVersion() const {
  DCHECK(buffered_spdy_framer_.get());
  return buffered_spdy_framer_->protocol_version();
}
965
966 bool SpdySession::HasAcceptableTransportSecurity() const {
967   // If we're not even using TLS, we have no standards to meet.
968   if (!is_secure_) {
969     return true;
970   }
971
972   // We don't enforce transport security standards for older SPDY versions.
973   if (GetProtocolVersion() < SPDY4) {
974     return true;
975   }
976
977   SSLInfo ssl_info;
978   CHECK(connection_->socket()->GetSSLInfo(&ssl_info));
979
980   // HTTP/2 requires TLS 1.2+
981   if (SSLConnectionStatusToVersion(ssl_info.connection_status) <
982       SSL_CONNECTION_VERSION_TLS1_2) {
983     return false;
984   }
985
986   if (!IsSecureTLSCipherSuite(
987           SSLConnectionStatusToCipherSuite(ssl_info.connection_status))) {
988     return false;
989   }
990
991   return true;
992 }
993
// Returns a weak pointer to this session, vended by |weak_factory_|.
base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
  return weak_factory_.GetWeakPtr();
}
997
998 bool SpdySession::CloseOneIdleConnection() {
999   CHECK(!in_io_loop_);
1000   DCHECK(pool_);
1001   if (active_streams_.empty()) {
1002     DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1003   }
1004   // Return false as the socket wasn't immediately closed.
1005   return false;
1006 }
1007
1008 void SpdySession::EnqueueStreamWrite(
1009     const base::WeakPtr<SpdyStream>& stream,
1010     SpdyFrameType frame_type,
1011     scoped_ptr<SpdyBufferProducer> producer) {
1012   DCHECK(frame_type == HEADERS ||
1013          frame_type == DATA ||
1014          frame_type == CREDENTIAL ||
1015          frame_type == SYN_STREAM);
1016   EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
1017 }
1018
// Serializes the frame that opens |stream_id|: a SYN_STREAM for protocol
// versions up to SPDY3, otherwise a HEADERS frame that carries the
// priority. |stream_id| must already be in |active_streams_| (CHECKed).
// Also bumps the request counters and, when logging is enabled, logs the
// frame. Returns the serialized frame.
scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
    SpdyStreamId stream_id,
    RequestPriority priority,
    SpdyControlFlags flags,
    const SpdyHeaderBlock& block) {
  ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
  CHECK(it != active_streams_.end());
  CHECK_EQ(it->second.stream->stream_id(), stream_id);

  SendPrefacePingIfNoneInFlight();

  DCHECK(buffered_spdy_framer_.get());
  // Translate the net-level priority into the wire priority for the
  // negotiated protocol version.
  SpdyPriority spdy_priority =
      ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion());

  scoped_ptr<SpdyFrame> syn_frame;
  // TODO(hkhalil): Avoid copy of |block|.
  if (GetProtocolVersion() <= SPDY3) {
    SpdySynStreamIR syn_stream(stream_id);
    syn_stream.set_associated_to_stream_id(0);
    syn_stream.set_priority(spdy_priority);
    syn_stream.set_fin((flags & CONTROL_FLAG_FIN) != 0);
    syn_stream.set_unidirectional((flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0);
    syn_stream.set_name_value_block(block);
    syn_frame.reset(buffered_spdy_framer_->SerializeFrame(syn_stream));
  } else {
    // Newer protocol versions open the stream with a HEADERS frame; the
    // unidirectional flag is not set on this path.
    SpdyHeadersIR headers(stream_id);
    headers.set_priority(spdy_priority);
    headers.set_has_priority(true);
    headers.set_fin((flags & CONTROL_FLAG_FIN) != 0);
    headers.set_name_value_block(block);
    syn_frame.reset(buffered_spdy_framer_->SerializeFrame(headers));
  }

  base::StatsCounter spdy_requests("spdy.requests");
  spdy_requests.Increment();
  streams_initiated_count_++;

  if (net_log().IsLogging()) {
    net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
                       base::Bind(&NetLogSpdySynStreamSentCallback,
                                  &block,
                                  (flags & CONTROL_FLAG_FIN) != 0,
                                  (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
                                  spdy_priority,
                                  stream_id));
  }

  return syn_frame.Pass();
}
1069
// Builds a DATA frame for the active stream |stream_id| from the first
// bytes of |data|. At most |len| bytes are sent; the amount is further
// capped by kMaxSpdyFrameChunkSize and, when the corresponding flow
// control is enabled, by the stream and session send windows.
//
// Returns a null scoped_ptr when the session is draining, when |len| is
// negative, or when the send is stalled by stream or session flow
// control (in which case the stream is queued as send-stalled).
// |stream_id| must be in |active_streams_| (CHECKed).
scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
                                                     IOBuffer* data,
                                                     int len,
                                                     SpdyDataFlags flags) {
  if (availability_state_ == STATE_DRAINING) {
    return scoped_ptr<SpdyBuffer>();
  }

  ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
  CHECK(it != active_streams_.end());
  SpdyStream* stream = it->second.stream;
  CHECK_EQ(stream->stream_id(), stream_id);

  if (len < 0) {
    NOTREACHED();
    return scoped_ptr<SpdyBuffer>();
  }

  // Never put more than one frame chunk's worth of payload in a frame.
  int effective_len = std::min(len, kMaxSpdyFrameChunkSize);

  bool send_stalled_by_stream =
      (flow_control_state_ >= FLOW_CONTROL_STREAM) &&
      (stream->send_window_size() <= 0);
  bool send_stalled_by_session = IsSendStalled();

  // NOTE: There's an enum of the same name in histograms.xml.
  enum SpdyFrameFlowControlState {
    SEND_NOT_STALLED,
    SEND_STALLED_BY_STREAM,
    SEND_STALLED_BY_SESSION,
    SEND_STALLED_BY_STREAM_AND_SESSION,
  };

  // Classify the stall state for the histograms below.
  SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED;
  if (send_stalled_by_stream) {
    if (send_stalled_by_session) {
      frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION;
    } else {
      frame_flow_control_state = SEND_STALLED_BY_STREAM;
    }
  } else if (send_stalled_by_session) {
    frame_flow_control_state = SEND_STALLED_BY_SESSION;
  }

  // Each flow control mode reports to its own histogram, bounded by the
  // largest state that mode is given as its boundary value.
  if (flow_control_state_ == FLOW_CONTROL_STREAM) {
    UMA_HISTOGRAM_ENUMERATION(
        "Net.SpdyFrameStreamFlowControlState",
        frame_flow_control_state,
        SEND_STALLED_BY_STREAM + 1);
  } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
    UMA_HISTOGRAM_ENUMERATION(
        "Net.SpdyFrameStreamAndSessionFlowControlState",
        frame_flow_control_state,
        SEND_STALLED_BY_STREAM_AND_SESSION + 1);
  }

  // Obey send window size of the stream if stream flow control is
  // enabled.
  if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
    if (send_stalled_by_stream) {
      stream->set_send_stalled_by_flow_control(true);
      // Even though we're currently stalled only by the stream, we
      // might end up being stalled by the session also.
      QueueSendStalledStream(*stream);
      net_log().AddEvent(
          NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW,
          NetLog::IntegerCallback("stream_id", stream_id));
      return scoped_ptr<SpdyBuffer>();
    }

    effective_len = std::min(effective_len, stream->send_window_size());
  }

  // Obey send window size of the session if session flow control is
  // enabled.
  if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
    if (send_stalled_by_session) {
      stream->set_send_stalled_by_flow_control(true);
      QueueSendStalledStream(*stream);
      net_log().AddEvent(
          NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW,
          NetLog::IntegerCallback("stream_id", stream_id));
      return scoped_ptr<SpdyBuffer>();
    }

    effective_len = std::min(effective_len, session_send_window_size_);
  }

  DCHECK_GE(effective_len, 0);

  // Clear FIN flag if only some of the data will be in the data
  // frame.
  if (effective_len < len)
    flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);

  if (net_log().IsLogging()) {
    net_log().AddEvent(
        NetLog::TYPE_SPDY_SESSION_SEND_DATA,
        base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len,
                   (flags & DATA_FLAG_FIN) != 0));
  }

  // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
  if (effective_len > 0)
    SendPrefacePingIfNoneInFlight();

  // TODO(mbelshe): reduce memory copies here.
  DCHECK(buffered_spdy_framer_.get());
  scoped_ptr<SpdyFrame> frame(
      buffered_spdy_framer_->CreateDataFrame(
          stream_id, data->data(),
          static_cast<uint32>(effective_len), flags));

  scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));

  // With session flow control, charge the payload against the session
  // send window now and register OnWriteBufferConsumed() as the buffer's
  // consume callback.
  if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
    DecreaseSendWindowSize(static_cast<int32>(effective_len));
    data_buffer->AddConsumeCallback(
        base::Bind(&SpdySession::OnWriteBufferConsumed,
                   weak_factory_.GetWeakPtr(),
                   static_cast<size_t>(effective_len)));
  }

  return data_buffer.Pass();
}
1195
1196 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) {
1197   DCHECK_NE(stream_id, 0u);
1198
1199   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1200   if (it == active_streams_.end()) {
1201     NOTREACHED();
1202     return;
1203   }
1204
1205   CloseActiveStreamIterator(it, status);
1206 }
1207
1208 void SpdySession::CloseCreatedStream(
1209     const base::WeakPtr<SpdyStream>& stream, int status) {
1210   DCHECK_EQ(stream->stream_id(), 0u);
1211
1212   CreatedStreamSet::iterator it = created_streams_.find(stream.get());
1213   if (it == created_streams_.end()) {
1214     NOTREACHED();
1215     return;
1216   }
1217
1218   CloseCreatedStreamIterator(it, status);
1219 }
1220
1221 void SpdySession::ResetStream(SpdyStreamId stream_id,
1222                               SpdyRstStreamStatus status,
1223                               const std::string& description) {
1224   DCHECK_NE(stream_id, 0u);
1225
1226   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1227   if (it == active_streams_.end()) {
1228     NOTREACHED();
1229     return;
1230   }
1231
1232   ResetStreamIterator(it, status, description);
1233 }
1234
// Returns true if |stream_id| is a key in |active_streams_|.
bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
  return ContainsKey(active_streams_, stream_id);
}
1238
// Always returns LOAD_STATE_IDLE; no per-stream state is consulted.
LoadState SpdySession::GetLoadState() const {
  // Just report that we're idle since the session could be doing
  // many things concurrently.
  return LOAD_STATE_IDLE;
}
1244
// Removes the stream at |it| from |active_streams_|, takes ownership of
// it, fixes up the push-stream bookkeeping, and hands it to
// DeleteStream() with |status|. Afterwards the session may start
// draining, either because it finished going away or because it is idle
// while the socket pool is stalled.
void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
                                            int status) {
  // TODO(mbelshe): We should send a RST_STREAM control frame here
  //                so that the server can cancel a large send.

  // Grab the raw stream pointer before erasing; |it| must not be used
  // after the erase.
  scoped_ptr<SpdyStream> owned_stream(it->second.stream);
  active_streams_.erase(it);

  // TODO(akalin): When SpdyStream was ref-counted (and
  // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
  // was only done when status was not OK. This meant that pushed
  // streams can still be claimed after they're closed. This is
  // probably something that we still want to support, although server
  // push is hardly used. Write tests for this and fix this. (See
  // http://crbug.com/261712 .)
  if (owned_stream->type() == SPDY_PUSH_STREAM) {
    unclaimed_pushed_streams_.erase(owned_stream->url());
    num_pushed_streams_--;
    // Streams still in the reserved-remote state are not counted as
    // active pushed streams here.
    if (!owned_stream->IsReservedRemote())
      num_active_pushed_streams_--;
  }

  DeleteStream(owned_stream.Pass(), status);
  MaybeFinishGoingAway();

  // If there are no active streams and the socket pool is stalled, close the
  // session to free up a socket slot.
  if (active_streams_.empty() && connection_->IsPoolStalled()) {
    DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
  }
}
1276
1277 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
1278                                              int status) {
1279   scoped_ptr<SpdyStream> owned_stream(*it);
1280   created_streams_.erase(it);
1281   DeleteStream(owned_stream.Pass(), status);
1282 }
1283
// Queues a RST_STREAM frame for the active stream at |it| using |status|
// and |description|, then closes that stream with
// ERR_SPDY_PROTOCOL_ERROR.
void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it,
                                      SpdyRstStreamStatus status,
                                      const std::string& description) {
  // Send the RST_STREAM frame first as CloseActiveStreamIterator()
  // may close us.
  // Copy the id and priority out now; the close below erases |it|.
  SpdyStreamId stream_id = it->first;
  RequestPriority priority = it->second.stream->priority();
  EnqueueResetStreamFrame(stream_id, priority, status, description);

  // Removes any pending writes for the stream except for possibly an
  // in-flight one.
  CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
}
1297
// Logs and queues a RST_STREAM frame for |stream_id| as a session-level
// write at |priority|, then records the protocol-error histogram entry
// that |status| maps to. |description| is only used for the net log.
void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id,
                                          RequestPriority priority,
                                          SpdyRstStreamStatus status,
                                          const std::string& description) {
  DCHECK_NE(stream_id, 0u);

  net_log().AddEvent(
      NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
      base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));

  DCHECK(buffered_spdy_framer_.get());
  scoped_ptr<SpdyFrame> rst_frame(
      buffered_spdy_framer_->CreateRstStream(stream_id, status));

  EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
  RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status));
}
1315
1316 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
1317   CHECK(!in_io_loop_);
1318   if (availability_state_ == STATE_DRAINING) {
1319     return;
1320   }
1321   ignore_result(DoReadLoop(expected_read_state, result));
1322 }
1323
// Runs the read state machine with |in_io_loop_| set. |read_state_| must
// equal |expected_read_state| on entry (CHECKed); |result| is the input
// to the first step. The loop stops when the session is draining, when a
// step returns ERR_IO_PENDING, or when more than
// kMaxReadBytesWithoutYielding bytes have been read, in which case a
// PumpReadLoop() task is posted to resume later. Returns the last
// result.
int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
  CHECK(!in_io_loop_);
  CHECK_EQ(read_state_, expected_read_state);

  in_io_loop_ = true;

  int bytes_read_without_yielding = 0;

  // Loop until the session is draining, the read becomes blocked, or
  // the read limit is exceeded.
  while (true) {
    switch (read_state_) {
      case READ_STATE_DO_READ:
        CHECK_EQ(result, OK);
        result = DoRead();
        break;
      case READ_STATE_DO_READ_COMPLETE:
        // A positive |result| is a byte count; only those count toward
        // the yield threshold.
        if (result > 0)
          bytes_read_without_yielding += result;
        result = DoReadComplete(result);
        break;
      default:
        NOTREACHED() << "read_state_: " << read_state_;
        break;
    }

    if (availability_state_ == STATE_DRAINING)
      break;

    if (result == ERR_IO_PENDING)
      break;

    if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
      // Yield: schedule the next read as a separate task and report the
      // read as pending to the caller.
      read_state_ = READ_STATE_DO_READ;
      base::MessageLoop::current()->PostTask(
          FROM_HERE,
          base::Bind(&SpdySession::PumpReadLoop,
                     weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
      result = ERR_IO_PENDING;
      break;
    }
  }

  CHECK(in_io_loop_);
  in_io_loop_ = false;

  return result;
}
1372
// Issues a socket read of up to kReadBufferSize bytes into
// |read_buffer_| and advances |read_state_| to
// READ_STATE_DO_READ_COMPLETE. Returns whatever the socket's Read()
// returns; PumpReadLoop() is passed as the completion callback.
int SpdySession::DoRead() {
  CHECK(in_io_loop_);

  CHECK(connection_);
  CHECK(connection_->socket());
  read_state_ = READ_STATE_DO_READ_COMPLETE;
  return connection_->socket()->Read(
      read_buffer_.get(),
      kReadBufferSize,
      base::Bind(&SpdySession::PumpReadLoop,
                 weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
}
1385
// Handles the outcome of a socket read. |result| is the byte count read,
// 0 for EOF, or a negative net error. EOF and errors drain the session
// and are returned as errors. Otherwise the bytes in |read_buffer_| are
// fed to |buffered_spdy_framer_| and |read_state_| is reset to
// READ_STATE_DO_READ. Returns OK on success, or ERR_CONNECTION_CLOSED if
// the session started draining while the input was being processed.
int SpdySession::DoReadComplete(int result) {
  CHECK(in_io_loop_);

  // Parse a frame.  For now this code requires that the frame fit into our
  // buffer (kReadBufferSize).
  // TODO(mbelshe): support arbitrarily large frames!

  if (result == 0) {
    UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
                                total_bytes_received_, 1, 100000000, 50);
    DoDrainSession(ERR_CONNECTION_CLOSED, "Connection closed");

    return ERR_CONNECTION_CLOSED;
  }

  if (result < 0) {
    DoDrainSession(static_cast<Error>(result), "result is < 0.");
    return result;
  }
  CHECK_LE(result, kReadBufferSize);
  total_bytes_received_ += result;

  last_activity_time_ = time_func_();

  DCHECK(buffered_spdy_framer_.get());
  char* data = read_buffer_->data();
  // Feed the framer until every byte that was read has been consumed.
  while (result > 0) {
    uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
    result -= bytes_processed;
    data += bytes_processed;

    // Stop as soon as the session has started draining.
    if (availability_state_ == STATE_DRAINING) {
      return ERR_CONNECTION_CLOSED;
    }

    DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
  }

  read_state_ = READ_STATE_DO_READ;
  return OK;
}
1427
1428 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
1429   CHECK(!in_io_loop_);
1430   DCHECK_EQ(write_state_, expected_write_state);
1431
1432   DoWriteLoop(expected_write_state, result);
1433
1434   if (availability_state_ == STATE_DRAINING && !in_flight_write_ &&
1435       write_queue_.IsEmpty()) {
1436     pool_->RemoveUnavailableSession(GetWeakPtr());  // Destroys |this|.
1437     return;
1438   }
1439 }
1440
// Runs the write state machine with |in_io_loop_| set. |write_state_|
// must equal |expected_write_state| and must not be WRITE_STATE_IDLE on
// entry (DCHECKed); |result| is the input to the first step. The loop
// stops when the state becomes WRITE_STATE_IDLE or a step returns
// ERR_IO_PENDING. Returns the last result.
int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
  CHECK(!in_io_loop_);
  DCHECK_NE(write_state_, WRITE_STATE_IDLE);
  DCHECK_EQ(write_state_, expected_write_state);

  in_io_loop_ = true;

  // Loop until the session is closed or the write becomes blocked.
  while (true) {
    switch (write_state_) {
      case WRITE_STATE_DO_WRITE:
        DCHECK_EQ(result, OK);
        result = DoWrite();
        break;
      case WRITE_STATE_DO_WRITE_COMPLETE:
        result = DoWriteComplete(result);
        break;
      case WRITE_STATE_IDLE:
      default:
        NOTREACHED() << "write_state_: " << write_state_;
        break;
    }

    // DoWrite() goes idle (and returns ERR_IO_PENDING) when the write
    // queue has nothing to dequeue.
    if (write_state_ == WRITE_STATE_IDLE) {
      DCHECK_EQ(result, ERR_IO_PENDING);
      break;
    }

    if (result == ERR_IO_PENDING)
      break;
  }

  CHECK(in_io_loop_);
  in_io_loop_ = false;

  return result;
}
1478
// Writes the remaining bytes of the in-flight frame to the socket. If no
// frame is in flight, the next one is dequeued from |write_queue_|
// first; when the queue is empty the state becomes WRITE_STATE_IDLE and
// ERR_IO_PENDING is returned. Otherwise advances |write_state_| to
// WRITE_STATE_DO_WRITE_COMPLETE and returns whatever the socket's
// Write() returns.
int SpdySession::DoWrite() {
  CHECK(in_io_loop_);

  DCHECK(buffered_spdy_framer_);
  if (in_flight_write_) {
    // Continuing a partially written frame.
    DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
  } else {
    // Grab the next frame to send.
    SpdyFrameType frame_type = DATA;
    scoped_ptr<SpdyBufferProducer> producer;
    base::WeakPtr<SpdyStream> stream;
    if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
      write_state_ = WRITE_STATE_IDLE;
      return ERR_IO_PENDING;
    }

    if (stream.get())
      CHECK(!stream->IsClosed());

    // Activate the stream only when sending the SYN_STREAM frame to
    // guarantee monotonically-increasing stream IDs.
    if (frame_type == SYN_STREAM) {
      CHECK(stream.get());
      CHECK_EQ(stream->stream_id(), 0u);
      scoped_ptr<SpdyStream> owned_stream =
          ActivateCreatedStream(stream.get());
      InsertActivatedStream(owned_stream.Pass());

      if (stream_hi_water_mark_ > kLastStreamId) {
        CHECK_EQ(stream->stream_id(), kLastStreamId);
        // We've exhausted the stream ID space, and no new streams may be
        // created after this one.
        MakeUnavailable();
        StartGoingAway(kLastStreamId, ERR_ABORTED);
      }
    }

    // Materialize the frame bytes and remember what is being written so
    // DoWriteComplete() can notify the stream.
    in_flight_write_ = producer->ProduceBuffer();
    if (!in_flight_write_) {
      NOTREACHED();
      return ERR_UNEXPECTED;
    }
    in_flight_write_frame_type_ = frame_type;
    in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
    DCHECK_GE(in_flight_write_frame_size_,
              buffered_spdy_framer_->GetFrameMinimumSize());
    in_flight_write_stream_ = stream;
  }

  write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;

  // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
  // with Socket implementations that don't store their IOBuffer
  // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
  scoped_refptr<IOBuffer> write_io_buffer =
      in_flight_write_->GetIOBufferForRemainingData();
  return connection_->socket()->Write(
      write_io_buffer.get(),
      in_flight_write_->GetRemainingSize(),
      base::Bind(&SpdySession::PumpWriteLoop,
                 weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
}
1541
// Handles the outcome of a socket write. |result| is the number of bytes
// written or a negative net error. On error the in-flight write is
// discarded and the session is drained. On success the written bytes are
// consumed from |in_flight_write_| and, once the whole frame has been
// written, the owning stream (if it still exists) is notified and the
// in-flight state is cleared. Always resets |write_state_| to
// WRITE_STATE_DO_WRITE and returns OK.
int SpdySession::DoWriteComplete(int result) {
  CHECK(in_io_loop_);
  DCHECK_NE(result, ERR_IO_PENDING);
  DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);

  last_activity_time_ = time_func_();

  if (result < 0) {
    DCHECK_NE(result, ERR_IO_PENDING);
    // Drop the failed write entirely before draining.
    in_flight_write_.reset();
    in_flight_write_frame_type_ = DATA;
    in_flight_write_frame_size_ = 0;
    in_flight_write_stream_.reset();
    write_state_ = WRITE_STATE_DO_WRITE;
    DoDrainSession(static_cast<Error>(result), "Write error");
    // OK (rather than the error) keeps DoWriteLoop() iterating.
    return OK;
  }

  // It should not be possible to have written more bytes than our
  // in_flight_write_.
  DCHECK_LE(static_cast<size_t>(result),
            in_flight_write_->GetRemainingSize());

  if (result > 0) {
    in_flight_write_->Consume(static_cast<size_t>(result));

    // We only notify the stream when we've fully written the pending frame.
    if (in_flight_write_->GetRemainingSize() == 0) {
      // It is possible that the stream was cancelled while we were
      // writing to the socket.
      if (in_flight_write_stream_.get()) {
        DCHECK_GT(in_flight_write_frame_size_, 0u);
        in_flight_write_stream_->OnFrameWriteComplete(
            in_flight_write_frame_type_,
            in_flight_write_frame_size_);
      }

      // Cleanup the write which just completed.
      in_flight_write_.reset();
      in_flight_write_frame_type_ = DATA;
      in_flight_write_frame_size_ = 0;
      in_flight_write_stream_.reset();
    }
  }

  write_state_ = WRITE_STATE_DO_WRITE;
  return OK;
}
1590
1591 void SpdySession::DcheckGoingAway() const {
1592 #if DCHECK_IS_ON
1593   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1594   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
1595     DCHECK(pending_create_stream_queues_[i].empty());
1596   }
1597   DCHECK(created_streams_.empty());
1598 #endif
1599 }
1600
// Debug-only invariant check for a draining session: everything
// DcheckGoingAway() requires, plus the state must be STATE_DRAINING and
// there must be no active streams and no unclaimed pushed streams.
void SpdySession::DcheckDraining() const {
  DcheckGoingAway();
  DCHECK_EQ(availability_state_, STATE_DRAINING);
  DCHECK(active_streams_.empty());
  DCHECK(unclaimed_pushed_streams_.empty());
}
1607
// Tears down everything that cannot complete once the session is going
// away: fails all pending stream requests, closes every active stream
// whose id is greater than |last_good_stream_id|, closes every created
// (not yet activated) stream, and removes queued writes for streams
// after |last_good_stream_id|. Streams are closed with |status|. The
// session must already be in STATE_GOING_AWAY or later (DCHECKed).
void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
                                 Error status) {
  DCHECK_GE(availability_state_, STATE_GOING_AWAY);

  // The loops below are carefully written to avoid reentrancy problems.

  // Fail every pending request. Note these are failed with ERR_ABORTED,
  // not with |status|.
  while (true) {
    size_t old_size = GetTotalSize(pending_create_stream_queues_);
    base::WeakPtr<SpdyStreamRequest> pending_request =
        GetNextPendingStreamRequest();
    if (!pending_request)
      break;
    // No new stream requests should be added while the session is
    // going away.
    DCHECK_GT(old_size, GetTotalSize(pending_create_stream_queues_));
    pending_request->OnRequestCompleteFailure(ERR_ABORTED);
  }

  // Close active streams above |last_good_stream_id|. The lookup is
  // redone each iteration rather than keeping an iterator across the
  // close.
  while (true) {
    size_t old_size = active_streams_.size();
    ActiveStreamMap::iterator it =
        active_streams_.lower_bound(last_good_stream_id + 1);
    if (it == active_streams_.end())
      break;
    LogAbandonedActiveStream(it, status);
    CloseActiveStreamIterator(it, status);
    // No new streams should be activated while the session is going
    // away.
    DCHECK_GT(old_size, active_streams_.size());
  }

  // Close every stream that was created but never activated.
  while (!created_streams_.empty()) {
    size_t old_size = created_streams_.size();
    CreatedStreamSet::iterator it = created_streams_.begin();
    LogAbandonedStream(*it, status);
    CloseCreatedStreamIterator(it, status);
    // No new streams should be created while the session is going
    // away.
    DCHECK_GT(old_size, created_streams_.size());
  }

  write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);

  DcheckGoingAway();
}
1653
1654 void SpdySession::MaybeFinishGoingAway() {
1655   if (active_streams_.empty() && availability_state_ == STATE_GOING_AWAY) {
1656     DoDrainSession(OK, "Finished going away");
1657   }
1658 }
1659
// Moves the session into STATE_DRAINING with |err| as the close reason.
// Does nothing if the session is already draining. For errors other than
// the listed graceful/network ones, a GOAWAY frame carrying
// |description| is queued first. When |err| is not OK, StartGoingAway()
// is invoked with a last-good stream id of 0. Finally a write-loop run is
// requested via MaybePostWriteLoop().
void SpdySession::DoDrainSession(Error err, const std::string& description) {
  if (availability_state_ == STATE_DRAINING) {
    return;
  }
  MakeUnavailable();

  // If |err| indicates an error occurred, inform the peer that we're closing
  // and why. Don't GOAWAY on a graceful or idle close, as that may
  // unnecessarily wake the radio. We could technically GOAWAY on network errors
  // (we'll probably fail to actually write it, but that's okay), however many
  // unit-tests would need to be updated.
  if (err != OK &&
      err != ERR_ABORTED &&  // Used by SpdySessionPool to close idle sessions.
      err != ERR_NETWORK_CHANGED &&  // Used to deprecate sessions on IP change.
      err != ERR_SOCKET_NOT_CONNECTED &&
      err != ERR_CONNECTION_CLOSED && err != ERR_CONNECTION_RESET) {
    // Enqueue a GOAWAY to inform the peer of why we're closing the connection.
    SpdyGoAwayIR goaway_ir(0,  // Last accepted stream ID.
                           MapNetErrorToGoAwayStatus(err),
                           description);
    EnqueueSessionWrite(HIGHEST,
                        GOAWAY,
                        scoped_ptr<SpdyFrame>(
                            buffered_spdy_framer_->SerializeFrame(goaway_ir)));
  }

  availability_state_ = STATE_DRAINING;
  error_on_close_ = err;

  net_log_.AddEvent(
      NetLog::TYPE_SPDY_SESSION_CLOSE,
      base::Bind(&NetLogSpdySessionCloseCallback, err, &description));

  // Net error codes are negative; the histogram is keyed on -err.
  UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
  UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
                              total_bytes_received_, 1, 100000000, 50);

  if (err == OK) {
    // We ought to be going away already, as this is a graceful close.
    DcheckGoingAway();
  } else {
    StartGoingAway(0, err);
  }
  DcheckDraining();
  MaybePostWriteLoop();
}
1706
1707 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
1708   DCHECK(stream);
1709   std::string description = base::StringPrintf(
1710       "ABANDONED (stream_id=%d): ", stream->stream_id()) +
1711       stream->url().spec();
1712   stream->LogStreamError(status, description);
1713   // We don't increment the streams abandoned counter here. If the
1714   // stream isn't active (i.e., it hasn't written anything to the wire
1715   // yet) then it's as if it never existed. If it is active, then
1716   // LogAbandonedActiveStream() will increment the counters.
1717 }
1718
1719 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it,
1720                                            Error status) {
1721   DCHECK_GT(it->first, 0u);
1722   LogAbandonedStream(it->second.stream, status);
1723   ++streams_abandoned_count_;
1724   base::StatsCounter abandoned_streams("spdy.abandoned_streams");
1725   abandoned_streams.Increment();
1726   if (it->second.stream->type() == SPDY_PUSH_STREAM &&
1727       unclaimed_pushed_streams_.find(it->second.stream->url()) !=
1728       unclaimed_pushed_streams_.end()) {
1729     base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams");
1730     abandoned_push_streams.Increment();
1731   }
1732 }
1733
1734 SpdyStreamId SpdySession::GetNewStreamId() {
1735   CHECK_LE(stream_hi_water_mark_, kLastStreamId);
1736   SpdyStreamId id = stream_hi_water_mark_;
1737   stream_hi_water_mark_ += 2;
1738   return id;
1739 }
1740
1741 void SpdySession::CloseSessionOnError(Error err,
1742                                       const std::string& description) {
1743   DCHECK_LT(err, ERR_IO_PENDING);
1744   DoDrainSession(err, description);
1745 }
1746
1747 void SpdySession::MakeUnavailable() {
1748   if (availability_state_ == STATE_AVAILABLE) {
1749     availability_state_ = STATE_GOING_AWAY;
1750     pool_->MakeSessionUnavailable(GetWeakPtr());
1751   }
1752 }
1753
1754 base::Value* SpdySession::GetInfoAsValue() const {
1755   base::DictionaryValue* dict = new base::DictionaryValue();
1756
1757   dict->SetInteger("source_id", net_log_.source().id);
1758
1759   dict->SetString("host_port_pair", host_port_pair().ToString());
1760   if (!pooled_aliases_.empty()) {
1761     base::ListValue* alias_list = new base::ListValue();
1762     for (std::set<SpdySessionKey>::const_iterator it =
1763              pooled_aliases_.begin();
1764          it != pooled_aliases_.end(); it++) {
1765       alias_list->Append(new base::StringValue(
1766           it->host_port_pair().ToString()));
1767     }
1768     dict->Set("aliases", alias_list);
1769   }
1770   dict->SetString("proxy", host_port_proxy_pair().second.ToURI());
1771
1772   dict->SetInteger("active_streams", active_streams_.size());
1773
1774   dict->SetInteger("unclaimed_pushed_streams",
1775                    unclaimed_pushed_streams_.size());
1776
1777   dict->SetBoolean("is_secure", is_secure_);
1778
1779   dict->SetString("protocol_negotiated",
1780                   SSLClientSocket::NextProtoToString(
1781                       connection_->socket()->GetNegotiatedProtocol()));
1782
1783   dict->SetInteger("error", error_on_close_);
1784   dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
1785
1786   dict->SetInteger("streams_initiated_count", streams_initiated_count_);
1787   dict->SetInteger("streams_pushed_count", streams_pushed_count_);
1788   dict->SetInteger("streams_pushed_and_claimed_count",
1789       streams_pushed_and_claimed_count_);
1790   dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
1791   DCHECK(buffered_spdy_framer_.get());
1792   dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received());
1793
1794   dict->SetBoolean("sent_settings", sent_settings_);
1795   dict->SetBoolean("received_settings", received_settings_);
1796
1797   dict->SetInteger("send_window_size", session_send_window_size_);
1798   dict->SetInteger("recv_window_size", session_recv_window_size_);
1799   dict->SetInteger("unacked_recv_window_bytes",
1800                    session_unacked_recv_window_bytes_);
1801   return dict;
1802 }
1803
1804 bool SpdySession::IsReused() const {
1805   return buffered_spdy_framer_->frames_received() > 0 ||
1806       connection_->reuse_type() == ClientSocketHandle::UNUSED_IDLE;
1807 }
1808
1809 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id,
1810                                     LoadTimingInfo* load_timing_info) const {
1811   return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId,
1812                                         load_timing_info);
1813 }
1814
1815 int SpdySession::GetPeerAddress(IPEndPoint* address) const {
1816   int rv = ERR_SOCKET_NOT_CONNECTED;
1817   if (connection_->socket()) {
1818     rv = connection_->socket()->GetPeerAddress(address);
1819   }
1820
1821   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress",
1822                         rv == ERR_SOCKET_NOT_CONNECTED);
1823
1824   return rv;
1825 }
1826
1827 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
1828   int rv = ERR_SOCKET_NOT_CONNECTED;
1829   if (connection_->socket()) {
1830     rv = connection_->socket()->GetLocalAddress(address);
1831   }
1832
1833   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress",
1834                         rv == ERR_SOCKET_NOT_CONNECTED);
1835
1836   return rv;
1837 }
1838
1839 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
1840                                       SpdyFrameType frame_type,
1841                                       scoped_ptr<SpdyFrame> frame) {
1842   DCHECK(frame_type == RST_STREAM || frame_type == SETTINGS ||
1843          frame_type == WINDOW_UPDATE || frame_type == PING ||
1844          frame_type == GOAWAY);
1845   EnqueueWrite(
1846       priority, frame_type,
1847       scoped_ptr<SpdyBufferProducer>(
1848           new SimpleBufferProducer(
1849               scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
1850       base::WeakPtr<SpdyStream>());
1851 }
1852
1853 void SpdySession::EnqueueWrite(RequestPriority priority,
1854                                SpdyFrameType frame_type,
1855                                scoped_ptr<SpdyBufferProducer> producer,
1856                                const base::WeakPtr<SpdyStream>& stream) {
1857   if (availability_state_ == STATE_DRAINING)
1858     return;
1859
1860   write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
1861   MaybePostWriteLoop();
1862 }
1863
1864 void SpdySession::MaybePostWriteLoop() {
1865   if (write_state_ == WRITE_STATE_IDLE) {
1866     CHECK(!in_flight_write_);
1867     write_state_ = WRITE_STATE_DO_WRITE;
1868     base::MessageLoop::current()->PostTask(
1869         FROM_HERE,
1870         base::Bind(&SpdySession::PumpWriteLoop,
1871                    weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
1872   }
1873 }
1874
1875 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
1876   CHECK_EQ(stream->stream_id(), 0u);
1877   CHECK(created_streams_.find(stream.get()) == created_streams_.end());
1878   created_streams_.insert(stream.release());
1879 }
1880
1881 scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) {
1882   CHECK_EQ(stream->stream_id(), 0u);
1883   CHECK(created_streams_.find(stream) != created_streams_.end());
1884   stream->set_stream_id(GetNewStreamId());
1885   scoped_ptr<SpdyStream> owned_stream(stream);
1886   created_streams_.erase(stream);
1887   return owned_stream.Pass();
1888 }
1889
1890 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) {
1891   SpdyStreamId stream_id = stream->stream_id();
1892   CHECK_NE(stream_id, 0u);
1893   std::pair<ActiveStreamMap::iterator, bool> result =
1894       active_streams_.insert(
1895           std::make_pair(stream_id, ActiveStreamInfo(stream.get())));
1896   CHECK(result.second);
1897   ignore_result(stream.release());
1898 }
1899
1900 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
1901   if (in_flight_write_stream_.get() == stream.get()) {
1902     // If we're deleting the stream for the in-flight write, we still
1903     // need to let the write complete, so we clear
1904     // |in_flight_write_stream_| and let the write finish on its own
1905     // without notifying |in_flight_write_stream_|.
1906     in_flight_write_stream_.reset();
1907   }
1908
1909   write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
1910   stream->OnClose(status);
1911
1912   if (availability_state_ == STATE_AVAILABLE) {
1913     ProcessPendingStreamRequests();
1914   }
1915 }
1916
1917 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
1918   base::StatsCounter used_push_streams("spdy.claimed_push_streams");
1919
1920   PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
1921   if (unclaimed_it == unclaimed_pushed_streams_.end())
1922     return base::WeakPtr<SpdyStream>();
1923
1924   SpdyStreamId stream_id = unclaimed_it->second.stream_id;
1925   unclaimed_pushed_streams_.erase(unclaimed_it);
1926
1927   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
1928   if (active_it == active_streams_.end()) {
1929     NOTREACHED();
1930     return base::WeakPtr<SpdyStream>();
1931   }
1932
1933   net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM);
1934   used_push_streams.Increment();
1935   return active_it->second.stream->GetWeakPtr();
1936 }
1937
1938 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info,
1939                              bool* was_npn_negotiated,
1940                              NextProto* protocol_negotiated) {
1941   *was_npn_negotiated = connection_->socket()->WasNpnNegotiated();
1942   *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol();
1943   return connection_->socket()->GetSSLInfo(ssl_info);
1944 }
1945
1946 bool SpdySession::GetSSLCertRequestInfo(
1947     SSLCertRequestInfo* cert_request_info) {
1948   if (!is_secure_)
1949     return false;
1950   GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
1951   return true;
1952 }
1953
1954 void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
1955   CHECK(in_io_loop_);
1956
1957   RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code));
1958   std::string description =
1959       base::StringPrintf("Framer error: %d (%s).",
1960                          error_code,
1961                          SpdyFramer::ErrorCodeToString(error_code));
1962   DoDrainSession(MapFramerErrorToNetError(error_code), description);
1963 }
1964
1965 void SpdySession::OnStreamError(SpdyStreamId stream_id,
1966                                 const std::string& description) {
1967   CHECK(in_io_loop_);
1968
1969   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1970   if (it == active_streams_.end()) {
1971     // We still want to send a frame to reset the stream even if we
1972     // don't know anything about it.
1973     EnqueueResetStreamFrame(
1974         stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
1975     return;
1976   }
1977
1978   ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
1979 }
1980
1981 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id,
1982                                     size_t length,
1983                                     bool fin) {
1984   CHECK(in_io_loop_);
1985
1986   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1987
1988   // By the time data comes in, the stream may already be inactive.
1989   if (it == active_streams_.end())
1990     return;
1991
1992   SpdyStream* stream = it->second.stream;
1993   CHECK_EQ(stream->stream_id(), stream_id);
1994
1995   DCHECK(buffered_spdy_framer_);
1996   size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize();
1997   stream->IncrementRawReceivedBytes(header_len);
1998 }
1999
2000 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
2001                                     const char* data,
2002                                     size_t len,
2003                                     bool fin) {
2004   CHECK(in_io_loop_);
2005
2006   if (data == NULL && len != 0) {
2007     // This is notification of consumed data padding.
2008     // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames.
2009     // See crbug.com/353012.
2010     return;
2011   }
2012
2013   DCHECK_LT(len, 1u << 24);
2014   if (net_log().IsLogging()) {
2015     net_log().AddEvent(
2016         NetLog::TYPE_SPDY_SESSION_RECV_DATA,
2017         base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
2018   }
2019
2020   // Build the buffer as early as possible so that we go through the
2021   // session flow control checks and update
2022   // |unacked_recv_window_bytes_| properly even when the stream is
2023   // inactive (since the other side has still reduced its session send
2024   // window).
2025   scoped_ptr<SpdyBuffer> buffer;
2026   if (data) {
2027     DCHECK_GT(len, 0u);
2028     CHECK_LE(len, static_cast<size_t>(kReadBufferSize));
2029     buffer.reset(new SpdyBuffer(data, len));
2030
2031     if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2032       DecreaseRecvWindowSize(static_cast<int32>(len));
2033       buffer->AddConsumeCallback(
2034           base::Bind(&SpdySession::OnReadBufferConsumed,
2035                      weak_factory_.GetWeakPtr()));
2036     }
2037   } else {
2038     DCHECK_EQ(len, 0u);
2039   }
2040
2041   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2042
2043   // By the time data comes in, the stream may already be inactive.
2044   if (it == active_streams_.end())
2045     return;
2046
2047   SpdyStream* stream = it->second.stream;
2048   CHECK_EQ(stream->stream_id(), stream_id);
2049
2050   stream->IncrementRawReceivedBytes(len);
2051
2052   if (it->second.waiting_for_syn_reply) {
2053     const std::string& error = "Data received before SYN_REPLY.";
2054     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2055     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2056     return;
2057   }
2058
2059   stream->OnDataReceived(buffer.Pass());
2060 }
2061
2062 void SpdySession::OnSettings(bool clear_persisted) {
2063   CHECK(in_io_loop_);
2064
2065   if (clear_persisted)
2066     http_server_properties_->ClearSpdySettings(host_port_pair());
2067
2068   if (net_log_.IsLogging()) {
2069     net_log_.AddEvent(
2070         NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
2071         base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
2072                    clear_persisted));
2073   }
2074
2075   if (GetProtocolVersion() >= SPDY4) {
2076     // Send an acknowledgment of the setting.
2077     SpdySettingsIR settings_ir;
2078     settings_ir.set_is_ack(true);
2079     EnqueueSessionWrite(
2080         HIGHEST,
2081         SETTINGS,
2082         scoped_ptr<SpdyFrame>(
2083             buffered_spdy_framer_->SerializeFrame(settings_ir)));
2084   }
2085 }
2086
2087 void SpdySession::OnSetting(SpdySettingsIds id,
2088                             uint8 flags,
2089                             uint32 value) {
2090   CHECK(in_io_loop_);
2091
2092   HandleSetting(id, value);
2093   http_server_properties_->SetSpdySetting(
2094       host_port_pair(),
2095       id,
2096       static_cast<SpdySettingsFlags>(flags),
2097       value);
2098   received_settings_ = true;
2099
2100   // Log the setting.
2101   net_log_.AddEvent(
2102       NetLog::TYPE_SPDY_SESSION_RECV_SETTING,
2103       base::Bind(&NetLogSpdySettingCallback,
2104                  id, static_cast<SpdySettingsFlags>(flags), value));
2105 }
2106
2107 void SpdySession::OnSendCompressedFrame(
2108     SpdyStreamId stream_id,
2109     SpdyFrameType type,
2110     size_t payload_len,
2111     size_t frame_len) {
2112   if (type != SYN_STREAM && type != HEADERS)
2113     return;
2114
2115   DCHECK(buffered_spdy_framer_.get());
2116   size_t compressed_len =
2117       frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize();
2118
2119   if (payload_len) {
2120     // Make sure we avoid early decimal truncation.
2121     int compression_pct = 100 - (100 * compressed_len) / payload_len;
2122     UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
2123                              compression_pct);
2124   }
2125 }
2126
2127 void SpdySession::OnReceiveCompressedFrame(
2128     SpdyStreamId stream_id,
2129     SpdyFrameType type,
2130     size_t frame_len) {
2131   last_compressed_frame_len_ = frame_len;
2132 }
2133
2134 int SpdySession::OnInitialResponseHeadersReceived(
2135     const SpdyHeaderBlock& response_headers,
2136     base::Time response_time,
2137     base::TimeTicks recv_first_byte_time,
2138     SpdyStream* stream) {
2139   CHECK(in_io_loop_);
2140   SpdyStreamId stream_id = stream->stream_id();
2141
2142   if (stream->type() == SPDY_PUSH_STREAM) {
2143     DCHECK(stream->IsReservedRemote());
2144     if (max_concurrent_pushed_streams_ &&
2145         num_active_pushed_streams_ >= max_concurrent_pushed_streams_) {
2146       ResetStream(stream_id,
2147                   RST_STREAM_REFUSED_STREAM,
2148                   "Stream concurrency limit reached.");
2149       return STATUS_CODE_REFUSED_STREAM;
2150     }
2151   }
2152
2153   if (stream->type() == SPDY_PUSH_STREAM) {
2154     // Will be balanced in DeleteStream.
2155     num_active_pushed_streams_++;
2156   }
2157
2158   // May invalidate |stream|.
2159   int rv = stream->OnInitialResponseHeadersReceived(
2160       response_headers, response_time, recv_first_byte_time);
2161   if (rv < 0) {
2162     DCHECK_NE(rv, ERR_IO_PENDING);
2163     DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2164   }
2165
2166   return rv;
2167 }
2168
2169 void SpdySession::OnSynStream(SpdyStreamId stream_id,
2170                               SpdyStreamId associated_stream_id,
2171                               SpdyPriority priority,
2172                               bool fin,
2173                               bool unidirectional,
2174                               const SpdyHeaderBlock& headers) {
2175   CHECK(in_io_loop_);
2176
2177   if (GetProtocolVersion() >= SPDY4) {
2178     DCHECK_EQ(0u, associated_stream_id);
2179     OnHeaders(stream_id, fin, headers);
2180     return;
2181   }
2182
2183   base::Time response_time = base::Time::Now();
2184   base::TimeTicks recv_first_byte_time = time_func_();
2185
2186   if (net_log_.IsLogging()) {
2187     net_log_.AddEvent(
2188         NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
2189         base::Bind(&NetLogSpdySynStreamReceivedCallback,
2190                    &headers, fin, unidirectional, priority,
2191                    stream_id, associated_stream_id));
2192   }
2193
2194   // Split headers to simulate push promise and response.
2195   SpdyHeaderBlock request_headers;
2196   SpdyHeaderBlock response_headers;
2197   SplitPushedHeadersToRequestAndResponse(
2198       headers, GetProtocolVersion(), &request_headers, &response_headers);
2199
2200   if (!TryCreatePushStream(
2201           stream_id, associated_stream_id, priority, request_headers))
2202     return;
2203
2204   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2205   if (active_it == active_streams_.end()) {
2206     NOTREACHED();
2207     return;
2208   }
2209
2210   if (OnInitialResponseHeadersReceived(response_headers,
2211                                        response_time,
2212                                        recv_first_byte_time,
2213                                        active_it->second.stream) != OK)
2214     return;
2215
2216   base::StatsCounter push_requests("spdy.pushed_streams");
2217   push_requests.Increment();
2218 }
2219
2220 void SpdySession::DeleteExpiredPushedStreams() {
2221   if (unclaimed_pushed_streams_.empty())
2222     return;
2223
2224   // Check that adequate time has elapsed since the last sweep.
2225   if (time_func_() < next_unclaimed_push_stream_sweep_time_)
2226     return;
2227
2228   // Gather old streams to delete.
2229   base::TimeTicks minimum_freshness = time_func_() -
2230       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2231   std::vector<SpdyStreamId> streams_to_close;
2232   for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
2233        it != unclaimed_pushed_streams_.end(); ++it) {
2234     if (minimum_freshness > it->second.creation_time)
2235       streams_to_close.push_back(it->second.stream_id);
2236   }
2237
2238   for (std::vector<SpdyStreamId>::const_iterator to_close_it =
2239            streams_to_close.begin();
2240        to_close_it != streams_to_close.end(); ++to_close_it) {
2241     ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it);
2242     if (active_it == active_streams_.end())
2243       continue;
2244
2245     LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM);
2246     // CloseActiveStreamIterator() will remove the stream from
2247     // |unclaimed_pushed_streams_|.
2248     ResetStreamIterator(
2249         active_it, RST_STREAM_REFUSED_STREAM, "Stream not claimed.");
2250   }
2251
2252   next_unclaimed_push_stream_sweep_time_ = time_func_() +
2253       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2254 }
2255
2256 void SpdySession::OnSynReply(SpdyStreamId stream_id,
2257                              bool fin,
2258                              const SpdyHeaderBlock& headers) {
2259   CHECK(in_io_loop_);
2260
2261   base::Time response_time = base::Time::Now();
2262   base::TimeTicks recv_first_byte_time = time_func_();
2263
2264   if (net_log().IsLogging()) {
2265     net_log().AddEvent(
2266         NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
2267         base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2268                    &headers, fin, stream_id));
2269   }
2270
2271   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2272   if (it == active_streams_.end()) {
2273     // NOTE:  it may just be that the stream was cancelled.
2274     return;
2275   }
2276
2277   SpdyStream* stream = it->second.stream;
2278   CHECK_EQ(stream->stream_id(), stream_id);
2279
2280   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2281   last_compressed_frame_len_ = 0;
2282
2283   if (GetProtocolVersion() >= SPDY4) {
2284     const std::string& error =
2285         "SPDY4 wasn't expecting SYN_REPLY.";
2286     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2287     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2288     return;
2289   }
2290   if (!it->second.waiting_for_syn_reply) {
2291     const std::string& error =
2292         "Received duplicate SYN_REPLY for stream.";
2293     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2294     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2295     return;
2296   }
2297   it->second.waiting_for_syn_reply = false;
2298
2299   ignore_result(OnInitialResponseHeadersReceived(
2300       headers, response_time, recv_first_byte_time, stream));
2301 }
2302
2303 void SpdySession::OnHeaders(SpdyStreamId stream_id,
2304                             bool fin,
2305                             const SpdyHeaderBlock& headers) {
2306   CHECK(in_io_loop_);
2307
2308   if (net_log().IsLogging()) {
2309     net_log().AddEvent(
2310         NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
2311         base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2312                    &headers, fin, stream_id));
2313   }
2314
2315   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2316   if (it == active_streams_.end()) {
2317     // NOTE:  it may just be that the stream was cancelled.
2318     LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
2319     return;
2320   }
2321
2322   SpdyStream* stream = it->second.stream;
2323   CHECK_EQ(stream->stream_id(), stream_id);
2324
2325   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2326   last_compressed_frame_len_ = 0;
2327
2328   base::Time response_time = base::Time::Now();
2329   base::TimeTicks recv_first_byte_time = time_func_();
2330
2331   if (it->second.waiting_for_syn_reply) {
2332     if (GetProtocolVersion() < SPDY4) {
2333       const std::string& error =
2334           "Was expecting SYN_REPLY, not HEADERS.";
2335       stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2336       ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2337       return;
2338     }
2339
2340     it->second.waiting_for_syn_reply = false;
2341     ignore_result(OnInitialResponseHeadersReceived(
2342         headers, response_time, recv_first_byte_time, stream));
2343   } else if (it->second.stream->IsReservedRemote()) {
2344     ignore_result(OnInitialResponseHeadersReceived(
2345         headers, response_time, recv_first_byte_time, stream));
2346   } else {
2347     int rv = stream->OnAdditionalResponseHeadersReceived(headers);
2348     if (rv < 0) {
2349       DCHECK_NE(rv, ERR_IO_PENDING);
2350       DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2351     }
2352   }
2353 }
2354
2355 void SpdySession::OnRstStream(SpdyStreamId stream_id,
2356                               SpdyRstStreamStatus status) {
2357   CHECK(in_io_loop_);
2358
2359   std::string description;
2360   net_log().AddEvent(
2361       NetLog::TYPE_SPDY_SESSION_RST_STREAM,
2362       base::Bind(&NetLogSpdyRstCallback,
2363                  stream_id, status, &description));
2364
2365   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2366   if (it == active_streams_.end()) {
2367     // NOTE:  it may just be that the stream was cancelled.
2368     LOG(WARNING) << "Received RST for invalid stream" << stream_id;
2369     return;
2370   }
2371
2372   CHECK_EQ(it->second.stream->stream_id(), stream_id);
2373
2374   if (status == 0) {
2375     it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
2376   } else if (status == RST_STREAM_REFUSED_STREAM) {
2377     CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM);
2378   } else {
2379     RecordProtocolErrorHistogram(
2380         PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
2381     it->second.stream->LogStreamError(
2382         ERR_SPDY_PROTOCOL_ERROR,
2383         base::StringPrintf("SPDY stream closed with status: %d", status));
2384     // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
2385     //                For now, it doesn't matter much - it is a protocol error.
2386     CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
2387   }
2388 }
2389
2390 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
2391                            SpdyGoAwayStatus status) {
2392   CHECK(in_io_loop_);
2393
2394   // TODO(jgraettinger): UMA histogram on |status|.
2395
2396   net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
2397       base::Bind(&NetLogSpdyGoAwayCallback,
2398                  last_accepted_stream_id,
2399                  active_streams_.size(),
2400                  unclaimed_pushed_streams_.size(),
2401                  status));
2402   MakeUnavailable();
2403   StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
2404   // This is to handle the case when we already don't have any active
2405   // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
2406   // active streams and so the last one being closed will finish the
2407   // going away process (see DeleteStream()).
2408   MaybeFinishGoingAway();
2409 }
2410
2411 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) {
2412   CHECK(in_io_loop_);
2413
2414   net_log_.AddEvent(
2415       NetLog::TYPE_SPDY_SESSION_PING,
2416       base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received"));
2417
2418   // Send response to a PING from server.
2419   if ((protocol_ >= kProtoSPDY4 && !is_ack) ||
2420       (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) {
2421     WritePingFrame(unique_id, true);
2422     return;
2423   }
2424
2425   --pings_in_flight_;
2426   if (pings_in_flight_ < 0) {
2427     RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
2428     DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
2429     pings_in_flight_ = 0;
2430     return;
2431   }
2432
2433   if (pings_in_flight_ > 0)
2434     return;
2435
2436   // We will record RTT in histogram when there are no more client sent
2437   // pings_in_flight_.
2438   RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
2439 }
2440
2441 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
2442                                  uint32 delta_window_size) {
2443   CHECK(in_io_loop_);
2444
2445   DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
2446   net_log_.AddEvent(
2447       NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
2448       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2449                  stream_id, delta_window_size));
2450
2451   if (stream_id == kSessionFlowControlStreamId) {
2452     // WINDOW_UPDATE for the session.
2453     if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
2454       LOG(WARNING) << "Received WINDOW_UPDATE for session when "
2455                    << "session flow control is not turned on";
2456       // TODO(akalin): Record an error and close the session.
2457       return;
2458     }
2459
2460     if (delta_window_size < 1u) {
2461       RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2462       DoDrainSession(
2463           ERR_SPDY_PROTOCOL_ERROR,
2464           "Received WINDOW_UPDATE with an invalid delta_window_size " +
2465               base::UintToString(delta_window_size));
2466       return;
2467     }
2468
2469     IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
2470   } else {
2471     // WINDOW_UPDATE for a stream.
2472     if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2473       // TODO(akalin): Record an error and close the session.
2474       LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
2475                    << " when flow control is not turned on";
2476       return;
2477     }
2478
2479     ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2480
2481     if (it == active_streams_.end()) {
2482       // NOTE:  it may just be that the stream was cancelled.
2483       LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
2484       return;
2485     }
2486
2487     SpdyStream* stream = it->second.stream;
2488     CHECK_EQ(stream->stream_id(), stream_id);
2489
2490     if (delta_window_size < 1u) {
2491       ResetStreamIterator(it,
2492                           RST_STREAM_FLOW_CONTROL_ERROR,
2493                           base::StringPrintf(
2494                               "Received WINDOW_UPDATE with an invalid "
2495                               "delta_window_size %ud", delta_window_size));
2496       return;
2497     }
2498
2499     CHECK_EQ(it->second.stream->stream_id(), stream_id);
2500     it->second.stream->IncreaseSendWindowSize(
2501         static_cast<int32>(delta_window_size));
2502   }
2503 }
2504
2505 bool SpdySession::TryCreatePushStream(SpdyStreamId stream_id,
2506                                       SpdyStreamId associated_stream_id,
2507                                       SpdyPriority priority,
2508                                       const SpdyHeaderBlock& headers) {
2509   // Server-initiated streams should have even sequence numbers.
2510   if ((stream_id & 0x1) != 0) {
2511     LOG(WARNING) << "Received invalid push stream id " << stream_id;
2512     return false;
2513   }
2514
2515   if (IsStreamActive(stream_id)) {
2516     LOG(WARNING) << "Received push for active stream " << stream_id;
2517     return false;
2518   }
2519
2520   RequestPriority request_priority =
2521       ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion());
2522
2523   if (availability_state_ == STATE_GOING_AWAY) {
2524     // TODO(akalin): This behavior isn't in the SPDY spec, although it
2525     // probably should be.
2526     EnqueueResetStreamFrame(stream_id,
2527                             request_priority,
2528                             RST_STREAM_REFUSED_STREAM,
2529                             "push stream request received when going away");
2530     return false;
2531   }
2532
2533   if (associated_stream_id == 0) {
2534     // In SPDY4 0 stream id in PUSH_PROMISE frame leads to framer error and
2535     // session going away. We should never get here.
2536     CHECK_GT(SPDY4, GetProtocolVersion());
2537     std::string description = base::StringPrintf(
2538         "Received invalid associated stream id %d for pushed stream %d",
2539         associated_stream_id,
2540         stream_id);
2541     EnqueueResetStreamFrame(
2542         stream_id, request_priority, RST_STREAM_REFUSED_STREAM, description);
2543     return false;
2544   }
2545
2546   streams_pushed_count_++;
2547
2548   // TODO(mbelshe): DCHECK that this is a GET method?
2549
2550   // Verify that the response had a URL for us.
2551   GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true);
2552   if (!gurl.is_valid()) {
2553     EnqueueResetStreamFrame(stream_id,
2554                             request_priority,
2555                             RST_STREAM_PROTOCOL_ERROR,
2556                             "Pushed stream url was invalid: " + gurl.spec());
2557     return false;
2558   }
2559
2560   // Verify we have a valid stream association.
2561   ActiveStreamMap::iterator associated_it =
2562       active_streams_.find(associated_stream_id);
2563   if (associated_it == active_streams_.end()) {
2564     EnqueueResetStreamFrame(
2565         stream_id,
2566         request_priority,
2567         RST_STREAM_INVALID_STREAM,
2568         base::StringPrintf("Received push for inactive associated stream %d",
2569                            associated_stream_id));
2570     return false;
2571   }
2572
2573   // Check that the pushed stream advertises the same origin as its associated
2574   // stream. Bypass this check if and only if this session is with a SPDY proxy
2575   // that is trusted explicitly via the --trusted-spdy-proxy switch.
2576   if (trusted_spdy_proxy_.Equals(host_port_pair())) {
2577     // Disallow pushing of HTTPS content.
2578     if (gurl.SchemeIs("https")) {
2579       EnqueueResetStreamFrame(
2580           stream_id,
2581           request_priority,
2582           RST_STREAM_REFUSED_STREAM,
2583           base::StringPrintf("Rejected push of Cross Origin HTTPS content %d",
2584                              associated_stream_id));
2585     }
2586   } else {
2587     GURL associated_url(associated_it->second.stream->GetUrlFromHeaders());
2588     if (associated_url.GetOrigin() != gurl.GetOrigin()) {
2589       EnqueueResetStreamFrame(
2590           stream_id,
2591           request_priority,
2592           RST_STREAM_REFUSED_STREAM,
2593           base::StringPrintf("Rejected Cross Origin Push Stream %d",
2594                              associated_stream_id));
2595       return false;
2596     }
2597   }
2598
2599   // There should not be an existing pushed stream with the same path.
2600   PushedStreamMap::iterator pushed_it =
2601       unclaimed_pushed_streams_.lower_bound(gurl);
2602   if (pushed_it != unclaimed_pushed_streams_.end() &&
2603       pushed_it->first == gurl) {
2604     EnqueueResetStreamFrame(
2605         stream_id,
2606         request_priority,
2607         RST_STREAM_PROTOCOL_ERROR,
2608         "Received duplicate pushed stream with url: " + gurl.spec());
2609     return false;
2610   }
2611
2612   scoped_ptr<SpdyStream> stream(new SpdyStream(SPDY_PUSH_STREAM,
2613                                                GetWeakPtr(),
2614                                                gurl,
2615                                                request_priority,
2616                                                stream_initial_send_window_size_,
2617                                                stream_initial_recv_window_size_,
2618                                                net_log_));
2619   stream->set_stream_id(stream_id);
2620
2621   // In spdy4/http2 PUSH_PROMISE arrives on associated stream.
2622   if (associated_it != active_streams_.end() && GetProtocolVersion() >= SPDY4) {
2623     associated_it->second.stream->IncrementRawReceivedBytes(
2624         last_compressed_frame_len_);
2625   } else {
2626     stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2627   }
2628
2629   last_compressed_frame_len_ = 0;
2630
2631   DeleteExpiredPushedStreams();
2632   PushedStreamMap::iterator inserted_pushed_it =
2633       unclaimed_pushed_streams_.insert(
2634           pushed_it,
2635           std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_())));
2636   DCHECK(inserted_pushed_it != pushed_it);
2637
2638   InsertActivatedStream(stream.Pass());
2639
2640   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2641   if (active_it == active_streams_.end()) {
2642     NOTREACHED();
2643     return false;
2644   }
2645
2646   active_it->second.stream->OnPushPromiseHeadersReceived(headers);
2647   DCHECK(active_it->second.stream->IsReservedRemote());
2648   num_pushed_streams_++;
2649   return true;
2650 }
2651
2652 void SpdySession::OnPushPromise(SpdyStreamId stream_id,
2653                                 SpdyStreamId promised_stream_id,
2654                                 const SpdyHeaderBlock& headers) {
2655   CHECK(in_io_loop_);
2656
2657   if (net_log_.IsLogging()) {
2658     net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_PUSH_PROMISE,
2659                       base::Bind(&NetLogSpdyPushPromiseReceivedCallback,
2660                                  &headers,
2661                                  stream_id,
2662                                  promised_stream_id));
2663   }
2664
2665   // Any priority will do.
2666   // TODO(baranovich): pass parent stream id priority?
2667   if (!TryCreatePushStream(promised_stream_id, stream_id, 0, headers))
2668     return;
2669
2670   base::StatsCounter push_requests("spdy.pushed_streams");
2671   push_requests.Increment();
2672 }
2673
2674 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id,
2675                                          uint32 delta_window_size) {
2676   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2677   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2678   CHECK(it != active_streams_.end());
2679   CHECK_EQ(it->second.stream->stream_id(), stream_id);
2680   SendWindowUpdateFrame(
2681       stream_id, delta_window_size, it->second.stream->priority());
2682 }
2683
2684 void SpdySession::SendInitialData() {
2685   DCHECK(enable_sending_initial_data_);
2686
2687   if (send_connection_header_prefix_) {
2688     DCHECK_EQ(protocol_, kProtoSPDY4);
2689     scoped_ptr<SpdyFrame> connection_header_prefix_frame(
2690         new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix),
2691                       kHttp2ConnectionHeaderPrefixSize,
2692                       false /* take_ownership */));
2693     // Count the prefix as part of the subsequent SETTINGS frame.
2694     EnqueueSessionWrite(HIGHEST, SETTINGS,
2695                         connection_header_prefix_frame.Pass());
2696   }
2697
2698   // First, notify the server about the settings they should use when
2699   // communicating with us.
2700   SettingsMap settings_map;
2701   // Create a new settings frame notifying the server of our
2702   // max concurrent streams and initial window size.
2703   settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] =
2704       SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams);
2705   if (flow_control_state_ >= FLOW_CONTROL_STREAM &&
2706       stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) {
2707     settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
2708         SettingsFlagsAndValue(SETTINGS_FLAG_NONE,
2709                               stream_initial_recv_window_size_);
2710   }
2711   SendSettings(settings_map);
2712
2713   // Next, notify the server about our initial recv window size.
2714   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2715     // Bump up the receive window size to the real initial value. This
2716     // has to go here since the WINDOW_UPDATE frame sent by
2717     // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|.
2718     DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_);
2719     // This condition implies that |kDefaultInitialRecvWindowSize| -
2720     // |session_recv_window_size_| doesn't overflow.
2721     DCHECK_GT(session_recv_window_size_, 0);
2722     IncreaseRecvWindowSize(
2723         kDefaultInitialRecvWindowSize - session_recv_window_size_);
2724   }
2725
2726   // Finally, notify the server about the settings they have
2727   // previously told us to use when communicating with them (after
2728   // applying them).
2729   const SettingsMap& server_settings_map =
2730       http_server_properties_->GetSpdySettings(host_port_pair());
2731   if (server_settings_map.empty())
2732     return;
2733
2734   SettingsMap::const_iterator it =
2735       server_settings_map.find(SETTINGS_CURRENT_CWND);
2736   uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0;
2737   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100);
2738
2739   for (SettingsMap::const_iterator it = server_settings_map.begin();
2740        it != server_settings_map.end(); ++it) {
2741     const SpdySettingsIds new_id = it->first;
2742     const uint32 new_val = it->second.second;
2743     HandleSetting(new_id, new_val);
2744   }
2745
2746   SendSettings(server_settings_map);
2747 }
2748
2749
2750 void SpdySession::SendSettings(const SettingsMap& settings) {
2751   net_log_.AddEvent(
2752       NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
2753       base::Bind(&NetLogSpdySendSettingsCallback, &settings));
2754
2755   // Create the SETTINGS frame and send it.
2756   DCHECK(buffered_spdy_framer_.get());
2757   scoped_ptr<SpdyFrame> settings_frame(
2758       buffered_spdy_framer_->CreateSettings(settings));
2759   sent_settings_ = true;
2760   EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
2761 }
2762
2763 void SpdySession::HandleSetting(uint32 id, uint32 value) {
2764   switch (id) {
2765     case SETTINGS_MAX_CONCURRENT_STREAMS:
2766       max_concurrent_streams_ = std::min(static_cast<size_t>(value),
2767                                          kMaxConcurrentStreamLimit);
2768       ProcessPendingStreamRequests();
2769       break;
2770     case SETTINGS_INITIAL_WINDOW_SIZE: {
2771       if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2772         net_log().AddEvent(
2773             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL);
2774         return;
2775       }
2776
2777       if (value > static_cast<uint32>(kint32max)) {
2778         net_log().AddEvent(
2779             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE,
2780             NetLog::IntegerCallback("initial_window_size", value));
2781         return;
2782       }
2783
2784       // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only.
2785       int32 delta_window_size =
2786           static_cast<int32>(value) - stream_initial_send_window_size_;
2787       stream_initial_send_window_size_ = static_cast<int32>(value);
2788       UpdateStreamsSendWindowSize(delta_window_size);
2789       net_log().AddEvent(
2790           NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE,
2791           NetLog::IntegerCallback("delta_window_size", delta_window_size));
2792       break;
2793     }
2794   }
2795 }
2796
2797 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
2798   DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2799   for (ActiveStreamMap::iterator it = active_streams_.begin();
2800        it != active_streams_.end(); ++it) {
2801     it->second.stream->AdjustSendWindowSize(delta_window_size);
2802   }
2803
2804   for (CreatedStreamSet::const_iterator it = created_streams_.begin();
2805        it != created_streams_.end(); it++) {
2806     (*it)->AdjustSendWindowSize(delta_window_size);
2807   }
2808 }
2809
2810 void SpdySession::SendPrefacePingIfNoneInFlight() {
2811   if (pings_in_flight_ || !enable_ping_based_connection_checking_)
2812     return;
2813
2814   base::TimeTicks now = time_func_();
2815   // If there is no activity in the session, then send a preface-PING.
2816   if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
2817     SendPrefacePing();
2818 }
2819
2820 void SpdySession::SendPrefacePing() {
2821   WritePingFrame(next_ping_id_, false);
2822 }
2823
2824 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
2825                                         uint32 delta_window_size,
2826                                         RequestPriority priority) {
2827   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2828   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2829   if (it != active_streams_.end()) {
2830     CHECK_EQ(it->second.stream->stream_id(), stream_id);
2831   } else {
2832     CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2833     CHECK_EQ(stream_id, kSessionFlowControlStreamId);
2834   }
2835
2836   net_log_.AddEvent(
2837       NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
2838       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2839                  stream_id, delta_window_size));
2840
2841   DCHECK(buffered_spdy_framer_.get());
2842   scoped_ptr<SpdyFrame> window_update_frame(
2843       buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
2844   EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass());
2845 }
2846
2847 void SpdySession::WritePingFrame(uint32 unique_id, bool is_ack) {
2848   DCHECK(buffered_spdy_framer_.get());
2849   scoped_ptr<SpdyFrame> ping_frame(
2850       buffered_spdy_framer_->CreatePingFrame(unique_id, is_ack));
2851   EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass());
2852
2853   if (net_log().IsLogging()) {
2854     net_log().AddEvent(
2855         NetLog::TYPE_SPDY_SESSION_PING,
2856         base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "sent"));
2857   }
2858   if (!is_ack) {
2859     next_ping_id_ += 2;
2860     ++pings_in_flight_;
2861     PlanToCheckPingStatus();
2862     last_ping_sent_time_ = time_func_();
2863   }
2864 }
2865
2866 void SpdySession::PlanToCheckPingStatus() {
2867   if (check_ping_status_pending_)
2868     return;
2869
2870   check_ping_status_pending_ = true;
2871   base::MessageLoop::current()->PostDelayedTask(
2872       FROM_HERE,
2873       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2874                  time_func_()), hung_interval_);
2875 }
2876
2877 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
2878   CHECK(!in_io_loop_);
2879
2880   // Check if we got a response back for all PINGs we had sent.
2881   if (pings_in_flight_ == 0) {
2882     check_ping_status_pending_ = false;
2883     return;
2884   }
2885
2886   DCHECK(check_ping_status_pending_);
2887
2888   base::TimeTicks now = time_func_();
2889   base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
2890
2891   if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
2892     // Track all failed PING messages in a separate bucket.
2893     RecordPingRTTHistogram(base::TimeDelta::Max());
2894     DoDrainSession(ERR_SPDY_PING_FAILED, "Failed ping.");
2895     return;
2896   }
2897
2898   // Check the status of connection after a delay.
2899   base::MessageLoop::current()->PostDelayedTask(
2900       FROM_HERE,
2901       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2902                  now),
2903       delay);
2904 }
2905
2906 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) {
2907   UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration);
2908 }
2909
2910 void SpdySession::RecordProtocolErrorHistogram(
2911     SpdyProtocolErrorDetails details) {
2912   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details,
2913                             NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2914   if (EndsWith(host_port_pair().host(), "google.com", false)) {
2915     UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details,
2916                               NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2917   }
2918 }
2919
2920 void SpdySession::RecordHistograms() {
2921   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
2922                               streams_initiated_count_,
2923                               0, 300, 50);
2924   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
2925                               streams_pushed_count_,
2926                               0, 300, 50);
2927   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
2928                               streams_pushed_and_claimed_count_,
2929                               0, 300, 50);
2930   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
2931                               streams_abandoned_count_,
2932                               0, 300, 50);
2933   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
2934                             sent_settings_ ? 1 : 0, 2);
2935   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
2936                             received_settings_ ? 1 : 0, 2);
2937   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
2938                               stalled_streams_,
2939                               0, 300, 50);
2940   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
2941                             stalled_streams_ > 0 ? 1 : 0, 2);
2942
2943   if (received_settings_) {
2944     // Enumerate the saved settings, and set histograms for it.
2945     const SettingsMap& settings_map =
2946         http_server_properties_->GetSpdySettings(host_port_pair());
2947
2948     SettingsMap::const_iterator it;
2949     for (it = settings_map.begin(); it != settings_map.end(); ++it) {
2950       const SpdySettingsIds id = it->first;
2951       const uint32 val = it->second.second;
2952       switch (id) {
2953         case SETTINGS_CURRENT_CWND:
2954           // Record several different histograms to see if cwnd converges
2955           // for larger volumes of data being sent.
2956           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
2957                                       val, 1, 200, 100);
2958           if (total_bytes_received_ > 10 * 1024) {
2959             UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
2960                                         val, 1, 200, 100);
2961             if (total_bytes_received_ > 25 * 1024) {
2962               UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
2963                                           val, 1, 200, 100);
2964               if (total_bytes_received_ > 50 * 1024) {
2965                 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
2966                                             val, 1, 200, 100);
2967                 if (total_bytes_received_ > 100 * 1024) {
2968                   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
2969                                               val, 1, 200, 100);
2970                 }
2971               }
2972             }
2973           }
2974           break;
2975         case SETTINGS_ROUND_TRIP_TIME:
2976           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
2977                                       val, 1, 1200, 100);
2978           break;
2979         case SETTINGS_DOWNLOAD_RETRANS_RATE:
2980           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
2981                                       val, 1, 100, 50);
2982           break;
2983         default:
2984           break;
2985       }
2986     }
2987   }
2988 }
2989
2990 void SpdySession::CompleteStreamRequest(
2991     const base::WeakPtr<SpdyStreamRequest>& pending_request) {
2992   // Abort if the request has already been cancelled.
2993   if (!pending_request)
2994     return;
2995
2996   base::WeakPtr<SpdyStream> stream;
2997   int rv = TryCreateStream(pending_request, &stream);
2998
2999   if (rv == OK) {
3000     DCHECK(stream);
3001     pending_request->OnRequestCompleteSuccess(stream);
3002     return;
3003   }
3004   DCHECK(!stream);
3005
3006   if (rv != ERR_IO_PENDING) {
3007     pending_request->OnRequestCompleteFailure(rv);
3008   }
3009 }
3010
3011 SSLClientSocket* SpdySession::GetSSLClientSocket() const {
3012   if (!is_secure_)
3013     return NULL;
3014   SSLClientSocket* ssl_socket =
3015       reinterpret_cast<SSLClientSocket*>(connection_->socket());
3016   DCHECK(ssl_socket);
3017   return ssl_socket;
3018 }
3019
3020 void SpdySession::OnWriteBufferConsumed(
3021     size_t frame_payload_size,
3022     size_t consume_size,
3023     SpdyBuffer::ConsumeSource consume_source) {
3024   // We can be called with |in_io_loop_| set if a write SpdyBuffer is
3025   // deleted (e.g., a stream is closed due to incoming data).
3026
3027   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3028
3029   if (consume_source == SpdyBuffer::DISCARD) {
3030     // If we're discarding a frame or part of it, increase the send
3031     // window by the number of discarded bytes. (Although if we're
3032     // discarding part of a frame, it's probably because of a write
3033     // error and we'll be tearing down the session soon.)
3034     size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
3035     DCHECK_GT(remaining_payload_bytes, 0u);
3036     IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
3037   }
3038   // For consumed bytes, the send window is increased when we receive
3039   // a WINDOW_UPDATE frame.
3040 }
3041
3042 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
3043   // We can be called with |in_io_loop_| set if a SpdyBuffer is
3044   // deleted (e.g., a stream is closed due to incoming data).
3045
3046   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3047   DCHECK_GE(delta_window_size, 1);
3048
3049   // Check for overflow.
3050   int32 max_delta_window_size = kint32max - session_send_window_size_;
3051   if (delta_window_size > max_delta_window_size) {
3052     RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
3053     DoDrainSession(
3054         ERR_SPDY_PROTOCOL_ERROR,
3055         "Received WINDOW_UPDATE [delta: " +
3056             base::IntToString(delta_window_size) +
3057             "] for session overflows session_send_window_size_ [current: " +
3058             base::IntToString(session_send_window_size_) + "]");
3059     return;
3060   }
3061
3062   session_send_window_size_ += delta_window_size;
3063
3064   net_log_.AddEvent(
3065       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
3066       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3067                  delta_window_size, session_send_window_size_));
3068
3069   DCHECK(!IsSendStalled());
3070   ResumeSendStalledStreams();
3071 }
3072
3073 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
3074   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3075
3076   // We only call this method when sending a frame. Therefore,
3077   // |delta_window_size| should be within the valid frame size range.
3078   DCHECK_GE(delta_window_size, 1);
3079   DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
3080
3081   // |send_window_size_| should have been at least |delta_window_size| for
3082   // this call to happen.
3083   DCHECK_GE(session_send_window_size_, delta_window_size);
3084
3085   session_send_window_size_ -= delta_window_size;
3086
3087   net_log_.AddEvent(
3088       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
3089       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3090                  -delta_window_size, session_send_window_size_));
3091 }
3092
3093 void SpdySession::OnReadBufferConsumed(
3094     size_t consume_size,
3095     SpdyBuffer::ConsumeSource consume_source) {
3096   // We can be called with |in_io_loop_| set if a read SpdyBuffer is
3097   // deleted (e.g., discarded by a SpdyReadQueue).
3098
3099   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3100   DCHECK_GE(consume_size, 1u);
3101   DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
3102
3103   IncreaseRecvWindowSize(static_cast<int32>(consume_size));
3104 }
3105
3106 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
3107   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3108   DCHECK_GE(session_unacked_recv_window_bytes_, 0);
3109   DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
3110   DCHECK_GE(delta_window_size, 1);
3111   // Check for overflow.
3112   DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
3113
3114   session_recv_window_size_ += delta_window_size;
3115   net_log_.AddEvent(
3116       NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
3117       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3118                  delta_window_size, session_recv_window_size_));
3119
3120   session_unacked_recv_window_bytes_ += delta_window_size;
3121   if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) {
3122     SendWindowUpdateFrame(kSessionFlowControlStreamId,
3123                           session_unacked_recv_window_bytes_,
3124                           HIGHEST);
3125     session_unacked_recv_window_bytes_ = 0;
3126   }
3127 }
3128
3129 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
3130   CHECK(in_io_loop_);
3131   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3132   DCHECK_GE(delta_window_size, 1);
3133
3134   // Since we never decrease the initial receive window size,
3135   // |delta_window_size| should never cause |recv_window_size_| to go
3136   // negative. If we do, the receive window isn't being respected.
3137   if (delta_window_size > session_recv_window_size_) {
3138     RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
3139     DoDrainSession(
3140         ERR_SPDY_FLOW_CONTROL_ERROR,
3141         "delta_window_size is " + base::IntToString(delta_window_size) +
3142             " in DecreaseRecvWindowSize, which is larger than the receive " +
3143             "window size of " + base::IntToString(session_recv_window_size_));
3144     return;
3145   }
3146
3147   session_recv_window_size_ -= delta_window_size;
3148   net_log_.AddEvent(
3149       NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
3150       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3151                  -delta_window_size, session_recv_window_size_));
3152 }
3153
3154 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
3155   DCHECK(stream.send_stalled_by_flow_control());
3156   RequestPriority priority = stream.priority();
3157   CHECK_GE(priority, MINIMUM_PRIORITY);
3158   CHECK_LE(priority, MAXIMUM_PRIORITY);
3159   stream_send_unstall_queue_[priority].push_back(stream.stream_id());
3160 }
3161
3162 void SpdySession::ResumeSendStalledStreams() {
3163   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3164
3165   // We don't have to worry about new streams being queued, since
3166   // doing so would cause IsSendStalled() to return true. But we do
3167   // have to worry about streams being closed, as well as ourselves
3168   // being closed.
3169
3170   while (!IsSendStalled()) {
3171     size_t old_size = 0;
3172 #if DCHECK_IS_ON
3173     old_size = GetTotalSize(stream_send_unstall_queue_);
3174 #endif
3175
3176     SpdyStreamId stream_id = PopStreamToPossiblyResume();
3177     if (stream_id == 0)
3178       break;
3179     ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
3180     // The stream may actually still be send-stalled after this (due
3181     // to its own send window) but that's okay -- it'll then be
3182     // resumed once its send window increases.
3183     if (it != active_streams_.end())
3184       it->second.stream->PossiblyResumeIfSendStalled();
3185
3186     // The size should decrease unless we got send-stalled again.
3187     if (!IsSendStalled())
3188       DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
3189   }
3190 }
3191
3192 SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
3193   for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
3194     std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
3195     if (!queue->empty()) {
3196       SpdyStreamId stream_id = queue->front();
3197       queue->pop_front();
3198       return stream_id;
3199     }
3200   }
3201   return 0;
3202 }
3203
3204 }  // namespace net