Update To 11.40.268.0
[platform/framework/web/crosswalk.git] / src / net / spdy / spdy_session.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/spdy_session.h"
6
7 #include <algorithm>
8 #include <map>
9
10 #include "base/basictypes.h"
11 #include "base/bind.h"
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/metrics/field_trial.h"
16 #include "base/metrics/histogram.h"
17 #include "base/metrics/sparse_histogram.h"
18 #include "base/metrics/stats_counters.h"
19 #include "base/profiler/scoped_tracker.h"
20 #include "base/stl_util.h"
21 #include "base/strings/string_number_conversions.h"
22 #include "base/strings/string_util.h"
23 #include "base/strings/stringprintf.h"
24 #include "base/strings/utf_string_conversions.h"
25 #include "base/time/time.h"
26 #include "base/values.h"
27 #include "crypto/ec_private_key.h"
28 #include "crypto/ec_signature_creator.h"
29 #include "net/base/connection_type_histograms.h"
30 #include "net/base/net_log.h"
31 #include "net/base/net_util.h"
32 #include "net/cert/asn1_util.h"
33 #include "net/cert/cert_verify_result.h"
34 #include "net/http/http_log_util.h"
35 #include "net/http/http_network_session.h"
36 #include "net/http/http_server_properties.h"
37 #include "net/http/http_util.h"
38 #include "net/http/transport_security_state.h"
39 #include "net/socket/ssl_client_socket.h"
40 #include "net/spdy/spdy_buffer_producer.h"
41 #include "net/spdy/spdy_frame_builder.h"
42 #include "net/spdy/spdy_http_utils.h"
43 #include "net/spdy/spdy_protocol.h"
44 #include "net/spdy/spdy_session_pool.h"
45 #include "net/spdy/spdy_stream.h"
46 #include "net/ssl/channel_id_service.h"
47 #include "net/ssl/ssl_cipher_suite_names.h"
48 #include "net/ssl/ssl_connection_status_flags.h"
49
50 namespace net {
51
52 namespace {
53
54 const int kReadBufferSize = 8 * 1024;
55 const int kDefaultConnectionAtRiskOfLossSeconds = 10;
56 const int kHungIntervalSeconds = 10;
57
58 // Minimum seconds that unclaimed pushed streams will be kept in memory.
59 const int kMinPushedStreamLifetimeSeconds = 300;
60
61 scoped_ptr<base::ListValue> SpdyHeaderBlockToListValue(
62     const SpdyHeaderBlock& headers,
63     net::NetLog::LogLevel log_level) {
64   scoped_ptr<base::ListValue> headers_list(new base::ListValue());
65   for (SpdyHeaderBlock::const_iterator it = headers.begin();
66        it != headers.end(); ++it) {
67     headers_list->AppendString(
68         it->first + ": " +
69         ElideHeaderValueForNetLog(log_level, it->first, it->second));
70   }
71   return headers_list.Pass();
72 }
73
74 base::Value* NetLogSpdySynStreamSentCallback(const SpdyHeaderBlock* headers,
75                                              bool fin,
76                                              bool unidirectional,
77                                              SpdyPriority spdy_priority,
78                                              SpdyStreamId stream_id,
79                                              NetLog::LogLevel log_level) {
80   base::DictionaryValue* dict = new base::DictionaryValue();
81   dict->Set("headers",
82             SpdyHeaderBlockToListValue(*headers, log_level).release());
83   dict->SetBoolean("fin", fin);
84   dict->SetBoolean("unidirectional", unidirectional);
85   dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
86   dict->SetInteger("stream_id", stream_id);
87   return dict;
88 }
89
90 base::Value* NetLogSpdySynStreamReceivedCallback(
91     const SpdyHeaderBlock* headers,
92     bool fin,
93     bool unidirectional,
94     SpdyPriority spdy_priority,
95     SpdyStreamId stream_id,
96     SpdyStreamId associated_stream,
97     NetLog::LogLevel log_level) {
98   base::DictionaryValue* dict = new base::DictionaryValue();
99   dict->Set("headers",
100             SpdyHeaderBlockToListValue(*headers, log_level).release());
101   dict->SetBoolean("fin", fin);
102   dict->SetBoolean("unidirectional", unidirectional);
103   dict->SetInteger("spdy_priority", static_cast<int>(spdy_priority));
104   dict->SetInteger("stream_id", stream_id);
105   dict->SetInteger("associated_stream", associated_stream);
106   return dict;
107 }
108
109 base::Value* NetLogSpdySynReplyOrHeadersReceivedCallback(
110     const SpdyHeaderBlock* headers,
111     bool fin,
112     SpdyStreamId stream_id,
113     NetLog::LogLevel log_level) {
114   base::DictionaryValue* dict = new base::DictionaryValue();
115   dict->Set("headers",
116             SpdyHeaderBlockToListValue(*headers, log_level).release());
117   dict->SetBoolean("fin", fin);
118   dict->SetInteger("stream_id", stream_id);
119   return dict;
120 }
121
122 base::Value* NetLogSpdySessionCloseCallback(int net_error,
123                                             const std::string* description,
124                                             NetLog::LogLevel /* log_level */) {
125   base::DictionaryValue* dict = new base::DictionaryValue();
126   dict->SetInteger("net_error", net_error);
127   dict->SetString("description", *description);
128   return dict;
129 }
130
131 base::Value* NetLogSpdySessionCallback(const HostPortProxyPair* host_pair,
132                                        NetLog::LogLevel /* log_level */) {
133   base::DictionaryValue* dict = new base::DictionaryValue();
134   dict->SetString("host", host_pair->first.ToString());
135   dict->SetString("proxy", host_pair->second.ToPacString());
136   return dict;
137 }
138
139 base::Value* NetLogSpdyInitializedCallback(NetLog::Source source,
140                                            const NextProto protocol_version,
141                                            NetLog::LogLevel /* log_level */) {
142   base::DictionaryValue* dict = new base::DictionaryValue();
143   if (source.IsValid()) {
144     source.AddToEventParameters(dict);
145   }
146   dict->SetString("protocol",
147                   SSLClientSocket::NextProtoToString(protocol_version));
148   return dict;
149 }
150
151 base::Value* NetLogSpdySettingsCallback(const HostPortPair& host_port_pair,
152                                         bool clear_persisted,
153                                         NetLog::LogLevel /* log_level */) {
154   base::DictionaryValue* dict = new base::DictionaryValue();
155   dict->SetString("host", host_port_pair.ToString());
156   dict->SetBoolean("clear_persisted", clear_persisted);
157   return dict;
158 }
159
160 base::Value* NetLogSpdySettingCallback(SpdySettingsIds id,
161                                        const SpdyMajorVersion protocol_version,
162                                        SpdySettingsFlags flags,
163                                        uint32 value,
164                                        NetLog::LogLevel /* log_level */) {
165   base::DictionaryValue* dict = new base::DictionaryValue();
166   dict->SetInteger("id",
167                    SpdyConstants::SerializeSettingId(protocol_version, id));
168   dict->SetInteger("flags", flags);
169   dict->SetInteger("value", value);
170   return dict;
171 }
172
173 base::Value* NetLogSpdySendSettingsCallback(
174     const SettingsMap* settings,
175     const SpdyMajorVersion protocol_version,
176     NetLog::LogLevel /* log_level */) {
177   base::DictionaryValue* dict = new base::DictionaryValue();
178   base::ListValue* settings_list = new base::ListValue();
179   for (SettingsMap::const_iterator it = settings->begin();
180        it != settings->end(); ++it) {
181     const SpdySettingsIds id = it->first;
182     const SpdySettingsFlags flags = it->second.first;
183     const uint32 value = it->second.second;
184     settings_list->Append(new base::StringValue(base::StringPrintf(
185         "[id:%u flags:%u value:%u]",
186         SpdyConstants::SerializeSettingId(protocol_version, id),
187         flags,
188         value)));
189   }
190   dict->Set("settings", settings_list);
191   return dict;
192 }
193
194 base::Value* NetLogSpdyWindowUpdateFrameCallback(
195     SpdyStreamId stream_id,
196     uint32 delta,
197     NetLog::LogLevel /* log_level */) {
198   base::DictionaryValue* dict = new base::DictionaryValue();
199   dict->SetInteger("stream_id", static_cast<int>(stream_id));
200   dict->SetInteger("delta", delta);
201   return dict;
202 }
203
204 base::Value* NetLogSpdySessionWindowUpdateCallback(
205     int32 delta,
206     int32 window_size,
207     NetLog::LogLevel /* log_level */) {
208   base::DictionaryValue* dict = new base::DictionaryValue();
209   dict->SetInteger("delta", delta);
210   dict->SetInteger("window_size", window_size);
211   return dict;
212 }
213
214 base::Value* NetLogSpdyDataCallback(SpdyStreamId stream_id,
215                                     int size,
216                                     bool fin,
217                                     NetLog::LogLevel /* log_level */) {
218   base::DictionaryValue* dict = new base::DictionaryValue();
219   dict->SetInteger("stream_id", static_cast<int>(stream_id));
220   dict->SetInteger("size", size);
221   dict->SetBoolean("fin", fin);
222   return dict;
223 }
224
225 base::Value* NetLogSpdyRstCallback(SpdyStreamId stream_id,
226                                    int status,
227                                    const std::string* description,
228                                    NetLog::LogLevel /* log_level */) {
229   base::DictionaryValue* dict = new base::DictionaryValue();
230   dict->SetInteger("stream_id", static_cast<int>(stream_id));
231   dict->SetInteger("status", status);
232   dict->SetString("description", *description);
233   return dict;
234 }
235
236 base::Value* NetLogSpdyPingCallback(SpdyPingId unique_id,
237                                     bool is_ack,
238                                     const char* type,
239                                     NetLog::LogLevel /* log_level */) {
240   base::DictionaryValue* dict = new base::DictionaryValue();
241   dict->SetInteger("unique_id", unique_id);
242   dict->SetString("type", type);
243   dict->SetBoolean("is_ack", is_ack);
244   return dict;
245 }
246
247 base::Value* NetLogSpdyGoAwayCallback(SpdyStreamId last_stream_id,
248                                       int active_streams,
249                                       int unclaimed_streams,
250                                       SpdyGoAwayStatus status,
251                                       NetLog::LogLevel /* log_level */) {
252   base::DictionaryValue* dict = new base::DictionaryValue();
253   dict->SetInteger("last_accepted_stream_id",
254                    static_cast<int>(last_stream_id));
255   dict->SetInteger("active_streams", active_streams);
256   dict->SetInteger("unclaimed_streams", unclaimed_streams);
257   dict->SetInteger("status", static_cast<int>(status));
258   return dict;
259 }
260
261 base::Value* NetLogSpdyPushPromiseReceivedCallback(
262     const SpdyHeaderBlock* headers,
263     SpdyStreamId stream_id,
264     SpdyStreamId promised_stream_id,
265     NetLog::LogLevel log_level) {
266   base::DictionaryValue* dict = new base::DictionaryValue();
267   dict->Set("headers",
268             SpdyHeaderBlockToListValue(*headers, log_level).release());
269   dict->SetInteger("id", stream_id);
270   dict->SetInteger("promised_stream_id", promised_stream_id);
271   return dict;
272 }
273
274 // Helper function to return the total size of an array of objects
275 // with .size() member functions.
276 template <typename T, size_t N> size_t GetTotalSize(const T (&arr)[N]) {
277   size_t total_size = 0;
278   for (size_t i = 0; i < N; ++i) {
279     total_size += arr[i].size();
280   }
281   return total_size;
282 }
283
284 // Helper class for std:find_if on STL container containing
285 // SpdyStreamRequest weak pointers.
286 class RequestEquals {
287  public:
288   RequestEquals(const base::WeakPtr<SpdyStreamRequest>& request)
289       : request_(request) {}
290
291   bool operator()(const base::WeakPtr<SpdyStreamRequest>& request) const {
292     return request_.get() == request.get();
293   }
294
295  private:
296   const base::WeakPtr<SpdyStreamRequest> request_;
297 };
298
299 // The maximum number of concurrent streams we will ever create.  Even if
300 // the server permits more, we will never exceed this limit.
301 const size_t kMaxConcurrentStreamLimit = 256;
302
303 }  // namespace
304
305 SpdyProtocolErrorDetails MapFramerErrorToProtocolError(
306     SpdyFramer::SpdyError err) {
307   switch(err) {
308     case SpdyFramer::SPDY_NO_ERROR:
309       return SPDY_ERROR_NO_ERROR;
310     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
311       return SPDY_ERROR_INVALID_CONTROL_FRAME;
312     case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
313       return SPDY_ERROR_CONTROL_PAYLOAD_TOO_LARGE;
314     case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
315       return SPDY_ERROR_ZLIB_INIT_FAILURE;
316     case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
317       return SPDY_ERROR_UNSUPPORTED_VERSION;
318     case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
319       return SPDY_ERROR_DECOMPRESS_FAILURE;
320     case SpdyFramer::SPDY_COMPRESS_FAILURE:
321       return SPDY_ERROR_COMPRESS_FAILURE;
322     case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
323       return SPDY_ERROR_GOAWAY_FRAME_CORRUPT;
324     case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
325       return SPDY_ERROR_RST_STREAM_FRAME_CORRUPT;
326     case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
327       return SPDY_ERROR_INVALID_DATA_FRAME_FLAGS;
328     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
329       return SPDY_ERROR_INVALID_CONTROL_FRAME_FLAGS;
330     case SpdyFramer::SPDY_UNEXPECTED_FRAME:
331       return SPDY_ERROR_UNEXPECTED_FRAME;
332     default:
333       NOTREACHED();
334       return static_cast<SpdyProtocolErrorDetails>(-1);
335   }
336 }
337
338 Error MapFramerErrorToNetError(SpdyFramer::SpdyError err) {
339   switch (err) {
340     case SpdyFramer::SPDY_NO_ERROR:
341       return OK;
342     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME:
343       return ERR_SPDY_PROTOCOL_ERROR;
344     case SpdyFramer::SPDY_CONTROL_PAYLOAD_TOO_LARGE:
345       return ERR_SPDY_FRAME_SIZE_ERROR;
346     case SpdyFramer::SPDY_ZLIB_INIT_FAILURE:
347       return ERR_SPDY_COMPRESSION_ERROR;
348     case SpdyFramer::SPDY_UNSUPPORTED_VERSION:
349       return ERR_SPDY_PROTOCOL_ERROR;
350     case SpdyFramer::SPDY_DECOMPRESS_FAILURE:
351       return ERR_SPDY_COMPRESSION_ERROR;
352     case SpdyFramer::SPDY_COMPRESS_FAILURE:
353       return ERR_SPDY_COMPRESSION_ERROR;
354     case SpdyFramer::SPDY_GOAWAY_FRAME_CORRUPT:
355       return ERR_SPDY_PROTOCOL_ERROR;
356     case SpdyFramer::SPDY_RST_STREAM_FRAME_CORRUPT:
357       return ERR_SPDY_PROTOCOL_ERROR;
358     case SpdyFramer::SPDY_INVALID_DATA_FRAME_FLAGS:
359       return ERR_SPDY_PROTOCOL_ERROR;
360     case SpdyFramer::SPDY_INVALID_CONTROL_FRAME_FLAGS:
361       return ERR_SPDY_PROTOCOL_ERROR;
362     case SpdyFramer::SPDY_UNEXPECTED_FRAME:
363       return ERR_SPDY_PROTOCOL_ERROR;
364     default:
365       NOTREACHED();
366       return ERR_SPDY_PROTOCOL_ERROR;
367   }
368 }
369
370 SpdyProtocolErrorDetails MapRstStreamStatusToProtocolError(
371     SpdyRstStreamStatus status) {
372   switch(status) {
373     case RST_STREAM_PROTOCOL_ERROR:
374       return STATUS_CODE_PROTOCOL_ERROR;
375     case RST_STREAM_INVALID_STREAM:
376       return STATUS_CODE_INVALID_STREAM;
377     case RST_STREAM_REFUSED_STREAM:
378       return STATUS_CODE_REFUSED_STREAM;
379     case RST_STREAM_UNSUPPORTED_VERSION:
380       return STATUS_CODE_UNSUPPORTED_VERSION;
381     case RST_STREAM_CANCEL:
382       return STATUS_CODE_CANCEL;
383     case RST_STREAM_INTERNAL_ERROR:
384       return STATUS_CODE_INTERNAL_ERROR;
385     case RST_STREAM_FLOW_CONTROL_ERROR:
386       return STATUS_CODE_FLOW_CONTROL_ERROR;
387     case RST_STREAM_STREAM_IN_USE:
388       return STATUS_CODE_STREAM_IN_USE;
389     case RST_STREAM_STREAM_ALREADY_CLOSED:
390       return STATUS_CODE_STREAM_ALREADY_CLOSED;
391     case RST_STREAM_INVALID_CREDENTIALS:
392       return STATUS_CODE_INVALID_CREDENTIALS;
393     case RST_STREAM_FRAME_SIZE_ERROR:
394       return STATUS_CODE_FRAME_SIZE_ERROR;
395     case RST_STREAM_SETTINGS_TIMEOUT:
396       return STATUS_CODE_SETTINGS_TIMEOUT;
397     case RST_STREAM_CONNECT_ERROR:
398       return STATUS_CODE_CONNECT_ERROR;
399     case RST_STREAM_ENHANCE_YOUR_CALM:
400       return STATUS_CODE_ENHANCE_YOUR_CALM;
401     default:
402       NOTREACHED();
403       return static_cast<SpdyProtocolErrorDetails>(-1);
404   }
405 }
406
407 SpdyGoAwayStatus MapNetErrorToGoAwayStatus(Error err) {
408   switch (err) {
409     case OK:
410       return GOAWAY_NO_ERROR;
411     case ERR_SPDY_PROTOCOL_ERROR:
412       return GOAWAY_PROTOCOL_ERROR;
413     case ERR_SPDY_FLOW_CONTROL_ERROR:
414       return GOAWAY_FLOW_CONTROL_ERROR;
415     case ERR_SPDY_FRAME_SIZE_ERROR:
416       return GOAWAY_FRAME_SIZE_ERROR;
417     case ERR_SPDY_COMPRESSION_ERROR:
418       return GOAWAY_COMPRESSION_ERROR;
419     case ERR_SPDY_INADEQUATE_TRANSPORT_SECURITY:
420       return GOAWAY_INADEQUATE_SECURITY;
421     default:
422       return GOAWAY_PROTOCOL_ERROR;
423   }
424 }
425
426 void SplitPushedHeadersToRequestAndResponse(const SpdyHeaderBlock& headers,
427                                             SpdyMajorVersion protocol_version,
428                                             SpdyHeaderBlock* request_headers,
429                                             SpdyHeaderBlock* response_headers) {
430   DCHECK(response_headers);
431   DCHECK(request_headers);
432   for (SpdyHeaderBlock::const_iterator it = headers.begin();
433        it != headers.end();
434        ++it) {
435     SpdyHeaderBlock* to_insert = response_headers;
436     if (protocol_version == SPDY2) {
437       if (it->first == "url")
438         to_insert = request_headers;
439     } else {
440       const char* host = protocol_version >= SPDY4 ? ":authority" : ":host";
441       static const char* scheme = ":scheme";
442       static const char* path = ":path";
443       if (it->first == host || it->first == scheme || it->first == path)
444         to_insert = request_headers;
445     }
446     to_insert->insert(*it);
447   }
448 }
449
450 SpdyStreamRequest::SpdyStreamRequest() : weak_ptr_factory_(this) {
451   Reset();
452 }
453
454 SpdyStreamRequest::~SpdyStreamRequest() {
455   CancelRequest();
456 }
457
458 int SpdyStreamRequest::StartRequest(
459     SpdyStreamType type,
460     const base::WeakPtr<SpdySession>& session,
461     const GURL& url,
462     RequestPriority priority,
463     const BoundNetLog& net_log,
464     const CompletionCallback& callback) {
465   DCHECK(session);
466   DCHECK(!session_);
467   DCHECK(!stream_);
468   DCHECK(callback_.is_null());
469
470   type_ = type;
471   session_ = session;
472   url_ = url;
473   priority_ = priority;
474   net_log_ = net_log;
475   callback_ = callback;
476
477   base::WeakPtr<SpdyStream> stream;
478   int rv = session->TryCreateStream(weak_ptr_factory_.GetWeakPtr(), &stream);
479   if (rv == OK) {
480     Reset();
481     stream_ = stream;
482   }
483   return rv;
484 }
485
486 void SpdyStreamRequest::CancelRequest() {
487   if (session_)
488     session_->CancelStreamRequest(weak_ptr_factory_.GetWeakPtr());
489   Reset();
490   // Do this to cancel any pending CompleteStreamRequest() tasks.
491   weak_ptr_factory_.InvalidateWeakPtrs();
492 }
493
494 base::WeakPtr<SpdyStream> SpdyStreamRequest::ReleaseStream() {
495   DCHECK(!session_);
496   base::WeakPtr<SpdyStream> stream = stream_;
497   DCHECK(stream);
498   Reset();
499   return stream;
500 }
501
502 void SpdyStreamRequest::OnRequestCompleteSuccess(
503     const base::WeakPtr<SpdyStream>& stream) {
504   DCHECK(session_);
505   DCHECK(!stream_);
506   DCHECK(!callback_.is_null());
507   CompletionCallback callback = callback_;
508   Reset();
509   DCHECK(stream);
510   stream_ = stream;
511   callback.Run(OK);
512 }
513
514 void SpdyStreamRequest::OnRequestCompleteFailure(int rv) {
515   DCHECK(session_);
516   DCHECK(!stream_);
517   DCHECK(!callback_.is_null());
518   CompletionCallback callback = callback_;
519   Reset();
520   DCHECK_NE(rv, OK);
521   callback.Run(rv);
522 }
523
524 void SpdyStreamRequest::Reset() {
525   type_ = SPDY_BIDIRECTIONAL_STREAM;
526   session_.reset();
527   stream_.reset();
528   url_ = GURL();
529   priority_ = MINIMUM_PRIORITY;
530   net_log_ = BoundNetLog();
531   callback_.Reset();
532 }
533
534 SpdySession::ActiveStreamInfo::ActiveStreamInfo()
535     : stream(NULL),
536       waiting_for_syn_reply(false) {}
537
538 SpdySession::ActiveStreamInfo::ActiveStreamInfo(SpdyStream* stream)
539     : stream(stream),
540       waiting_for_syn_reply(stream->type() != SPDY_PUSH_STREAM) {
541 }
542
543 SpdySession::ActiveStreamInfo::~ActiveStreamInfo() {}
544
545 SpdySession::PushedStreamInfo::PushedStreamInfo() : stream_id(0) {}
546
547 SpdySession::PushedStreamInfo::PushedStreamInfo(
548     SpdyStreamId stream_id,
549     base::TimeTicks creation_time)
550     : stream_id(stream_id),
551       creation_time(creation_time) {}
552
553 SpdySession::PushedStreamInfo::~PushedStreamInfo() {}
554
555 // static
556 bool SpdySession::CanPool(TransportSecurityState* transport_security_state,
557                           const SSLInfo& ssl_info,
558                           const std::string& old_hostname,
559                           const std::string& new_hostname) {
560   // Pooling is prohibited if the server cert is not valid for the new domain,
561   // and for connections on which client certs were sent. It is also prohibited
562   // when channel ID was sent if the hosts are from different eTLDs+1.
563   if (IsCertStatusError(ssl_info.cert_status))
564     return false;
565
566   if (ssl_info.client_cert_sent)
567     return false;
568
569   if (ssl_info.channel_id_sent &&
570       ChannelIDService::GetDomainForHost(new_hostname) !=
571       ChannelIDService::GetDomainForHost(old_hostname)) {
572     return false;
573   }
574
575   bool unused = false;
576   if (!ssl_info.cert->VerifyNameMatch(new_hostname, &unused))
577     return false;
578
579   std::string pinning_failure_log;
580   if (!transport_security_state->CheckPublicKeyPins(
581           new_hostname,
582           ssl_info.is_issued_by_known_root,
583           ssl_info.public_key_hashes,
584           &pinning_failure_log)) {
585     return false;
586   }
587
588   return true;
589 }
590
591 SpdySession::SpdySession(
592     const SpdySessionKey& spdy_session_key,
593     const base::WeakPtr<HttpServerProperties>& http_server_properties,
594     TransportSecurityState* transport_security_state,
595     bool verify_domain_authentication,
596     bool enable_sending_initial_data,
597     bool enable_compression,
598     bool enable_ping_based_connection_checking,
599     NextProto default_protocol,
600     size_t stream_initial_recv_window_size,
601     size_t initial_max_concurrent_streams,
602     size_t max_concurrent_streams_limit,
603     TimeFunc time_func,
604     const HostPortPair& trusted_spdy_proxy,
605     NetLog* net_log)
606     : in_io_loop_(false),
607       spdy_session_key_(spdy_session_key),
608       pool_(NULL),
609       http_server_properties_(http_server_properties),
610       transport_security_state_(transport_security_state),
611       read_buffer_(new IOBuffer(kReadBufferSize)),
612       stream_hi_water_mark_(kFirstStreamId),
613       last_accepted_push_stream_id_(0),
614       num_pushed_streams_(0u),
615       num_active_pushed_streams_(0u),
616       in_flight_write_frame_type_(DATA),
617       in_flight_write_frame_size_(0),
618       is_secure_(false),
619       certificate_error_code_(OK),
620       availability_state_(STATE_AVAILABLE),
621       read_state_(READ_STATE_DO_READ),
622       write_state_(WRITE_STATE_IDLE),
623       error_on_close_(OK),
624       max_concurrent_streams_(initial_max_concurrent_streams == 0
625                                   ? kInitialMaxConcurrentStreams
626                                   : initial_max_concurrent_streams),
627       max_concurrent_streams_limit_(max_concurrent_streams_limit == 0
628                                         ? kMaxConcurrentStreamLimit
629                                         : max_concurrent_streams_limit),
630       max_concurrent_pushed_streams_(kMaxConcurrentPushedStreams),
631       streams_initiated_count_(0),
632       streams_pushed_count_(0),
633       streams_pushed_and_claimed_count_(0),
634       streams_abandoned_count_(0),
635       total_bytes_received_(0),
636       sent_settings_(false),
637       received_settings_(false),
638       stalled_streams_(0),
639       pings_in_flight_(0),
640       next_ping_id_(1),
641       last_activity_time_(time_func()),
642       last_compressed_frame_len_(0),
643       check_ping_status_pending_(false),
644       send_connection_header_prefix_(false),
645       flow_control_state_(FLOW_CONTROL_NONE),
646       stream_initial_send_window_size_(kSpdyStreamInitialWindowSize),
647       stream_initial_recv_window_size_(stream_initial_recv_window_size == 0
648                                            ? kDefaultInitialRecvWindowSize
649                                            : stream_initial_recv_window_size),
650       session_send_window_size_(0),
651       session_recv_window_size_(0),
652       session_unacked_recv_window_bytes_(0),
653       net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_SPDY_SESSION)),
654       verify_domain_authentication_(verify_domain_authentication),
655       enable_sending_initial_data_(enable_sending_initial_data),
656       enable_compression_(enable_compression),
657       enable_ping_based_connection_checking_(
658           enable_ping_based_connection_checking),
659       protocol_(default_protocol),
660       connection_at_risk_of_loss_time_(
661           base::TimeDelta::FromSeconds(kDefaultConnectionAtRiskOfLossSeconds)),
662       hung_interval_(base::TimeDelta::FromSeconds(kHungIntervalSeconds)),
663       trusted_spdy_proxy_(trusted_spdy_proxy),
664       time_func_(time_func),
665       weak_factory_(this) {
666   DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
667   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
668   DCHECK(HttpStreamFactory::spdy_enabled());
669   net_log_.BeginEvent(
670       NetLog::TYPE_SPDY_SESSION,
671       base::Bind(&NetLogSpdySessionCallback, &host_port_proxy_pair()));
672   next_unclaimed_push_stream_sweep_time_ = time_func_() +
673       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
674   // TODO(mbelshe): consider randomization of the stream_hi_water_mark.
675 }
676
677 SpdySession::~SpdySession() {
678   CHECK(!in_io_loop_);
679   DcheckDraining();
680
681   // TODO(akalin): Check connection->is_initialized() instead. This
682   // requires re-working CreateFakeSpdySession(), though.
683   DCHECK(connection_->socket());
684   // With SPDY we can't recycle sockets.
685   connection_->socket()->Disconnect();
686
687   RecordHistograms();
688
689   net_log_.EndEvent(NetLog::TYPE_SPDY_SESSION);
690 }
691
692 void SpdySession::InitializeWithSocket(
693     scoped_ptr<ClientSocketHandle> connection,
694     SpdySessionPool* pool,
695     bool is_secure,
696     int certificate_error_code) {
697   CHECK(!in_io_loop_);
698   DCHECK_EQ(availability_state_, STATE_AVAILABLE);
699   DCHECK_EQ(read_state_, READ_STATE_DO_READ);
700   DCHECK_EQ(write_state_, WRITE_STATE_IDLE);
701   DCHECK(!connection_);
702
703   DCHECK(certificate_error_code == OK ||
704          certificate_error_code < ERR_IO_PENDING);
705   // TODO(akalin): Check connection->is_initialized() instead. This
706   // requires re-working CreateFakeSpdySession(), though.
707   DCHECK(connection->socket());
708
709   base::StatsCounter spdy_sessions("spdy.sessions");
710   spdy_sessions.Increment();
711
712   connection_ = connection.Pass();
713   is_secure_ = is_secure;
714   certificate_error_code_ = certificate_error_code;
715
716   NextProto protocol_negotiated =
717       connection_->socket()->GetNegotiatedProtocol();
718   if (protocol_negotiated != kProtoUnknown) {
719     protocol_ = protocol_negotiated;
720   }
721   DCHECK_GE(protocol_, kProtoSPDYMinimumVersion);
722   DCHECK_LE(protocol_, kProtoSPDYMaximumVersion);
723
724   if (protocol_ == kProtoSPDY4)
725     send_connection_header_prefix_ = true;
726
727   if (protocol_ >= kProtoSPDY31) {
728     flow_control_state_ = FLOW_CONTROL_STREAM_AND_SESSION;
729     session_send_window_size_ = kSpdySessionInitialWindowSize;
730     session_recv_window_size_ = kSpdySessionInitialWindowSize;
731   } else if (protocol_ >= kProtoSPDY3) {
732     flow_control_state_ = FLOW_CONTROL_STREAM;
733   } else {
734     flow_control_state_ = FLOW_CONTROL_NONE;
735   }
736
737   buffered_spdy_framer_.reset(
738       new BufferedSpdyFramer(NextProtoToSpdyMajorVersion(protocol_),
739                              enable_compression_));
740   buffered_spdy_framer_->set_visitor(this);
741   buffered_spdy_framer_->set_debug_visitor(this);
742   UMA_HISTOGRAM_ENUMERATION(
743       "Net.SpdyVersion2",
744       protocol_ - kProtoSPDYMinimumVersion,
745       kProtoSPDYMaximumVersion - kProtoSPDYMinimumVersion + 1);
746
747   net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_INITIALIZED,
748                     base::Bind(&NetLogSpdyInitializedCallback,
749                                connection_->socket()->NetLog().source(),
750                                protocol_));
751
752   DCHECK_EQ(availability_state_, STATE_AVAILABLE);
753   connection_->AddHigherLayeredPool(this);
754   if (enable_sending_initial_data_)
755     SendInitialData();
756   pool_ = pool;
757
758   // Bootstrap the read loop.
759   base::MessageLoop::current()->PostTask(
760       FROM_HERE,
761       base::Bind(&SpdySession::PumpReadLoop,
762                  weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
763 }
764
765 bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
766   if (!verify_domain_authentication_)
767     return true;
768
769   if (availability_state_ == STATE_DRAINING)
770     return false;
771
772   SSLInfo ssl_info;
773   bool was_npn_negotiated;
774   NextProto protocol_negotiated = kProtoUnknown;
775   if (!GetSSLInfo(&ssl_info, &was_npn_negotiated, &protocol_negotiated))
776     return true;   // This is not a secure session, so all domains are okay.
777
778   return CanPool(transport_security_state_, ssl_info,
779                  host_port_pair().host(), domain);
780 }
781
782 int SpdySession::GetPushStream(
783     const GURL& url,
784     base::WeakPtr<SpdyStream>* stream,
785     const BoundNetLog& stream_net_log) {
786   CHECK(!in_io_loop_);
787
788   stream->reset();
789
790   if (availability_state_ == STATE_DRAINING)
791     return ERR_CONNECTION_CLOSED;
792
793   Error err = TryAccessStream(url);
794   if (err != OK)
795     return err;
796
797   *stream = GetActivePushStream(url);
798   if (*stream) {
799     DCHECK_LT(streams_pushed_and_claimed_count_, streams_pushed_count_);
800     streams_pushed_and_claimed_count_++;
801   }
802   return OK;
803 }
804
805 // {,Try}CreateStream() and TryAccessStream() can be called with
806 // |in_io_loop_| set if a stream is being created in response to
807 // another being closed due to received data.
808
809 Error SpdySession::TryAccessStream(const GURL& url) {
810   if (is_secure_ && certificate_error_code_ != OK &&
811       (url.SchemeIs("https") || url.SchemeIs("wss"))) {
812     RecordProtocolErrorHistogram(
813         PROTOCOL_ERROR_REQUEST_FOR_SECURE_CONTENT_OVER_INSECURE_SESSION);
814     DoDrainSession(
815         static_cast<Error>(certificate_error_code_),
816         "Tried to get SPDY stream for secure content over an unauthenticated "
817         "session.");
818     return ERR_SPDY_PROTOCOL_ERROR;
819   }
820   return OK;
821 }
822
823 int SpdySession::TryCreateStream(
824     const base::WeakPtr<SpdyStreamRequest>& request,
825     base::WeakPtr<SpdyStream>* stream) {
826   DCHECK(request);
827
828   if (availability_state_ == STATE_GOING_AWAY)
829     return ERR_FAILED;
830
831   if (availability_state_ == STATE_DRAINING)
832     return ERR_CONNECTION_CLOSED;
833
834   Error err = TryAccessStream(request->url());
835   if (err != OK)
836     return err;
837
838   if (!max_concurrent_streams_ ||
839       (active_streams_.size() + created_streams_.size() - num_pushed_streams_ <
840        max_concurrent_streams_)) {
841     return CreateStream(*request, stream);
842   }
843
844   stalled_streams_++;
845   net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_STALLED_MAX_STREAMS);
846   RequestPriority priority = request->priority();
847   CHECK_GE(priority, MINIMUM_PRIORITY);
848   CHECK_LE(priority, MAXIMUM_PRIORITY);
849   pending_create_stream_queues_[priority].push_back(request);
850   return ERR_IO_PENDING;
851 }
852
853 int SpdySession::CreateStream(const SpdyStreamRequest& request,
854                               base::WeakPtr<SpdyStream>* stream) {
855   DCHECK_GE(request.priority(), MINIMUM_PRIORITY);
856   DCHECK_LE(request.priority(), MAXIMUM_PRIORITY);
857
858   if (availability_state_ == STATE_GOING_AWAY)
859     return ERR_FAILED;
860
861   if (availability_state_ == STATE_DRAINING)
862     return ERR_CONNECTION_CLOSED;
863
864   Error err = TryAccessStream(request.url());
865   if (err != OK) {
866     // This should have been caught in TryCreateStream().
867     NOTREACHED();
868     return err;
869   }
870
871   DCHECK(connection_->socket());
872   DCHECK(connection_->socket()->IsConnected());
873   if (connection_->socket()) {
874     UMA_HISTOGRAM_BOOLEAN("Net.SpdySession.CreateStreamWithSocketConnected",
875                           connection_->socket()->IsConnected());
876     if (!connection_->socket()->IsConnected()) {
877       DoDrainSession(
878           ERR_CONNECTION_CLOSED,
879           "Tried to create SPDY stream for a closed socket connection.");
880       return ERR_CONNECTION_CLOSED;
881     }
882   }
883
884   scoped_ptr<SpdyStream> new_stream(
885       new SpdyStream(request.type(), GetWeakPtr(), request.url(),
886                      request.priority(),
887                      stream_initial_send_window_size_,
888                      stream_initial_recv_window_size_,
889                      request.net_log()));
890   *stream = new_stream->GetWeakPtr();
891   InsertCreatedStream(new_stream.Pass());
892
893   UMA_HISTOGRAM_CUSTOM_COUNTS(
894       "Net.SpdyPriorityCount",
895       static_cast<int>(request.priority()), 0, 10, 11);
896
897   return OK;
898 }
899
900 void SpdySession::CancelStreamRequest(
901     const base::WeakPtr<SpdyStreamRequest>& request) {
902   DCHECK(request);
903   RequestPriority priority = request->priority();
904   CHECK_GE(priority, MINIMUM_PRIORITY);
905   CHECK_LE(priority, MAXIMUM_PRIORITY);
906
907 #if DCHECK_IS_ON
908   // |request| should not be in a queue not matching its priority.
909   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
910     if (priority == i)
911       continue;
912     PendingStreamRequestQueue* queue = &pending_create_stream_queues_[i];
913     DCHECK(std::find_if(queue->begin(),
914                         queue->end(),
915                         RequestEquals(request)) == queue->end());
916   }
917 #endif
918
919   PendingStreamRequestQueue* queue =
920       &pending_create_stream_queues_[priority];
921   // Remove |request| from |queue| while preserving the order of the
922   // other elements.
923   PendingStreamRequestQueue::iterator it =
924       std::find_if(queue->begin(), queue->end(), RequestEquals(request));
925   // The request may already be removed if there's a
926   // CompleteStreamRequest() in flight.
927   if (it != queue->end()) {
928     it = queue->erase(it);
929     // |request| should be in the queue at most once, and if it is
930     // present, should not be pending completion.
931     DCHECK(std::find_if(it, queue->end(), RequestEquals(request)) ==
932            queue->end());
933   }
934 }
935
936 base::WeakPtr<SpdyStreamRequest> SpdySession::GetNextPendingStreamRequest() {
937   for (int j = MAXIMUM_PRIORITY; j >= MINIMUM_PRIORITY; --j) {
938     if (pending_create_stream_queues_[j].empty())
939       continue;
940
941     base::WeakPtr<SpdyStreamRequest> pending_request =
942         pending_create_stream_queues_[j].front();
943     DCHECK(pending_request);
944     pending_create_stream_queues_[j].pop_front();
945     return pending_request;
946   }
947   return base::WeakPtr<SpdyStreamRequest>();
948 }
949
950 void SpdySession::ProcessPendingStreamRequests() {
951   // Like |max_concurrent_streams_|, 0 means infinite for
952   // |max_requests_to_process|.
953   size_t max_requests_to_process = 0;
954   if (max_concurrent_streams_ != 0) {
955     max_requests_to_process =
956         max_concurrent_streams_ -
957         (active_streams_.size() + created_streams_.size());
958   }
959   for (size_t i = 0;
960        max_requests_to_process == 0 || i < max_requests_to_process; ++i) {
961     base::WeakPtr<SpdyStreamRequest> pending_request =
962         GetNextPendingStreamRequest();
963     if (!pending_request)
964       break;
965
966     // Note that this post can race with other stream creations, and it's
967     // possible that the un-stalled stream will be stalled again if it loses.
968     // TODO(jgraettinger): Provide stronger ordering guarantees.
969     base::MessageLoop::current()->PostTask(
970         FROM_HERE,
971         base::Bind(&SpdySession::CompleteStreamRequest,
972                    weak_factory_.GetWeakPtr(),
973                    pending_request));
974   }
975 }
976
977 void SpdySession::AddPooledAlias(const SpdySessionKey& alias_key) {
978   pooled_aliases_.insert(alias_key);
979 }
980
981 SpdyMajorVersion SpdySession::GetProtocolVersion() const {
982   DCHECK(buffered_spdy_framer_.get());
983   return buffered_spdy_framer_->protocol_version();
984 }
985
986 bool SpdySession::HasAcceptableTransportSecurity() const {
987   // If we're not even using TLS, we have no standards to meet.
988   if (!is_secure_) {
989     return true;
990   }
991
992   // We don't enforce transport security standards for older SPDY versions.
993   if (GetProtocolVersion() < SPDY4) {
994     return true;
995   }
996
997   SSLInfo ssl_info;
998   CHECK(connection_->socket()->GetSSLInfo(&ssl_info));
999
1000   // HTTP/2 requires TLS 1.2+
1001   if (SSLConnectionStatusToVersion(ssl_info.connection_status) <
1002       SSL_CONNECTION_VERSION_TLS1_2) {
1003     return false;
1004   }
1005
1006   if (!IsSecureTLSCipherSuite(
1007           SSLConnectionStatusToCipherSuite(ssl_info.connection_status))) {
1008     return false;
1009   }
1010
1011   return true;
1012 }
1013
1014 base::WeakPtr<SpdySession> SpdySession::GetWeakPtr() {
1015   return weak_factory_.GetWeakPtr();
1016 }
1017
1018 bool SpdySession::CloseOneIdleConnection() {
1019   CHECK(!in_io_loop_);
1020   DCHECK(pool_);
1021   if (active_streams_.empty()) {
1022     DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1023   }
1024   // Return false as the socket wasn't immediately closed.
1025   return false;
1026 }
1027
1028 void SpdySession::EnqueueStreamWrite(
1029     const base::WeakPtr<SpdyStream>& stream,
1030     SpdyFrameType frame_type,
1031     scoped_ptr<SpdyBufferProducer> producer) {
1032   DCHECK(frame_type == HEADERS ||
1033          frame_type == DATA ||
1034          frame_type == CREDENTIAL ||
1035          frame_type == SYN_STREAM);
1036   EnqueueWrite(stream->priority(), frame_type, producer.Pass(), stream);
1037 }
1038
1039 scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
1040     SpdyStreamId stream_id,
1041     RequestPriority priority,
1042     SpdyControlFlags flags,
1043     const SpdyHeaderBlock& block) {
1044   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
1045   CHECK(it != active_streams_.end());
1046   CHECK_EQ(it->second.stream->stream_id(), stream_id);
1047
1048   SendPrefacePingIfNoneInFlight();
1049
1050   DCHECK(buffered_spdy_framer_.get());
1051   SpdyPriority spdy_priority =
1052       ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion());
1053
1054   scoped_ptr<SpdyFrame> syn_frame;
1055   // TODO(hkhalil): Avoid copy of |block|.
1056   if (GetProtocolVersion() <= SPDY3) {
1057     SpdySynStreamIR syn_stream(stream_id);
1058     syn_stream.set_associated_to_stream_id(0);
1059     syn_stream.set_priority(spdy_priority);
1060     syn_stream.set_fin((flags & CONTROL_FLAG_FIN) != 0);
1061     syn_stream.set_unidirectional((flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0);
1062     syn_stream.set_name_value_block(block);
1063     syn_frame.reset(buffered_spdy_framer_->SerializeFrame(syn_stream));
1064   } else {
1065     SpdyHeadersIR headers(stream_id);
1066     headers.set_priority(spdy_priority);
1067     headers.set_has_priority(true);
1068     headers.set_fin((flags & CONTROL_FLAG_FIN) != 0);
1069     headers.set_name_value_block(block);
1070     syn_frame.reset(buffered_spdy_framer_->SerializeFrame(headers));
1071   }
1072
1073   base::StatsCounter spdy_requests("spdy.requests");
1074   spdy_requests.Increment();
1075   streams_initiated_count_++;
1076
1077   if (net_log().IsLogging()) {
1078     net_log().AddEvent(NetLog::TYPE_SPDY_SESSION_SYN_STREAM,
1079                        base::Bind(&NetLogSpdySynStreamSentCallback,
1080                                   &block,
1081                                   (flags & CONTROL_FLAG_FIN) != 0,
1082                                   (flags & CONTROL_FLAG_UNIDIRECTIONAL) != 0,
1083                                   spdy_priority,
1084                                   stream_id));
1085   }
1086
1087   return syn_frame.Pass();
1088 }
1089
1090 scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
1091                                                      IOBuffer* data,
1092                                                      int len,
1093                                                      SpdyDataFlags flags) {
1094   if (availability_state_ == STATE_DRAINING) {
1095     return scoped_ptr<SpdyBuffer>();
1096   }
1097
1098   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
1099   CHECK(it != active_streams_.end());
1100   SpdyStream* stream = it->second.stream;
1101   CHECK_EQ(stream->stream_id(), stream_id);
1102
1103   if (len < 0) {
1104     NOTREACHED();
1105     return scoped_ptr<SpdyBuffer>();
1106   }
1107
1108   int effective_len = std::min(len, kMaxSpdyFrameChunkSize);
1109
1110   bool send_stalled_by_stream =
1111       (flow_control_state_ >= FLOW_CONTROL_STREAM) &&
1112       (stream->send_window_size() <= 0);
1113   bool send_stalled_by_session = IsSendStalled();
1114
1115   // NOTE: There's an enum of the same name in histograms.xml.
1116   enum SpdyFrameFlowControlState {
1117     SEND_NOT_STALLED,
1118     SEND_STALLED_BY_STREAM,
1119     SEND_STALLED_BY_SESSION,
1120     SEND_STALLED_BY_STREAM_AND_SESSION,
1121   };
1122
1123   SpdyFrameFlowControlState frame_flow_control_state = SEND_NOT_STALLED;
1124   if (send_stalled_by_stream) {
1125     if (send_stalled_by_session) {
1126       frame_flow_control_state = SEND_STALLED_BY_STREAM_AND_SESSION;
1127     } else {
1128       frame_flow_control_state = SEND_STALLED_BY_STREAM;
1129     }
1130   } else if (send_stalled_by_session) {
1131     frame_flow_control_state = SEND_STALLED_BY_SESSION;
1132   }
1133
1134   if (flow_control_state_ == FLOW_CONTROL_STREAM) {
1135     UMA_HISTOGRAM_ENUMERATION(
1136         "Net.SpdyFrameStreamFlowControlState",
1137         frame_flow_control_state,
1138         SEND_STALLED_BY_STREAM + 1);
1139   } else if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1140     UMA_HISTOGRAM_ENUMERATION(
1141         "Net.SpdyFrameStreamAndSessionFlowControlState",
1142         frame_flow_control_state,
1143         SEND_STALLED_BY_STREAM_AND_SESSION + 1);
1144   }
1145
1146   // Obey send window size of the stream if stream flow control is
1147   // enabled.
1148   if (flow_control_state_ >= FLOW_CONTROL_STREAM) {
1149     if (send_stalled_by_stream) {
1150       stream->set_send_stalled_by_flow_control(true);
1151       // Even though we're currently stalled only by the stream, we
1152       // might end up being stalled by the session also.
1153       QueueSendStalledStream(*stream);
1154       net_log().AddEvent(
1155           NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_STREAM_SEND_WINDOW,
1156           NetLog::IntegerCallback("stream_id", stream_id));
1157       return scoped_ptr<SpdyBuffer>();
1158     }
1159
1160     effective_len = std::min(effective_len, stream->send_window_size());
1161   }
1162
1163   // Obey send window size of the session if session flow control is
1164   // enabled.
1165   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
1166     if (send_stalled_by_session) {
1167       stream->set_send_stalled_by_flow_control(true);
1168       QueueSendStalledStream(*stream);
1169       net_log().AddEvent(
1170           NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_BY_SESSION_SEND_WINDOW,
1171           NetLog::IntegerCallback("stream_id", stream_id));
1172       return scoped_ptr<SpdyBuffer>();
1173     }
1174
1175     effective_len = std::min(effective_len, session_send_window_size_);
1176   }
1177
1178   DCHECK_GE(effective_len, 0);
1179
1180   // Clear FIN flag if only some of the data will be in the data
1181   // frame.
1182   if (effective_len < len)
1183     flags = static_cast<SpdyDataFlags>(flags & ~DATA_FLAG_FIN);
1184
1185   if (net_log().IsLogging()) {
1186     net_log().AddEvent(
1187         NetLog::TYPE_SPDY_SESSION_SEND_DATA,
1188         base::Bind(&NetLogSpdyDataCallback, stream_id, effective_len,
1189                    (flags & DATA_FLAG_FIN) != 0));
1190   }
1191
1192   // Send PrefacePing for DATA_FRAMEs with nonzero payload size.
1193   if (effective_len > 0)
1194     SendPrefacePingIfNoneInFlight();
1195
1196   // TODO(mbelshe): reduce memory copies here.
1197   DCHECK(buffered_spdy_framer_.get());
1198   scoped_ptr<SpdyFrame> frame(
1199       buffered_spdy_framer_->CreateDataFrame(
1200           stream_id, data->data(),
1201           static_cast<uint32>(effective_len), flags));
1202
1203   scoped_ptr<SpdyBuffer> data_buffer(new SpdyBuffer(frame.Pass()));
1204
1205   // Send window size is based on payload size, so nothing to do if this is
1206   // just a FIN with no payload.
1207   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION &&
1208       effective_len != 0) {
1209     DecreaseSendWindowSize(static_cast<int32>(effective_len));
1210     data_buffer->AddConsumeCallback(
1211         base::Bind(&SpdySession::OnWriteBufferConsumed,
1212                    weak_factory_.GetWeakPtr(),
1213                    static_cast<size_t>(effective_len)));
1214   }
1215
1216   return data_buffer.Pass();
1217 }
1218
1219 void SpdySession::CloseActiveStream(SpdyStreamId stream_id, int status) {
1220   DCHECK_NE(stream_id, 0u);
1221
1222   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1223   if (it == active_streams_.end()) {
1224     NOTREACHED();
1225     return;
1226   }
1227
1228   CloseActiveStreamIterator(it, status);
1229 }
1230
1231 void SpdySession::CloseCreatedStream(
1232     const base::WeakPtr<SpdyStream>& stream, int status) {
1233   DCHECK_EQ(stream->stream_id(), 0u);
1234
1235   CreatedStreamSet::iterator it = created_streams_.find(stream.get());
1236   if (it == created_streams_.end()) {
1237     NOTREACHED();
1238     return;
1239   }
1240
1241   CloseCreatedStreamIterator(it, status);
1242 }
1243
1244 void SpdySession::ResetStream(SpdyStreamId stream_id,
1245                               SpdyRstStreamStatus status,
1246                               const std::string& description) {
1247   DCHECK_NE(stream_id, 0u);
1248
1249   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1250   if (it == active_streams_.end()) {
1251     NOTREACHED();
1252     return;
1253   }
1254
1255   ResetStreamIterator(it, status, description);
1256 }
1257
1258 bool SpdySession::IsStreamActive(SpdyStreamId stream_id) const {
1259   return ContainsKey(active_streams_, stream_id);
1260 }
1261
1262 LoadState SpdySession::GetLoadState() const {
1263   // Just report that we're idle since the session could be doing
1264   // many things concurrently.
1265   return LOAD_STATE_IDLE;
1266 }
1267
1268 void SpdySession::CloseActiveStreamIterator(ActiveStreamMap::iterator it,
1269                                             int status) {
1270   // TODO(mbelshe): We should send a RST_STREAM control frame here
1271   //                so that the server can cancel a large send.
1272
1273   scoped_ptr<SpdyStream> owned_stream(it->second.stream);
1274   active_streams_.erase(it);
1275
1276   // TODO(akalin): When SpdyStream was ref-counted (and
1277   // |unclaimed_pushed_streams_| held scoped_refptr<SpdyStream>), this
1278   // was only done when status was not OK. This meant that pushed
1279   // streams can still be claimed after they're closed. This is
1280   // probably something that we still want to support, although server
1281   // push is hardly used. Write tests for this and fix this. (See
1282   // http://crbug.com/261712 .)
1283   if (owned_stream->type() == SPDY_PUSH_STREAM) {
1284     unclaimed_pushed_streams_.erase(owned_stream->url());
1285     num_pushed_streams_--;
1286     if (!owned_stream->IsReservedRemote())
1287       num_active_pushed_streams_--;
1288   }
1289
1290   DeleteStream(owned_stream.Pass(), status);
1291   MaybeFinishGoingAway();
1292
1293   // If there are no active streams and the socket pool is stalled, close the
1294   // session to free up a socket slot.
1295   if (active_streams_.empty() && connection_->IsPoolStalled()) {
1296     DoDrainSession(ERR_CONNECTION_CLOSED, "Closing idle connection.");
1297   }
1298 }
1299
1300 void SpdySession::CloseCreatedStreamIterator(CreatedStreamSet::iterator it,
1301                                              int status) {
1302   scoped_ptr<SpdyStream> owned_stream(*it);
1303   created_streams_.erase(it);
1304   DeleteStream(owned_stream.Pass(), status);
1305 }
1306
1307 void SpdySession::ResetStreamIterator(ActiveStreamMap::iterator it,
1308                                       SpdyRstStreamStatus status,
1309                                       const std::string& description) {
1310   // Send the RST_STREAM frame first as CloseActiveStreamIterator()
1311   // may close us.
1312   SpdyStreamId stream_id = it->first;
1313   RequestPriority priority = it->second.stream->priority();
1314   EnqueueResetStreamFrame(stream_id, priority, status, description);
1315
1316   // Removes any pending writes for the stream except for possibly an
1317   // in-flight one.
1318   CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
1319 }
1320
1321 void SpdySession::EnqueueResetStreamFrame(SpdyStreamId stream_id,
1322                                           RequestPriority priority,
1323                                           SpdyRstStreamStatus status,
1324                                           const std::string& description) {
1325   DCHECK_NE(stream_id, 0u);
1326
1327   net_log().AddEvent(
1328       NetLog::TYPE_SPDY_SESSION_SEND_RST_STREAM,
1329       base::Bind(&NetLogSpdyRstCallback, stream_id, status, &description));
1330
1331   DCHECK(buffered_spdy_framer_.get());
1332   scoped_ptr<SpdyFrame> rst_frame(
1333       buffered_spdy_framer_->CreateRstStream(stream_id, status));
1334
1335   EnqueueSessionWrite(priority, RST_STREAM, rst_frame.Pass());
1336   RecordProtocolErrorHistogram(MapRstStreamStatusToProtocolError(status));
1337 }
1338
1339 void SpdySession::PumpReadLoop(ReadState expected_read_state, int result) {
1340   // TODO(vadimt): Remove ScopedTracker below once crbug.com/418183 is fixed.
1341   tracked_objects::ScopedTracker tracking_profile(
1342       FROM_HERE_WITH_EXPLICIT_FUNCTION(
1343           "418183 DoReadCallback => SpdySession::PumpReadLoop"));
1344
1345   CHECK(!in_io_loop_);
1346   if (availability_state_ == STATE_DRAINING) {
1347     return;
1348   }
1349   ignore_result(DoReadLoop(expected_read_state, result));
1350 }
1351
1352 int SpdySession::DoReadLoop(ReadState expected_read_state, int result) {
1353   CHECK(!in_io_loop_);
1354   CHECK_EQ(read_state_, expected_read_state);
1355
1356   in_io_loop_ = true;
1357
1358   int bytes_read_without_yielding = 0;
1359
1360   // Loop until the session is draining, the read becomes blocked, or
1361   // the read limit is exceeded.
1362   while (true) {
1363     switch (read_state_) {
1364       case READ_STATE_DO_READ:
1365         CHECK_EQ(result, OK);
1366         result = DoRead();
1367         break;
1368       case READ_STATE_DO_READ_COMPLETE:
1369         if (result > 0)
1370           bytes_read_without_yielding += result;
1371         result = DoReadComplete(result);
1372         break;
1373       default:
1374         NOTREACHED() << "read_state_: " << read_state_;
1375         break;
1376     }
1377
1378     if (availability_state_ == STATE_DRAINING)
1379       break;
1380
1381     if (result == ERR_IO_PENDING)
1382       break;
1383
1384     if (bytes_read_without_yielding > kMaxReadBytesWithoutYielding) {
1385       read_state_ = READ_STATE_DO_READ;
1386       base::MessageLoop::current()->PostTask(
1387           FROM_HERE,
1388           base::Bind(&SpdySession::PumpReadLoop,
1389                      weak_factory_.GetWeakPtr(), READ_STATE_DO_READ, OK));
1390       result = ERR_IO_PENDING;
1391       break;
1392     }
1393   }
1394
1395   CHECK(in_io_loop_);
1396   in_io_loop_ = false;
1397
1398   return result;
1399 }
1400
1401 int SpdySession::DoRead() {
1402   CHECK(in_io_loop_);
1403
1404   CHECK(connection_);
1405   CHECK(connection_->socket());
1406   read_state_ = READ_STATE_DO_READ_COMPLETE;
1407   return connection_->socket()->Read(
1408       read_buffer_.get(),
1409       kReadBufferSize,
1410       base::Bind(&SpdySession::PumpReadLoop,
1411                  weak_factory_.GetWeakPtr(), READ_STATE_DO_READ_COMPLETE));
1412 }
1413
1414 int SpdySession::DoReadComplete(int result) {
1415   CHECK(in_io_loop_);
1416
1417   // Parse a frame.  For now this code requires that the frame fit into our
1418   // buffer (kReadBufferSize).
1419   // TODO(mbelshe): support arbitrarily large frames!
1420
1421   if (result == 0) {
1422     UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.EOF",
1423                                 total_bytes_received_, 1, 100000000, 50);
1424     DoDrainSession(ERR_CONNECTION_CLOSED, "Connection closed");
1425
1426     return ERR_CONNECTION_CLOSED;
1427   }
1428
1429   if (result < 0) {
1430     DoDrainSession(static_cast<Error>(result), "result is < 0.");
1431     return result;
1432   }
1433   CHECK_LE(result, kReadBufferSize);
1434   total_bytes_received_ += result;
1435
1436   last_activity_time_ = time_func_();
1437
1438   DCHECK(buffered_spdy_framer_.get());
1439   char* data = read_buffer_->data();
1440   while (result > 0) {
1441     uint32 bytes_processed = buffered_spdy_framer_->ProcessInput(data, result);
1442     result -= bytes_processed;
1443     data += bytes_processed;
1444
1445     if (availability_state_ == STATE_DRAINING) {
1446       return ERR_CONNECTION_CLOSED;
1447     }
1448
1449     DCHECK_EQ(buffered_spdy_framer_->error_code(), SpdyFramer::SPDY_NO_ERROR);
1450   }
1451
1452   read_state_ = READ_STATE_DO_READ;
1453   return OK;
1454 }
1455
1456 void SpdySession::PumpWriteLoop(WriteState expected_write_state, int result) {
1457   CHECK(!in_io_loop_);
1458   DCHECK_EQ(write_state_, expected_write_state);
1459
1460   DoWriteLoop(expected_write_state, result);
1461
1462   if (availability_state_ == STATE_DRAINING && !in_flight_write_ &&
1463       write_queue_.IsEmpty()) {
1464     pool_->RemoveUnavailableSession(GetWeakPtr());  // Destroys |this|.
1465     return;
1466   }
1467 }
1468
1469 int SpdySession::DoWriteLoop(WriteState expected_write_state, int result) {
1470   CHECK(!in_io_loop_);
1471   DCHECK_NE(write_state_, WRITE_STATE_IDLE);
1472   DCHECK_EQ(write_state_, expected_write_state);
1473
1474   in_io_loop_ = true;
1475
1476   // Loop until the session is closed or the write becomes blocked.
1477   while (true) {
1478     switch (write_state_) {
1479       case WRITE_STATE_DO_WRITE:
1480         DCHECK_EQ(result, OK);
1481         result = DoWrite();
1482         break;
1483       case WRITE_STATE_DO_WRITE_COMPLETE:
1484         result = DoWriteComplete(result);
1485         break;
1486       case WRITE_STATE_IDLE:
1487       default:
1488         NOTREACHED() << "write_state_: " << write_state_;
1489         break;
1490     }
1491
1492     if (write_state_ == WRITE_STATE_IDLE) {
1493       DCHECK_EQ(result, ERR_IO_PENDING);
1494       break;
1495     }
1496
1497     if (result == ERR_IO_PENDING)
1498       break;
1499   }
1500
1501   CHECK(in_io_loop_);
1502   in_io_loop_ = false;
1503
1504   return result;
1505 }
1506
1507 int SpdySession::DoWrite() {
1508   CHECK(in_io_loop_);
1509
1510   DCHECK(buffered_spdy_framer_);
1511   if (in_flight_write_) {
1512     DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1513   } else {
1514     // Grab the next frame to send.
1515     SpdyFrameType frame_type = DATA;
1516     scoped_ptr<SpdyBufferProducer> producer;
1517     base::WeakPtr<SpdyStream> stream;
1518     if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) {
1519       write_state_ = WRITE_STATE_IDLE;
1520       return ERR_IO_PENDING;
1521     }
1522
1523     if (stream.get())
1524       CHECK(!stream->IsClosed());
1525
1526     // Activate the stream only when sending the SYN_STREAM frame to
1527     // guarantee monotonically-increasing stream IDs.
1528     if (frame_type == SYN_STREAM) {
1529       CHECK(stream.get());
1530       CHECK_EQ(stream->stream_id(), 0u);
1531       scoped_ptr<SpdyStream> owned_stream =
1532           ActivateCreatedStream(stream.get());
1533       InsertActivatedStream(owned_stream.Pass());
1534
1535       if (stream_hi_water_mark_ > kLastStreamId) {
1536         CHECK_EQ(stream->stream_id(), kLastStreamId);
1537         // We've exhausted the stream ID space, and no new streams may be
1538         // created after this one.
1539         MakeUnavailable();
1540         StartGoingAway(kLastStreamId, ERR_ABORTED);
1541       }
1542     }
1543
1544     in_flight_write_ = producer->ProduceBuffer();
1545     if (!in_flight_write_) {
1546       NOTREACHED();
1547       return ERR_UNEXPECTED;
1548     }
1549     in_flight_write_frame_type_ = frame_type;
1550     in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
1551     DCHECK_GE(in_flight_write_frame_size_,
1552               buffered_spdy_framer_->GetFrameMinimumSize());
1553     in_flight_write_stream_ = stream;
1554   }
1555
1556   write_state_ = WRITE_STATE_DO_WRITE_COMPLETE;
1557
1558   // Explicitly store in a scoped_refptr<IOBuffer> to avoid problems
1559   // with Socket implementations that don't store their IOBuffer
1560   // argument in a scoped_refptr<IOBuffer> (see crbug.com/232345).
1561   scoped_refptr<IOBuffer> write_io_buffer =
1562       in_flight_write_->GetIOBufferForRemainingData();
1563   return connection_->socket()->Write(
1564       write_io_buffer.get(),
1565       in_flight_write_->GetRemainingSize(),
1566       base::Bind(&SpdySession::PumpWriteLoop,
1567                  weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE_COMPLETE));
1568 }
1569
1570 int SpdySession::DoWriteComplete(int result) {
1571   CHECK(in_io_loop_);
1572   DCHECK_NE(result, ERR_IO_PENDING);
1573   DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
1574
1575   last_activity_time_ = time_func_();
1576
1577   if (result < 0) {
1578     DCHECK_NE(result, ERR_IO_PENDING);
1579     in_flight_write_.reset();
1580     in_flight_write_frame_type_ = DATA;
1581     in_flight_write_frame_size_ = 0;
1582     in_flight_write_stream_.reset();
1583     write_state_ = WRITE_STATE_DO_WRITE;
1584     DoDrainSession(static_cast<Error>(result), "Write error");
1585     return OK;
1586   }
1587
1588   // It should not be possible to have written more bytes than our
1589   // in_flight_write_.
1590   DCHECK_LE(static_cast<size_t>(result),
1591             in_flight_write_->GetRemainingSize());
1592
1593   if (result > 0) {
1594     in_flight_write_->Consume(static_cast<size_t>(result));
1595
1596     // We only notify the stream when we've fully written the pending frame.
1597     if (in_flight_write_->GetRemainingSize() == 0) {
1598       // It is possible that the stream was cancelled while we were
1599       // writing to the socket.
1600       if (in_flight_write_stream_.get()) {
1601         DCHECK_GT(in_flight_write_frame_size_, 0u);
1602         in_flight_write_stream_->OnFrameWriteComplete(
1603             in_flight_write_frame_type_,
1604             in_flight_write_frame_size_);
1605       }
1606
1607       // Cleanup the write which just completed.
1608       in_flight_write_.reset();
1609       in_flight_write_frame_type_ = DATA;
1610       in_flight_write_frame_size_ = 0;
1611       in_flight_write_stream_.reset();
1612     }
1613   }
1614
1615   write_state_ = WRITE_STATE_DO_WRITE;
1616   return OK;
1617 }
1618
1619 void SpdySession::DcheckGoingAway() const {
1620 #if DCHECK_IS_ON
1621   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1622   for (int i = MINIMUM_PRIORITY; i <= MAXIMUM_PRIORITY; ++i) {
1623     DCHECK(pending_create_stream_queues_[i].empty());
1624   }
1625   DCHECK(created_streams_.empty());
1626 #endif
1627 }
1628
1629 void SpdySession::DcheckDraining() const {
1630   DcheckGoingAway();
1631   DCHECK_EQ(availability_state_, STATE_DRAINING);
1632   DCHECK(active_streams_.empty());
1633   DCHECK(unclaimed_pushed_streams_.empty());
1634 }
1635
1636 void SpdySession::StartGoingAway(SpdyStreamId last_good_stream_id,
1637                                  Error status) {
1638   DCHECK_GE(availability_state_, STATE_GOING_AWAY);
1639
1640   // The loops below are carefully written to avoid reentrancy problems.
1641
1642   while (true) {
1643     size_t old_size = GetTotalSize(pending_create_stream_queues_);
1644     base::WeakPtr<SpdyStreamRequest> pending_request =
1645         GetNextPendingStreamRequest();
1646     if (!pending_request)
1647       break;
1648     // No new stream requests should be added while the session is
1649     // going away.
1650     DCHECK_GT(old_size, GetTotalSize(pending_create_stream_queues_));
1651     pending_request->OnRequestCompleteFailure(ERR_ABORTED);
1652   }
1653
1654   while (true) {
1655     size_t old_size = active_streams_.size();
1656     ActiveStreamMap::iterator it =
1657         active_streams_.lower_bound(last_good_stream_id + 1);
1658     if (it == active_streams_.end())
1659       break;
1660     LogAbandonedActiveStream(it, status);
1661     CloseActiveStreamIterator(it, status);
1662     // No new streams should be activated while the session is going
1663     // away.
1664     DCHECK_GT(old_size, active_streams_.size());
1665   }
1666
1667   while (!created_streams_.empty()) {
1668     size_t old_size = created_streams_.size();
1669     CreatedStreamSet::iterator it = created_streams_.begin();
1670     LogAbandonedStream(*it, status);
1671     CloseCreatedStreamIterator(it, status);
1672     // No new streams should be created while the session is going
1673     // away.
1674     DCHECK_GT(old_size, created_streams_.size());
1675   }
1676
1677   write_queue_.RemovePendingWritesForStreamsAfter(last_good_stream_id);
1678
1679   DcheckGoingAway();
1680 }
1681
1682 void SpdySession::MaybeFinishGoingAway() {
1683   if (active_streams_.empty() && availability_state_ == STATE_GOING_AWAY) {
1684     DoDrainSession(OK, "Finished going away");
1685   }
1686 }
1687
1688 void SpdySession::DoDrainSession(Error err, const std::string& description) {
1689   if (availability_state_ == STATE_DRAINING) {
1690     return;
1691   }
1692   MakeUnavailable();
1693
1694   // If |err| indicates an error occurred, inform the peer that we're closing
1695   // and why. Don't GOAWAY on a graceful or idle close, as that may
1696   // unnecessarily wake the radio. We could technically GOAWAY on network errors
1697   // (we'll probably fail to actually write it, but that's okay), however many
1698   // unit-tests would need to be updated.
1699   if (err != OK &&
1700       err != ERR_ABORTED &&  // Used by SpdySessionPool to close idle sessions.
1701       err != ERR_NETWORK_CHANGED &&  // Used to deprecate sessions on IP change.
1702       err != ERR_SOCKET_NOT_CONNECTED &&
1703       err != ERR_CONNECTION_CLOSED && err != ERR_CONNECTION_RESET) {
1704     // Enqueue a GOAWAY to inform the peer of why we're closing the connection.
1705     SpdyGoAwayIR goaway_ir(last_accepted_push_stream_id_,
1706                            MapNetErrorToGoAwayStatus(err),
1707                            description);
1708     EnqueueSessionWrite(HIGHEST,
1709                         GOAWAY,
1710                         scoped_ptr<SpdyFrame>(
1711                             buffered_spdy_framer_->SerializeFrame(goaway_ir)));
1712   }
1713
1714   availability_state_ = STATE_DRAINING;
1715   error_on_close_ = err;
1716
1717   net_log_.AddEvent(
1718       NetLog::TYPE_SPDY_SESSION_CLOSE,
1719       base::Bind(&NetLogSpdySessionCloseCallback, err, &description));
1720
1721   UMA_HISTOGRAM_SPARSE_SLOWLY("Net.SpdySession.ClosedOnError", -err);
1722   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySession.BytesRead.OtherErrors",
1723                               total_bytes_received_, 1, 100000000, 50);
1724
1725   if (err == OK) {
1726     // We ought to be going away already, as this is a graceful close.
1727     DcheckGoingAway();
1728   } else {
1729     StartGoingAway(0, err);
1730   }
1731   DcheckDraining();
1732   MaybePostWriteLoop();
1733 }
1734
1735 void SpdySession::LogAbandonedStream(SpdyStream* stream, Error status) {
1736   DCHECK(stream);
1737   std::string description = base::StringPrintf(
1738       "ABANDONED (stream_id=%d): ", stream->stream_id()) +
1739       stream->url().spec();
1740   stream->LogStreamError(status, description);
1741   // We don't increment the streams abandoned counter here. If the
1742   // stream isn't active (i.e., it hasn't written anything to the wire
1743   // yet) then it's as if it never existed. If it is active, then
1744   // LogAbandonedActiveStream() will increment the counters.
1745 }
1746
1747 void SpdySession::LogAbandonedActiveStream(ActiveStreamMap::const_iterator it,
1748                                            Error status) {
1749   DCHECK_GT(it->first, 0u);
1750   LogAbandonedStream(it->second.stream, status);
1751   ++streams_abandoned_count_;
1752   base::StatsCounter abandoned_streams("spdy.abandoned_streams");
1753   abandoned_streams.Increment();
1754   if (it->second.stream->type() == SPDY_PUSH_STREAM &&
1755       unclaimed_pushed_streams_.find(it->second.stream->url()) !=
1756       unclaimed_pushed_streams_.end()) {
1757     base::StatsCounter abandoned_push_streams("spdy.abandoned_push_streams");
1758     abandoned_push_streams.Increment();
1759   }
1760 }
1761
1762 SpdyStreamId SpdySession::GetNewStreamId() {
1763   CHECK_LE(stream_hi_water_mark_, kLastStreamId);
1764   SpdyStreamId id = stream_hi_water_mark_;
1765   stream_hi_water_mark_ += 2;
1766   return id;
1767 }
1768
1769 void SpdySession::CloseSessionOnError(Error err,
1770                                       const std::string& description) {
1771   DCHECK_LT(err, ERR_IO_PENDING);
1772   DoDrainSession(err, description);
1773 }
1774
1775 void SpdySession::MakeUnavailable() {
1776   if (availability_state_ == STATE_AVAILABLE) {
1777     availability_state_ = STATE_GOING_AWAY;
1778     pool_->MakeSessionUnavailable(GetWeakPtr());
1779   }
1780 }
1781
1782 base::Value* SpdySession::GetInfoAsValue() const {
1783   base::DictionaryValue* dict = new base::DictionaryValue();
1784
1785   dict->SetInteger("source_id", net_log_.source().id);
1786
1787   dict->SetString("host_port_pair", host_port_pair().ToString());
1788   if (!pooled_aliases_.empty()) {
1789     base::ListValue* alias_list = new base::ListValue();
1790     for (std::set<SpdySessionKey>::const_iterator it =
1791              pooled_aliases_.begin();
1792          it != pooled_aliases_.end(); it++) {
1793       alias_list->Append(new base::StringValue(
1794           it->host_port_pair().ToString()));
1795     }
1796     dict->Set("aliases", alias_list);
1797   }
1798   dict->SetString("proxy", host_port_proxy_pair().second.ToURI());
1799
1800   dict->SetInteger("active_streams", active_streams_.size());
1801
1802   dict->SetInteger("unclaimed_pushed_streams",
1803                    unclaimed_pushed_streams_.size());
1804
1805   dict->SetBoolean("is_secure", is_secure_);
1806
1807   dict->SetString("protocol_negotiated",
1808                   SSLClientSocket::NextProtoToString(
1809                       connection_->socket()->GetNegotiatedProtocol()));
1810
1811   dict->SetInteger("error", error_on_close_);
1812   dict->SetInteger("max_concurrent_streams", max_concurrent_streams_);
1813
1814   dict->SetInteger("streams_initiated_count", streams_initiated_count_);
1815   dict->SetInteger("streams_pushed_count", streams_pushed_count_);
1816   dict->SetInteger("streams_pushed_and_claimed_count",
1817       streams_pushed_and_claimed_count_);
1818   dict->SetInteger("streams_abandoned_count", streams_abandoned_count_);
1819   DCHECK(buffered_spdy_framer_.get());
1820   dict->SetInteger("frames_received", buffered_spdy_framer_->frames_received());
1821
1822   dict->SetBoolean("sent_settings", sent_settings_);
1823   dict->SetBoolean("received_settings", received_settings_);
1824
1825   dict->SetInteger("send_window_size", session_send_window_size_);
1826   dict->SetInteger("recv_window_size", session_recv_window_size_);
1827   dict->SetInteger("unacked_recv_window_bytes",
1828                    session_unacked_recv_window_bytes_);
1829   return dict;
1830 }
1831
1832 bool SpdySession::IsReused() const {
1833   return buffered_spdy_framer_->frames_received() > 0 ||
1834       connection_->reuse_type() == ClientSocketHandle::UNUSED_IDLE;
1835 }
1836
1837 bool SpdySession::GetLoadTimingInfo(SpdyStreamId stream_id,
1838                                     LoadTimingInfo* load_timing_info) const {
1839   return connection_->GetLoadTimingInfo(stream_id != kFirstStreamId,
1840                                         load_timing_info);
1841 }
1842
1843 int SpdySession::GetPeerAddress(IPEndPoint* address) const {
1844   int rv = ERR_SOCKET_NOT_CONNECTED;
1845   if (connection_->socket()) {
1846     rv = connection_->socket()->GetPeerAddress(address);
1847   }
1848
1849   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetPeerAddress",
1850                         rv == ERR_SOCKET_NOT_CONNECTED);
1851
1852   return rv;
1853 }
1854
1855 int SpdySession::GetLocalAddress(IPEndPoint* address) const {
1856   int rv = ERR_SOCKET_NOT_CONNECTED;
1857   if (connection_->socket()) {
1858     rv = connection_->socket()->GetLocalAddress(address);
1859   }
1860
1861   UMA_HISTOGRAM_BOOLEAN("Net.SpdySessionSocketNotConnectedGetLocalAddress",
1862                         rv == ERR_SOCKET_NOT_CONNECTED);
1863
1864   return rv;
1865 }
1866
1867 void SpdySession::EnqueueSessionWrite(RequestPriority priority,
1868                                       SpdyFrameType frame_type,
1869                                       scoped_ptr<SpdyFrame> frame) {
1870   DCHECK(frame_type == RST_STREAM || frame_type == SETTINGS ||
1871          frame_type == WINDOW_UPDATE || frame_type == PING ||
1872          frame_type == GOAWAY);
1873   EnqueueWrite(
1874       priority, frame_type,
1875       scoped_ptr<SpdyBufferProducer>(
1876           new SimpleBufferProducer(
1877               scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
1878       base::WeakPtr<SpdyStream>());
1879 }
1880
1881 void SpdySession::EnqueueWrite(RequestPriority priority,
1882                                SpdyFrameType frame_type,
1883                                scoped_ptr<SpdyBufferProducer> producer,
1884                                const base::WeakPtr<SpdyStream>& stream) {
1885   if (availability_state_ == STATE_DRAINING)
1886     return;
1887
1888   write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
1889   MaybePostWriteLoop();
1890 }
1891
1892 void SpdySession::MaybePostWriteLoop() {
1893   if (write_state_ == WRITE_STATE_IDLE) {
1894     CHECK(!in_flight_write_);
1895     write_state_ = WRITE_STATE_DO_WRITE;
1896     base::MessageLoop::current()->PostTask(
1897         FROM_HERE,
1898         base::Bind(&SpdySession::PumpWriteLoop,
1899                    weak_factory_.GetWeakPtr(), WRITE_STATE_DO_WRITE, OK));
1900   }
1901 }
1902
1903 void SpdySession::InsertCreatedStream(scoped_ptr<SpdyStream> stream) {
1904   CHECK_EQ(stream->stream_id(), 0u);
1905   CHECK(created_streams_.find(stream.get()) == created_streams_.end());
1906   created_streams_.insert(stream.release());
1907 }
1908
1909 scoped_ptr<SpdyStream> SpdySession::ActivateCreatedStream(SpdyStream* stream) {
1910   CHECK_EQ(stream->stream_id(), 0u);
1911   CHECK(created_streams_.find(stream) != created_streams_.end());
1912   stream->set_stream_id(GetNewStreamId());
1913   scoped_ptr<SpdyStream> owned_stream(stream);
1914   created_streams_.erase(stream);
1915   return owned_stream.Pass();
1916 }
1917
1918 void SpdySession::InsertActivatedStream(scoped_ptr<SpdyStream> stream) {
1919   SpdyStreamId stream_id = stream->stream_id();
1920   CHECK_NE(stream_id, 0u);
1921   std::pair<ActiveStreamMap::iterator, bool> result =
1922       active_streams_.insert(
1923           std::make_pair(stream_id, ActiveStreamInfo(stream.get())));
1924   CHECK(result.second);
1925   ignore_result(stream.release());
1926 }
1927
1928 void SpdySession::DeleteStream(scoped_ptr<SpdyStream> stream, int status) {
1929   if (in_flight_write_stream_.get() == stream.get()) {
1930     // If we're deleting the stream for the in-flight write, we still
1931     // need to let the write complete, so we clear
1932     // |in_flight_write_stream_| and let the write finish on its own
1933     // without notifying |in_flight_write_stream_|.
1934     in_flight_write_stream_.reset();
1935   }
1936
1937   write_queue_.RemovePendingWritesForStream(stream->GetWeakPtr());
1938   stream->OnClose(status);
1939
1940   if (availability_state_ == STATE_AVAILABLE) {
1941     ProcessPendingStreamRequests();
1942   }
1943 }
1944
1945 base::WeakPtr<SpdyStream> SpdySession::GetActivePushStream(const GURL& url) {
1946   base::StatsCounter used_push_streams("spdy.claimed_push_streams");
1947
1948   PushedStreamMap::iterator unclaimed_it = unclaimed_pushed_streams_.find(url);
1949   if (unclaimed_it == unclaimed_pushed_streams_.end())
1950     return base::WeakPtr<SpdyStream>();
1951
1952   SpdyStreamId stream_id = unclaimed_it->second.stream_id;
1953   unclaimed_pushed_streams_.erase(unclaimed_it);
1954
1955   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
1956   if (active_it == active_streams_.end()) {
1957     NOTREACHED();
1958     return base::WeakPtr<SpdyStream>();
1959   }
1960
1961   net_log_.AddEvent(NetLog::TYPE_SPDY_STREAM_ADOPTED_PUSH_STREAM);
1962   used_push_streams.Increment();
1963   return active_it->second.stream->GetWeakPtr();
1964 }
1965
1966 bool SpdySession::GetSSLInfo(SSLInfo* ssl_info,
1967                              bool* was_npn_negotiated,
1968                              NextProto* protocol_negotiated) {
1969   *was_npn_negotiated = connection_->socket()->WasNpnNegotiated();
1970   *protocol_negotiated = connection_->socket()->GetNegotiatedProtocol();
1971   return connection_->socket()->GetSSLInfo(ssl_info);
1972 }
1973
1974 bool SpdySession::GetSSLCertRequestInfo(
1975     SSLCertRequestInfo* cert_request_info) {
1976   if (!is_secure_)
1977     return false;
1978   GetSSLClientSocket()->GetSSLCertRequestInfo(cert_request_info);
1979   return true;
1980 }
1981
1982 void SpdySession::OnError(SpdyFramer::SpdyError error_code) {
1983   CHECK(in_io_loop_);
1984
1985   RecordProtocolErrorHistogram(MapFramerErrorToProtocolError(error_code));
1986   std::string description =
1987       base::StringPrintf("Framer error: %d (%s).",
1988                          error_code,
1989                          SpdyFramer::ErrorCodeToString(error_code));
1990   DoDrainSession(MapFramerErrorToNetError(error_code), description);
1991 }
1992
1993 void SpdySession::OnStreamError(SpdyStreamId stream_id,
1994                                 const std::string& description) {
1995   CHECK(in_io_loop_);
1996
1997   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
1998   if (it == active_streams_.end()) {
1999     // We still want to send a frame to reset the stream even if we
2000     // don't know anything about it.
2001     EnqueueResetStreamFrame(
2002         stream_id, IDLE, RST_STREAM_PROTOCOL_ERROR, description);
2003     return;
2004   }
2005
2006   ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, description);
2007 }
2008
2009 void SpdySession::OnDataFrameHeader(SpdyStreamId stream_id,
2010                                     size_t length,
2011                                     bool fin) {
2012   CHECK(in_io_loop_);
2013
2014   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2015
2016   // By the time data comes in, the stream may already be inactive.
2017   if (it == active_streams_.end())
2018     return;
2019
2020   SpdyStream* stream = it->second.stream;
2021   CHECK_EQ(stream->stream_id(), stream_id);
2022
2023   DCHECK(buffered_spdy_framer_);
2024   size_t header_len = buffered_spdy_framer_->GetDataFrameMinimumSize();
2025   stream->IncrementRawReceivedBytes(header_len);
2026 }
2027
2028 void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
2029                                     const char* data,
2030                                     size_t len,
2031                                     bool fin) {
2032   CHECK(in_io_loop_);
2033
2034   if (data == NULL && len != 0) {
2035     // This is notification of consumed data padding.
2036     // TODO(jgraettinger): Properly flow padding into WINDOW_UPDATE frames.
2037     // See crbug.com/353012.
2038     return;
2039   }
2040
2041   DCHECK_LT(len, 1u << 24);
2042   if (net_log().IsLogging()) {
2043     net_log().AddEvent(
2044         NetLog::TYPE_SPDY_SESSION_RECV_DATA,
2045         base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
2046   }
2047
2048   // Build the buffer as early as possible so that we go through the
2049   // session flow control checks and update
2050   // |unacked_recv_window_bytes_| properly even when the stream is
2051   // inactive (since the other side has still reduced its session send
2052   // window).
2053   scoped_ptr<SpdyBuffer> buffer;
2054   if (data) {
2055     DCHECK_GT(len, 0u);
2056     CHECK_LE(len, static_cast<size_t>(kReadBufferSize));
2057     buffer.reset(new SpdyBuffer(data, len));
2058
2059     if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2060       DecreaseRecvWindowSize(static_cast<int32>(len));
2061       buffer->AddConsumeCallback(
2062           base::Bind(&SpdySession::OnReadBufferConsumed,
2063                      weak_factory_.GetWeakPtr()));
2064     }
2065   } else {
2066     DCHECK_EQ(len, 0u);
2067   }
2068
2069   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2070
2071   // By the time data comes in, the stream may already be inactive.
2072   if (it == active_streams_.end())
2073     return;
2074
2075   SpdyStream* stream = it->second.stream;
2076   CHECK_EQ(stream->stream_id(), stream_id);
2077
2078   stream->IncrementRawReceivedBytes(len);
2079
2080   if (it->second.waiting_for_syn_reply) {
2081     const std::string& error = "Data received before SYN_REPLY.";
2082     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2083     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2084     return;
2085   }
2086
2087   stream->OnDataReceived(buffer.Pass());
2088 }
2089
2090 void SpdySession::OnSettings(bool clear_persisted) {
2091   CHECK(in_io_loop_);
2092
2093   if (clear_persisted)
2094     http_server_properties_->ClearSpdySettings(host_port_pair());
2095
2096   if (net_log_.IsLogging()) {
2097     net_log_.AddEvent(
2098         NetLog::TYPE_SPDY_SESSION_RECV_SETTINGS,
2099         base::Bind(&NetLogSpdySettingsCallback, host_port_pair(),
2100                    clear_persisted));
2101   }
2102
2103   if (GetProtocolVersion() >= SPDY4) {
2104     // Send an acknowledgment of the setting.
2105     SpdySettingsIR settings_ir;
2106     settings_ir.set_is_ack(true);
2107     EnqueueSessionWrite(
2108         HIGHEST,
2109         SETTINGS,
2110         scoped_ptr<SpdyFrame>(
2111             buffered_spdy_framer_->SerializeFrame(settings_ir)));
2112   }
2113 }
2114
2115 void SpdySession::OnSetting(SpdySettingsIds id,
2116                             uint8 flags,
2117                             uint32 value) {
2118   CHECK(in_io_loop_);
2119
2120   HandleSetting(id, value);
2121   http_server_properties_->SetSpdySetting(
2122       host_port_pair(),
2123       id,
2124       static_cast<SpdySettingsFlags>(flags),
2125       value);
2126   received_settings_ = true;
2127
2128   // Log the setting.
2129   const SpdyMajorVersion protocol_version = GetProtocolVersion();
2130   net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_SETTING,
2131                     base::Bind(&NetLogSpdySettingCallback,
2132                                id,
2133                                protocol_version,
2134                                static_cast<SpdySettingsFlags>(flags),
2135                                value));
2136 }
2137
2138 void SpdySession::OnSendCompressedFrame(
2139     SpdyStreamId stream_id,
2140     SpdyFrameType type,
2141     size_t payload_len,
2142     size_t frame_len) {
2143   if (type != SYN_STREAM && type != HEADERS)
2144     return;
2145
2146   DCHECK(buffered_spdy_framer_.get());
2147   size_t compressed_len =
2148       frame_len - buffered_spdy_framer_->GetSynStreamMinimumSize();
2149
2150   if (payload_len) {
2151     // Make sure we avoid early decimal truncation.
2152     int compression_pct = 100 - (100 * compressed_len) / payload_len;
2153     UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage",
2154                              compression_pct);
2155   }
2156 }
2157
2158 void SpdySession::OnReceiveCompressedFrame(
2159     SpdyStreamId stream_id,
2160     SpdyFrameType type,
2161     size_t frame_len) {
2162   last_compressed_frame_len_ = frame_len;
2163 }
2164
2165 int SpdySession::OnInitialResponseHeadersReceived(
2166     const SpdyHeaderBlock& response_headers,
2167     base::Time response_time,
2168     base::TimeTicks recv_first_byte_time,
2169     SpdyStream* stream) {
2170   CHECK(in_io_loop_);
2171   SpdyStreamId stream_id = stream->stream_id();
2172
2173   if (stream->type() == SPDY_PUSH_STREAM) {
2174     DCHECK(stream->IsReservedRemote());
2175     if (max_concurrent_pushed_streams_ &&
2176         num_active_pushed_streams_ >= max_concurrent_pushed_streams_) {
2177       ResetStream(stream_id,
2178                   RST_STREAM_REFUSED_STREAM,
2179                   "Stream concurrency limit reached.");
2180       return STATUS_CODE_REFUSED_STREAM;
2181     }
2182   }
2183
2184   if (stream->type() == SPDY_PUSH_STREAM) {
2185     // Will be balanced in DeleteStream.
2186     num_active_pushed_streams_++;
2187   }
2188
2189   // May invalidate |stream|.
2190   int rv = stream->OnInitialResponseHeadersReceived(
2191       response_headers, response_time, recv_first_byte_time);
2192   if (rv < 0) {
2193     DCHECK_NE(rv, ERR_IO_PENDING);
2194     DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2195   }
2196
2197   return rv;
2198 }
2199
2200 void SpdySession::OnSynStream(SpdyStreamId stream_id,
2201                               SpdyStreamId associated_stream_id,
2202                               SpdyPriority priority,
2203                               bool fin,
2204                               bool unidirectional,
2205                               const SpdyHeaderBlock& headers) {
2206   CHECK(in_io_loop_);
2207
2208   DCHECK_LE(GetProtocolVersion(), SPDY3);
2209
2210   base::Time response_time = base::Time::Now();
2211   base::TimeTicks recv_first_byte_time = time_func_();
2212
2213   if (net_log_.IsLogging()) {
2214     net_log_.AddEvent(
2215         NetLog::TYPE_SPDY_SESSION_PUSHED_SYN_STREAM,
2216         base::Bind(&NetLogSpdySynStreamReceivedCallback,
2217                    &headers, fin, unidirectional, priority,
2218                    stream_id, associated_stream_id));
2219   }
2220
2221   // Split headers to simulate push promise and response.
2222   SpdyHeaderBlock request_headers;
2223   SpdyHeaderBlock response_headers;
2224   SplitPushedHeadersToRequestAndResponse(
2225       headers, GetProtocolVersion(), &request_headers, &response_headers);
2226
2227   if (!TryCreatePushStream(
2228           stream_id, associated_stream_id, priority, request_headers))
2229     return;
2230
2231   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2232   if (active_it == active_streams_.end()) {
2233     NOTREACHED();
2234     return;
2235   }
2236
2237   if (OnInitialResponseHeadersReceived(response_headers,
2238                                        response_time,
2239                                        recv_first_byte_time,
2240                                        active_it->second.stream) != OK)
2241     return;
2242
2243   base::StatsCounter push_requests("spdy.pushed_streams");
2244   push_requests.Increment();
2245 }
2246
2247 void SpdySession::DeleteExpiredPushedStreams() {
2248   if (unclaimed_pushed_streams_.empty())
2249     return;
2250
2251   // Check that adequate time has elapsed since the last sweep.
2252   if (time_func_() < next_unclaimed_push_stream_sweep_time_)
2253     return;
2254
2255   // Gather old streams to delete.
2256   base::TimeTicks minimum_freshness = time_func_() -
2257       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2258   std::vector<SpdyStreamId> streams_to_close;
2259   for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
2260        it != unclaimed_pushed_streams_.end(); ++it) {
2261     if (minimum_freshness > it->second.creation_time)
2262       streams_to_close.push_back(it->second.stream_id);
2263   }
2264
2265   for (std::vector<SpdyStreamId>::const_iterator to_close_it =
2266            streams_to_close.begin();
2267        to_close_it != streams_to_close.end(); ++to_close_it) {
2268     ActiveStreamMap::iterator active_it = active_streams_.find(*to_close_it);
2269     if (active_it == active_streams_.end())
2270       continue;
2271
2272     LogAbandonedActiveStream(active_it, ERR_INVALID_SPDY_STREAM);
2273     // CloseActiveStreamIterator() will remove the stream from
2274     // |unclaimed_pushed_streams_|.
2275     ResetStreamIterator(
2276         active_it, RST_STREAM_REFUSED_STREAM, "Stream not claimed.");
2277   }
2278
2279   next_unclaimed_push_stream_sweep_time_ = time_func_() +
2280       base::TimeDelta::FromSeconds(kMinPushedStreamLifetimeSeconds);
2281 }
2282
2283 void SpdySession::OnSynReply(SpdyStreamId stream_id,
2284                              bool fin,
2285                              const SpdyHeaderBlock& headers) {
2286   CHECK(in_io_loop_);
2287
2288   base::Time response_time = base::Time::Now();
2289   base::TimeTicks recv_first_byte_time = time_func_();
2290
2291   if (net_log().IsLogging()) {
2292     net_log().AddEvent(
2293         NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
2294         base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2295                    &headers, fin, stream_id));
2296   }
2297
2298   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2299   if (it == active_streams_.end()) {
2300     // NOTE:  it may just be that the stream was cancelled.
2301     return;
2302   }
2303
2304   SpdyStream* stream = it->second.stream;
2305   CHECK_EQ(stream->stream_id(), stream_id);
2306
2307   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2308   last_compressed_frame_len_ = 0;
2309
2310   if (GetProtocolVersion() >= SPDY4) {
2311     const std::string& error =
2312         "SPDY4 wasn't expecting SYN_REPLY.";
2313     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2314     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2315     return;
2316   }
2317   if (!it->second.waiting_for_syn_reply) {
2318     const std::string& error =
2319         "Received duplicate SYN_REPLY for stream.";
2320     stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2321     ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2322     return;
2323   }
2324   it->second.waiting_for_syn_reply = false;
2325
2326   ignore_result(OnInitialResponseHeadersReceived(
2327       headers, response_time, recv_first_byte_time, stream));
2328 }
2329
2330 void SpdySession::OnHeaders(SpdyStreamId stream_id,
2331                             bool has_priority,
2332                             SpdyPriority priority,
2333                             bool fin,
2334                             const SpdyHeaderBlock& headers) {
2335   CHECK(in_io_loop_);
2336
2337   if (net_log().IsLogging()) {
2338     net_log().AddEvent(
2339         NetLog::TYPE_SPDY_SESSION_RECV_HEADERS,
2340         base::Bind(&NetLogSpdySynReplyOrHeadersReceivedCallback,
2341                    &headers, fin, stream_id));
2342   }
2343
2344   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2345   if (it == active_streams_.end()) {
2346     // NOTE:  it may just be that the stream was cancelled.
2347     LOG(WARNING) << "Received HEADERS for invalid stream " << stream_id;
2348     return;
2349   }
2350
2351   SpdyStream* stream = it->second.stream;
2352   CHECK_EQ(stream->stream_id(), stream_id);
2353
2354   stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2355   last_compressed_frame_len_ = 0;
2356
2357   base::Time response_time = base::Time::Now();
2358   base::TimeTicks recv_first_byte_time = time_func_();
2359
2360   if (it->second.waiting_for_syn_reply) {
2361     if (GetProtocolVersion() < SPDY4) {
2362       const std::string& error =
2363           "Was expecting SYN_REPLY, not HEADERS.";
2364       stream->LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
2365       ResetStreamIterator(it, RST_STREAM_PROTOCOL_ERROR, error);
2366       return;
2367     }
2368
2369     it->second.waiting_for_syn_reply = false;
2370     ignore_result(OnInitialResponseHeadersReceived(
2371         headers, response_time, recv_first_byte_time, stream));
2372   } else if (it->second.stream->IsReservedRemote()) {
2373     ignore_result(OnInitialResponseHeadersReceived(
2374         headers, response_time, recv_first_byte_time, stream));
2375   } else {
2376     int rv = stream->OnAdditionalResponseHeadersReceived(headers);
2377     if (rv < 0) {
2378       DCHECK_NE(rv, ERR_IO_PENDING);
2379       DCHECK(active_streams_.find(stream_id) == active_streams_.end());
2380     }
2381   }
2382 }
2383
2384 bool SpdySession::OnUnknownFrame(SpdyStreamId stream_id, int frame_type) {
2385   // Validate stream id.
2386   // Was the frame sent on a stream id that has not been used in this session?
2387   if (stream_id % 2 == 1 && stream_id > stream_hi_water_mark_)
2388     return false;
2389
2390   if (stream_id % 2 == 0 && stream_id > last_accepted_push_stream_id_)
2391     return false;
2392
2393   return true;
2394 }
2395
2396 void SpdySession::OnRstStream(SpdyStreamId stream_id,
2397                               SpdyRstStreamStatus status) {
2398   CHECK(in_io_loop_);
2399
2400   std::string description;
2401   net_log().AddEvent(
2402       NetLog::TYPE_SPDY_SESSION_RST_STREAM,
2403       base::Bind(&NetLogSpdyRstCallback,
2404                  stream_id, status, &description));
2405
2406   ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2407   if (it == active_streams_.end()) {
2408     // NOTE:  it may just be that the stream was cancelled.
2409     LOG(WARNING) << "Received RST for invalid stream" << stream_id;
2410     return;
2411   }
2412
2413   CHECK_EQ(it->second.stream->stream_id(), stream_id);
2414
2415   if (status == 0) {
2416     it->second.stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
2417   } else if (status == RST_STREAM_REFUSED_STREAM) {
2418     CloseActiveStreamIterator(it, ERR_SPDY_SERVER_REFUSED_STREAM);
2419   } else {
2420     RecordProtocolErrorHistogram(
2421         PROTOCOL_ERROR_RST_STREAM_FOR_NON_ACTIVE_STREAM);
2422     it->second.stream->LogStreamError(
2423         ERR_SPDY_PROTOCOL_ERROR,
2424         base::StringPrintf("SPDY stream closed with status: %d", status));
2425     // TODO(mbelshe): Map from Spdy-protocol errors to something sensical.
2426     //                For now, it doesn't matter much - it is a protocol error.
2427     CloseActiveStreamIterator(it, ERR_SPDY_PROTOCOL_ERROR);
2428   }
2429 }
2430
2431 void SpdySession::OnGoAway(SpdyStreamId last_accepted_stream_id,
2432                            SpdyGoAwayStatus status) {
2433   CHECK(in_io_loop_);
2434
2435   // TODO(jgraettinger): UMA histogram on |status|.
2436
2437   net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_GOAWAY,
2438       base::Bind(&NetLogSpdyGoAwayCallback,
2439                  last_accepted_stream_id,
2440                  active_streams_.size(),
2441                  unclaimed_pushed_streams_.size(),
2442                  status));
2443   MakeUnavailable();
2444   StartGoingAway(last_accepted_stream_id, ERR_ABORTED);
2445   // This is to handle the case when we already don't have any active
2446   // streams (i.e., StartGoingAway() did nothing). Otherwise, we have
2447   // active streams and so the last one being closed will finish the
2448   // going away process (see DeleteStream()).
2449   MaybeFinishGoingAway();
2450 }
2451
2452 void SpdySession::OnPing(SpdyPingId unique_id, bool is_ack) {
2453   CHECK(in_io_loop_);
2454
2455   net_log_.AddEvent(
2456       NetLog::TYPE_SPDY_SESSION_PING,
2457       base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "received"));
2458
2459   // Send response to a PING from server.
2460   if ((protocol_ >= kProtoSPDY4 && !is_ack) ||
2461       (protocol_ < kProtoSPDY4 && unique_id % 2 == 0)) {
2462     WritePingFrame(unique_id, true);
2463     return;
2464   }
2465
2466   --pings_in_flight_;
2467   if (pings_in_flight_ < 0) {
2468     RecordProtocolErrorHistogram(PROTOCOL_ERROR_UNEXPECTED_PING);
2469     DoDrainSession(ERR_SPDY_PROTOCOL_ERROR, "pings_in_flight_ is < 0.");
2470     pings_in_flight_ = 0;
2471     return;
2472   }
2473
2474   if (pings_in_flight_ > 0)
2475     return;
2476
2477   // We will record RTT in histogram when there are no more client sent
2478   // pings_in_flight_.
2479   RecordPingRTTHistogram(time_func_() - last_ping_sent_time_);
2480 }
2481
2482 void SpdySession::OnWindowUpdate(SpdyStreamId stream_id,
2483                                  uint32 delta_window_size) {
2484   CHECK(in_io_loop_);
2485
2486   DCHECK_LE(delta_window_size, static_cast<uint32>(kint32max));
2487   net_log_.AddEvent(
2488       NetLog::TYPE_SPDY_SESSION_RECEIVED_WINDOW_UPDATE_FRAME,
2489       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2490                  stream_id, delta_window_size));
2491
2492   if (stream_id == kSessionFlowControlStreamId) {
2493     // WINDOW_UPDATE for the session.
2494     if (flow_control_state_ < FLOW_CONTROL_STREAM_AND_SESSION) {
2495       LOG(WARNING) << "Received WINDOW_UPDATE for session when "
2496                    << "session flow control is not turned on";
2497       // TODO(akalin): Record an error and close the session.
2498       return;
2499     }
2500
2501     if (delta_window_size < 1u) {
2502       RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
2503       DoDrainSession(
2504           ERR_SPDY_PROTOCOL_ERROR,
2505           "Received WINDOW_UPDATE with an invalid delta_window_size " +
2506               base::UintToString(delta_window_size));
2507       return;
2508     }
2509
2510     IncreaseSendWindowSize(static_cast<int32>(delta_window_size));
2511   } else {
2512     // WINDOW_UPDATE for a stream.
2513     if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2514       // TODO(akalin): Record an error and close the session.
2515       LOG(WARNING) << "Received WINDOW_UPDATE for stream " << stream_id
2516                    << " when flow control is not turned on";
2517       return;
2518     }
2519
2520     ActiveStreamMap::iterator it = active_streams_.find(stream_id);
2521
2522     if (it == active_streams_.end()) {
2523       // NOTE:  it may just be that the stream was cancelled.
2524       LOG(WARNING) << "Received WINDOW_UPDATE for invalid stream " << stream_id;
2525       return;
2526     }
2527
2528     SpdyStream* stream = it->second.stream;
2529     CHECK_EQ(stream->stream_id(), stream_id);
2530
2531     if (delta_window_size < 1u) {
2532       ResetStreamIterator(it,
2533                           RST_STREAM_FLOW_CONTROL_ERROR,
2534                           base::StringPrintf(
2535                               "Received WINDOW_UPDATE with an invalid "
2536                               "delta_window_size %ud", delta_window_size));
2537       return;
2538     }
2539
2540     CHECK_EQ(it->second.stream->stream_id(), stream_id);
2541     it->second.stream->IncreaseSendWindowSize(
2542         static_cast<int32>(delta_window_size));
2543   }
2544 }
2545
2546 bool SpdySession::TryCreatePushStream(SpdyStreamId stream_id,
2547                                       SpdyStreamId associated_stream_id,
2548                                       SpdyPriority priority,
2549                                       const SpdyHeaderBlock& headers) {
2550   // Server-initiated streams should have even sequence numbers.
2551   if ((stream_id & 0x1) != 0) {
2552     LOG(WARNING) << "Received invalid push stream id " << stream_id;
2553     if (GetProtocolVersion() > SPDY2)
2554       CloseSessionOnError(ERR_SPDY_PROTOCOL_ERROR, "Odd push stream id.");
2555     return false;
2556   }
2557
2558   if (GetProtocolVersion() > SPDY2) {
2559     if (stream_id <= last_accepted_push_stream_id_) {
2560       LOG(WARNING) << "Received push stream id lesser or equal to the last "
2561                    << "accepted before " << stream_id;
2562       CloseSessionOnError(
2563           ERR_SPDY_PROTOCOL_ERROR,
2564           "New push stream id must be greater than the last accepted.");
2565       return false;
2566     }
2567   }
2568
2569   if (IsStreamActive(stream_id)) {
2570     // For SPDY3 and higher we should not get here, we'll start going away
2571     // earlier on |last_seen_push_stream_id_| check.
2572     CHECK_GT(SPDY3, GetProtocolVersion());
2573     LOG(WARNING) << "Received push for active stream " << stream_id;
2574     return false;
2575   }
2576
2577   last_accepted_push_stream_id_ = stream_id;
2578
2579   RequestPriority request_priority =
2580       ConvertSpdyPriorityToRequestPriority(priority, GetProtocolVersion());
2581
2582   if (availability_state_ == STATE_GOING_AWAY) {
2583     // TODO(akalin): This behavior isn't in the SPDY spec, although it
2584     // probably should be.
2585     EnqueueResetStreamFrame(stream_id,
2586                             request_priority,
2587                             RST_STREAM_REFUSED_STREAM,
2588                             "push stream request received when going away");
2589     return false;
2590   }
2591
2592   if (associated_stream_id == 0) {
2593     // In SPDY4 0 stream id in PUSH_PROMISE frame leads to framer error and
2594     // session going away. We should never get here.
2595     CHECK_GT(SPDY4, GetProtocolVersion());
2596     std::string description = base::StringPrintf(
2597         "Received invalid associated stream id %d for pushed stream %d",
2598         associated_stream_id,
2599         stream_id);
2600     EnqueueResetStreamFrame(
2601         stream_id, request_priority, RST_STREAM_REFUSED_STREAM, description);
2602     return false;
2603   }
2604
2605   streams_pushed_count_++;
2606
2607   // TODO(mbelshe): DCHECK that this is a GET method?
2608
2609   // Verify that the response had a URL for us.
2610   GURL gurl = GetUrlFromHeaderBlock(headers, GetProtocolVersion(), true);
2611   if (!gurl.is_valid()) {
2612     EnqueueResetStreamFrame(stream_id,
2613                             request_priority,
2614                             RST_STREAM_PROTOCOL_ERROR,
2615                             "Pushed stream url was invalid: " + gurl.spec());
2616     return false;
2617   }
2618
2619   // Verify we have a valid stream association.
2620   ActiveStreamMap::iterator associated_it =
2621       active_streams_.find(associated_stream_id);
2622   if (associated_it == active_streams_.end()) {
2623     EnqueueResetStreamFrame(
2624         stream_id,
2625         request_priority,
2626         RST_STREAM_INVALID_STREAM,
2627         base::StringPrintf("Received push for inactive associated stream %d",
2628                            associated_stream_id));
2629     return false;
2630   }
2631
2632   // Check that the pushed stream advertises the same origin as its associated
2633   // stream. Bypass this check if and only if this session is with a SPDY proxy
2634   // that is trusted explicitly via the --trusted-spdy-proxy switch.
2635   if (trusted_spdy_proxy_.Equals(host_port_pair())) {
2636     // Disallow pushing of HTTPS content.
2637     if (gurl.SchemeIs("https")) {
2638       EnqueueResetStreamFrame(
2639           stream_id,
2640           request_priority,
2641           RST_STREAM_REFUSED_STREAM,
2642           base::StringPrintf("Rejected push of Cross Origin HTTPS content %d",
2643                              associated_stream_id));
2644     }
2645   } else {
2646     GURL associated_url(associated_it->second.stream->GetUrlFromHeaders());
2647     if (associated_url.GetOrigin() != gurl.GetOrigin()) {
2648       EnqueueResetStreamFrame(
2649           stream_id,
2650           request_priority,
2651           RST_STREAM_REFUSED_STREAM,
2652           base::StringPrintf("Rejected Cross Origin Push Stream %d",
2653                              associated_stream_id));
2654       return false;
2655     }
2656   }
2657
2658   // There should not be an existing pushed stream with the same path.
2659   PushedStreamMap::iterator pushed_it =
2660       unclaimed_pushed_streams_.lower_bound(gurl);
2661   if (pushed_it != unclaimed_pushed_streams_.end() &&
2662       pushed_it->first == gurl) {
2663     EnqueueResetStreamFrame(
2664         stream_id,
2665         request_priority,
2666         RST_STREAM_PROTOCOL_ERROR,
2667         "Received duplicate pushed stream with url: " + gurl.spec());
2668     return false;
2669   }
2670
2671   scoped_ptr<SpdyStream> stream(new SpdyStream(SPDY_PUSH_STREAM,
2672                                                GetWeakPtr(),
2673                                                gurl,
2674                                                request_priority,
2675                                                stream_initial_send_window_size_,
2676                                                stream_initial_recv_window_size_,
2677                                                net_log_));
2678   stream->set_stream_id(stream_id);
2679
2680   // In spdy4/http2 PUSH_PROMISE arrives on associated stream.
2681   if (associated_it != active_streams_.end() && GetProtocolVersion() >= SPDY4) {
2682     associated_it->second.stream->IncrementRawReceivedBytes(
2683         last_compressed_frame_len_);
2684   } else {
2685     stream->IncrementRawReceivedBytes(last_compressed_frame_len_);
2686   }
2687
2688   last_compressed_frame_len_ = 0;
2689
2690   DeleteExpiredPushedStreams();
2691   PushedStreamMap::iterator inserted_pushed_it =
2692       unclaimed_pushed_streams_.insert(
2693           pushed_it,
2694           std::make_pair(gurl, PushedStreamInfo(stream_id, time_func_())));
2695   DCHECK(inserted_pushed_it != pushed_it);
2696
2697   InsertActivatedStream(stream.Pass());
2698
2699   ActiveStreamMap::iterator active_it = active_streams_.find(stream_id);
2700   if (active_it == active_streams_.end()) {
2701     NOTREACHED();
2702     return false;
2703   }
2704
2705   active_it->second.stream->OnPushPromiseHeadersReceived(headers);
2706   DCHECK(active_it->second.stream->IsReservedRemote());
2707   num_pushed_streams_++;
2708   return true;
2709 }
2710
2711 void SpdySession::OnPushPromise(SpdyStreamId stream_id,
2712                                 SpdyStreamId promised_stream_id,
2713                                 const SpdyHeaderBlock& headers) {
2714   CHECK(in_io_loop_);
2715
2716   if (net_log_.IsLogging()) {
2717     net_log_.AddEvent(NetLog::TYPE_SPDY_SESSION_RECV_PUSH_PROMISE,
2718                       base::Bind(&NetLogSpdyPushPromiseReceivedCallback,
2719                                  &headers,
2720                                  stream_id,
2721                                  promised_stream_id));
2722   }
2723
2724   // Any priority will do.
2725   // TODO(baranovich): pass parent stream id priority?
2726   if (!TryCreatePushStream(promised_stream_id, stream_id, 0, headers))
2727     return;
2728
2729   base::StatsCounter push_requests("spdy.pushed_streams");
2730   push_requests.Increment();
2731 }
2732
2733 void SpdySession::SendStreamWindowUpdate(SpdyStreamId stream_id,
2734                                          uint32 delta_window_size) {
2735   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2736   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2737   CHECK(it != active_streams_.end());
2738   CHECK_EQ(it->second.stream->stream_id(), stream_id);
2739   SendWindowUpdateFrame(
2740       stream_id, delta_window_size, it->second.stream->priority());
2741 }
2742
2743 void SpdySession::SendInitialData() {
2744   DCHECK(enable_sending_initial_data_);
2745
2746   if (send_connection_header_prefix_) {
2747     DCHECK_EQ(protocol_, kProtoSPDY4);
2748     scoped_ptr<SpdyFrame> connection_header_prefix_frame(
2749         new SpdyFrame(const_cast<char*>(kHttp2ConnectionHeaderPrefix),
2750                       kHttp2ConnectionHeaderPrefixSize,
2751                       false /* take_ownership */));
2752     // Count the prefix as part of the subsequent SETTINGS frame.
2753     EnqueueSessionWrite(HIGHEST, SETTINGS,
2754                         connection_header_prefix_frame.Pass());
2755   }
2756
2757   // First, notify the server about the settings they should use when
2758   // communicating with us.
2759   SettingsMap settings_map;
2760   // Create a new settings frame notifying the server of our
2761   // max concurrent streams and initial window size.
2762   settings_map[SETTINGS_MAX_CONCURRENT_STREAMS] =
2763       SettingsFlagsAndValue(SETTINGS_FLAG_NONE, kMaxConcurrentPushedStreams);
2764   if (flow_control_state_ >= FLOW_CONTROL_STREAM &&
2765       stream_initial_recv_window_size_ != kSpdyStreamInitialWindowSize) {
2766     settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
2767         SettingsFlagsAndValue(SETTINGS_FLAG_NONE,
2768                               stream_initial_recv_window_size_);
2769   }
2770   SendSettings(settings_map);
2771
2772   // Next, notify the server about our initial recv window size.
2773   if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
2774     // Bump up the receive window size to the real initial value. This
2775     // has to go here since the WINDOW_UPDATE frame sent by
2776     // IncreaseRecvWindowSize() call uses |buffered_spdy_framer_|.
2777     DCHECK_GT(kDefaultInitialRecvWindowSize, session_recv_window_size_);
2778     // This condition implies that |kDefaultInitialRecvWindowSize| -
2779     // |session_recv_window_size_| doesn't overflow.
2780     DCHECK_GT(session_recv_window_size_, 0);
2781     IncreaseRecvWindowSize(
2782         kDefaultInitialRecvWindowSize - session_recv_window_size_);
2783   }
2784
2785   if (protocol_ <= kProtoSPDY31) {
2786     // Finally, notify the server about the settings they have
2787     // previously told us to use when communicating with them (after
2788     // applying them).
2789     const SettingsMap& server_settings_map =
2790         http_server_properties_->GetSpdySettings(host_port_pair());
2791     if (server_settings_map.empty())
2792       return;
2793
2794     SettingsMap::const_iterator it =
2795         server_settings_map.find(SETTINGS_CURRENT_CWND);
2796     uint32 cwnd = (it != server_settings_map.end()) ? it->second.second : 0;
2797     UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwndSent", cwnd, 1, 200, 100);
2798
2799     for (SettingsMap::const_iterator it = server_settings_map.begin();
2800          it != server_settings_map.end(); ++it) {
2801       const SpdySettingsIds new_id = it->first;
2802       const uint32 new_val = it->second.second;
2803       HandleSetting(new_id, new_val);
2804     }
2805
2806     SendSettings(server_settings_map);
2807   }
2808 }
2809
2810
2811 void SpdySession::SendSettings(const SettingsMap& settings) {
2812   const SpdyMajorVersion protocol_version = GetProtocolVersion();
2813   net_log_.AddEvent(
2814       NetLog::TYPE_SPDY_SESSION_SEND_SETTINGS,
2815       base::Bind(&NetLogSpdySendSettingsCallback, &settings, protocol_version));
2816   // Create the SETTINGS frame and send it.
2817   DCHECK(buffered_spdy_framer_.get());
2818   scoped_ptr<SpdyFrame> settings_frame(
2819       buffered_spdy_framer_->CreateSettings(settings));
2820   sent_settings_ = true;
2821   EnqueueSessionWrite(HIGHEST, SETTINGS, settings_frame.Pass());
2822 }
2823
2824 void SpdySession::HandleSetting(uint32 id, uint32 value) {
2825   switch (id) {
2826     case SETTINGS_MAX_CONCURRENT_STREAMS:
2827       max_concurrent_streams_ = std::min(static_cast<size_t>(value),
2828                                          kMaxConcurrentStreamLimit);
2829       ProcessPendingStreamRequests();
2830       break;
2831     case SETTINGS_INITIAL_WINDOW_SIZE: {
2832       if (flow_control_state_ < FLOW_CONTROL_STREAM) {
2833         net_log().AddEvent(
2834             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_NO_FLOW_CONTROL);
2835         return;
2836       }
2837
2838       if (value > static_cast<uint32>(kint32max)) {
2839         net_log().AddEvent(
2840             NetLog::TYPE_SPDY_SESSION_INITIAL_WINDOW_SIZE_OUT_OF_RANGE,
2841             NetLog::IntegerCallback("initial_window_size", value));
2842         return;
2843       }
2844
2845       // SETTINGS_INITIAL_WINDOW_SIZE updates initial_send_window_size_ only.
2846       int32 delta_window_size =
2847           static_cast<int32>(value) - stream_initial_send_window_size_;
2848       stream_initial_send_window_size_ = static_cast<int32>(value);
2849       UpdateStreamsSendWindowSize(delta_window_size);
2850       net_log().AddEvent(
2851           NetLog::TYPE_SPDY_SESSION_UPDATE_STREAMS_SEND_WINDOW_SIZE,
2852           NetLog::IntegerCallback("delta_window_size", delta_window_size));
2853       break;
2854     }
2855   }
2856 }
2857
2858 void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
2859   DCHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2860   for (ActiveStreamMap::iterator it = active_streams_.begin();
2861        it != active_streams_.end(); ++it) {
2862     it->second.stream->AdjustSendWindowSize(delta_window_size);
2863   }
2864
2865   for (CreatedStreamSet::const_iterator it = created_streams_.begin();
2866        it != created_streams_.end(); it++) {
2867     (*it)->AdjustSendWindowSize(delta_window_size);
2868   }
2869 }
2870
2871 void SpdySession::SendPrefacePingIfNoneInFlight() {
2872   if (pings_in_flight_ || !enable_ping_based_connection_checking_)
2873     return;
2874
2875   base::TimeTicks now = time_func_();
2876   // If there is no activity in the session, then send a preface-PING.
2877   if ((now - last_activity_time_) > connection_at_risk_of_loss_time_)
2878     SendPrefacePing();
2879 }
2880
2881 void SpdySession::SendPrefacePing() {
2882   WritePingFrame(next_ping_id_, false);
2883 }
2884
2885 void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
2886                                         uint32 delta_window_size,
2887                                         RequestPriority priority) {
2888   CHECK_GE(flow_control_state_, FLOW_CONTROL_STREAM);
2889   ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
2890   if (it != active_streams_.end()) {
2891     CHECK_EQ(it->second.stream->stream_id(), stream_id);
2892   } else {
2893     CHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
2894     CHECK_EQ(stream_id, kSessionFlowControlStreamId);
2895   }
2896
2897   net_log_.AddEvent(
2898       NetLog::TYPE_SPDY_SESSION_SENT_WINDOW_UPDATE_FRAME,
2899       base::Bind(&NetLogSpdyWindowUpdateFrameCallback,
2900                  stream_id, delta_window_size));
2901
2902   DCHECK(buffered_spdy_framer_.get());
2903   scoped_ptr<SpdyFrame> window_update_frame(
2904       buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
2905   EnqueueSessionWrite(priority, WINDOW_UPDATE, window_update_frame.Pass());
2906 }
2907
2908 void SpdySession::WritePingFrame(uint32 unique_id, bool is_ack) {
2909   DCHECK(buffered_spdy_framer_.get());
2910   scoped_ptr<SpdyFrame> ping_frame(
2911       buffered_spdy_framer_->CreatePingFrame(unique_id, is_ack));
2912   EnqueueSessionWrite(HIGHEST, PING, ping_frame.Pass());
2913
2914   if (net_log().IsLogging()) {
2915     net_log().AddEvent(
2916         NetLog::TYPE_SPDY_SESSION_PING,
2917         base::Bind(&NetLogSpdyPingCallback, unique_id, is_ack, "sent"));
2918   }
2919   if (!is_ack) {
2920     next_ping_id_ += 2;
2921     ++pings_in_flight_;
2922     PlanToCheckPingStatus();
2923     last_ping_sent_time_ = time_func_();
2924   }
2925 }
2926
2927 void SpdySession::PlanToCheckPingStatus() {
2928   if (check_ping_status_pending_)
2929     return;
2930
2931   check_ping_status_pending_ = true;
2932   base::MessageLoop::current()->PostDelayedTask(
2933       FROM_HERE,
2934       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2935                  time_func_()), hung_interval_);
2936 }
2937
2938 void SpdySession::CheckPingStatus(base::TimeTicks last_check_time) {
2939   CHECK(!in_io_loop_);
2940
2941   // Check if we got a response back for all PINGs we had sent.
2942   if (pings_in_flight_ == 0) {
2943     check_ping_status_pending_ = false;
2944     return;
2945   }
2946
2947   DCHECK(check_ping_status_pending_);
2948
2949   base::TimeTicks now = time_func_();
2950   base::TimeDelta delay = hung_interval_ - (now - last_activity_time_);
2951
2952   if (delay.InMilliseconds() < 0 || last_activity_time_ < last_check_time) {
2953     // Track all failed PING messages in a separate bucket.
2954     RecordPingRTTHistogram(base::TimeDelta::Max());
2955     DoDrainSession(ERR_SPDY_PING_FAILED, "Failed ping.");
2956     return;
2957   }
2958
2959   // Check the status of connection after a delay.
2960   base::MessageLoop::current()->PostDelayedTask(
2961       FROM_HERE,
2962       base::Bind(&SpdySession::CheckPingStatus, weak_factory_.GetWeakPtr(),
2963                  now),
2964       delay);
2965 }
2966
2967 void SpdySession::RecordPingRTTHistogram(base::TimeDelta duration) {
2968   UMA_HISTOGRAM_TIMES("Net.SpdyPing.RTT", duration);
2969 }
2970
2971 void SpdySession::RecordProtocolErrorHistogram(
2972     SpdyProtocolErrorDetails details) {
2973   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails2", details,
2974                             NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2975   if (EndsWith(host_port_pair().host(), "google.com", false)) {
2976     UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionErrorDetails_Google2", details,
2977                               NUM_SPDY_PROTOCOL_ERROR_DETAILS);
2978   }
2979 }
2980
2981 void SpdySession::RecordHistograms() {
2982   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPerSession",
2983                               streams_initiated_count_,
2984                               0, 300, 50);
2985   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedPerSession",
2986                               streams_pushed_count_,
2987                               0, 300, 50);
2988   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsPushedAndClaimedPerSession",
2989                               streams_pushed_and_claimed_count_,
2990                               0, 300, 50);
2991   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamsAbandonedPerSession",
2992                               streams_abandoned_count_,
2993                               0, 300, 50);
2994   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsSent",
2995                             sent_settings_ ? 1 : 0, 2);
2996   UMA_HISTOGRAM_ENUMERATION("Net.SpdySettingsReceived",
2997                             received_settings_ ? 1 : 0, 2);
2998   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyStreamStallsPerSession",
2999                               stalled_streams_,
3000                               0, 300, 50);
3001   UMA_HISTOGRAM_ENUMERATION("Net.SpdySessionsWithStalls",
3002                             stalled_streams_ > 0 ? 1 : 0, 2);
3003
3004   if (received_settings_) {
3005     // Enumerate the saved settings, and set histograms for it.
3006     const SettingsMap& settings_map =
3007         http_server_properties_->GetSpdySettings(host_port_pair());
3008
3009     SettingsMap::const_iterator it;
3010     for (it = settings_map.begin(); it != settings_map.end(); ++it) {
3011       const SpdySettingsIds id = it->first;
3012       const uint32 val = it->second.second;
3013       switch (id) {
3014         case SETTINGS_CURRENT_CWND:
3015           // Record several different histograms to see if cwnd converges
3016           // for larger volumes of data being sent.
3017           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd",
3018                                       val, 1, 200, 100);
3019           if (total_bytes_received_ > 10 * 1024) {
3020             UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd10K",
3021                                         val, 1, 200, 100);
3022             if (total_bytes_received_ > 25 * 1024) {
3023               UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd25K",
3024                                           val, 1, 200, 100);
3025               if (total_bytes_received_ > 50 * 1024) {
3026                 UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd50K",
3027                                             val, 1, 200, 100);
3028                 if (total_bytes_received_ > 100 * 1024) {
3029                   UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsCwnd100K",
3030                                               val, 1, 200, 100);
3031                 }
3032               }
3033             }
3034           }
3035           break;
3036         case SETTINGS_ROUND_TRIP_TIME:
3037           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRTT",
3038                                       val, 1, 1200, 100);
3039           break;
3040         case SETTINGS_DOWNLOAD_RETRANS_RATE:
3041           UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdySettingsRetransRate",
3042                                       val, 1, 100, 50);
3043           break;
3044         default:
3045           break;
3046       }
3047     }
3048   }
3049 }
3050
3051 void SpdySession::CompleteStreamRequest(
3052     const base::WeakPtr<SpdyStreamRequest>& pending_request) {
3053   // Abort if the request has already been cancelled.
3054   if (!pending_request)
3055     return;
3056
3057   base::WeakPtr<SpdyStream> stream;
3058   int rv = TryCreateStream(pending_request, &stream);
3059
3060   if (rv == OK) {
3061     DCHECK(stream);
3062     pending_request->OnRequestCompleteSuccess(stream);
3063     return;
3064   }
3065   DCHECK(!stream);
3066
3067   if (rv != ERR_IO_PENDING) {
3068     pending_request->OnRequestCompleteFailure(rv);
3069   }
3070 }
3071
3072 SSLClientSocket* SpdySession::GetSSLClientSocket() const {
3073   if (!is_secure_)
3074     return NULL;
3075   SSLClientSocket* ssl_socket =
3076       reinterpret_cast<SSLClientSocket*>(connection_->socket());
3077   DCHECK(ssl_socket);
3078   return ssl_socket;
3079 }
3080
3081 void SpdySession::OnWriteBufferConsumed(
3082     size_t frame_payload_size,
3083     size_t consume_size,
3084     SpdyBuffer::ConsumeSource consume_source) {
3085   // We can be called with |in_io_loop_| set if a write SpdyBuffer is
3086   // deleted (e.g., a stream is closed due to incoming data).
3087
3088   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3089
3090   if (consume_source == SpdyBuffer::DISCARD) {
3091     // If we're discarding a frame or part of it, increase the send
3092     // window by the number of discarded bytes. (Although if we're
3093     // discarding part of a frame, it's probably because of a write
3094     // error and we'll be tearing down the session soon.)
3095     size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
3096     DCHECK_GT(remaining_payload_bytes, 0u);
3097     IncreaseSendWindowSize(static_cast<int32>(remaining_payload_bytes));
3098   }
3099   // For consumed bytes, the send window is increased when we receive
3100   // a WINDOW_UPDATE frame.
3101 }
3102
3103 void SpdySession::IncreaseSendWindowSize(int32 delta_window_size) {
3104   // We can be called with |in_io_loop_| set if a SpdyBuffer is
3105   // deleted (e.g., a stream is closed due to incoming data).
3106
3107   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3108   DCHECK_GE(delta_window_size, 1);
3109
3110   // Check for overflow.
3111   int32 max_delta_window_size = kint32max - session_send_window_size_;
3112   if (delta_window_size > max_delta_window_size) {
3113     RecordProtocolErrorHistogram(PROTOCOL_ERROR_INVALID_WINDOW_UPDATE_SIZE);
3114     DoDrainSession(
3115         ERR_SPDY_PROTOCOL_ERROR,
3116         "Received WINDOW_UPDATE [delta: " +
3117             base::IntToString(delta_window_size) +
3118             "] for session overflows session_send_window_size_ [current: " +
3119             base::IntToString(session_send_window_size_) + "]");
3120     return;
3121   }
3122
3123   session_send_window_size_ += delta_window_size;
3124
3125   net_log_.AddEvent(
3126       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
3127       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3128                  delta_window_size, session_send_window_size_));
3129
3130   DCHECK(!IsSendStalled());
3131   ResumeSendStalledStreams();
3132 }
3133
3134 void SpdySession::DecreaseSendWindowSize(int32 delta_window_size) {
3135   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3136
3137   // We only call this method when sending a frame. Therefore,
3138   // |delta_window_size| should be within the valid frame size range.
3139   DCHECK_GE(delta_window_size, 1);
3140   DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
3141
3142   // |send_window_size_| should have been at least |delta_window_size| for
3143   // this call to happen.
3144   DCHECK_GE(session_send_window_size_, delta_window_size);
3145
3146   session_send_window_size_ -= delta_window_size;
3147
3148   net_log_.AddEvent(
3149       NetLog::TYPE_SPDY_SESSION_UPDATE_SEND_WINDOW,
3150       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3151                  -delta_window_size, session_send_window_size_));
3152 }
3153
3154 void SpdySession::OnReadBufferConsumed(
3155     size_t consume_size,
3156     SpdyBuffer::ConsumeSource consume_source) {
3157   // We can be called with |in_io_loop_| set if a read SpdyBuffer is
3158   // deleted (e.g., discarded by a SpdyReadQueue).
3159
3160   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3161   DCHECK_GE(consume_size, 1u);
3162   DCHECK_LE(consume_size, static_cast<size_t>(kint32max));
3163
3164   IncreaseRecvWindowSize(static_cast<int32>(consume_size));
3165 }
3166
3167 void SpdySession::IncreaseRecvWindowSize(int32 delta_window_size) {
3168   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3169   DCHECK_GE(session_unacked_recv_window_bytes_, 0);
3170   DCHECK_GE(session_recv_window_size_, session_unacked_recv_window_bytes_);
3171   DCHECK_GE(delta_window_size, 1);
3172   // Check for overflow.
3173   DCHECK_LE(delta_window_size, kint32max - session_recv_window_size_);
3174
3175   session_recv_window_size_ += delta_window_size;
3176   net_log_.AddEvent(
3177       NetLog::TYPE_SPDY_STREAM_UPDATE_RECV_WINDOW,
3178       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3179                  delta_window_size, session_recv_window_size_));
3180
3181   session_unacked_recv_window_bytes_ += delta_window_size;
3182   if (session_unacked_recv_window_bytes_ > kSpdySessionInitialWindowSize / 2) {
3183     SendWindowUpdateFrame(kSessionFlowControlStreamId,
3184                           session_unacked_recv_window_bytes_,
3185                           HIGHEST);
3186     session_unacked_recv_window_bytes_ = 0;
3187   }
3188 }
3189
3190 void SpdySession::DecreaseRecvWindowSize(int32 delta_window_size) {
3191   CHECK(in_io_loop_);
3192   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3193   DCHECK_GE(delta_window_size, 1);
3194
3195   // Since we never decrease the initial receive window size,
3196   // |delta_window_size| should never cause |recv_window_size_| to go
3197   // negative. If we do, the receive window isn't being respected.
3198   if (delta_window_size > session_recv_window_size_) {
3199     RecordProtocolErrorHistogram(PROTOCOL_ERROR_RECEIVE_WINDOW_VIOLATION);
3200     DoDrainSession(
3201         ERR_SPDY_FLOW_CONTROL_ERROR,
3202         "delta_window_size is " + base::IntToString(delta_window_size) +
3203             " in DecreaseRecvWindowSize, which is larger than the receive " +
3204             "window size of " + base::IntToString(session_recv_window_size_));
3205     return;
3206   }
3207
3208   session_recv_window_size_ -= delta_window_size;
3209   net_log_.AddEvent(
3210       NetLog::TYPE_SPDY_SESSION_UPDATE_RECV_WINDOW,
3211       base::Bind(&NetLogSpdySessionWindowUpdateCallback,
3212                  -delta_window_size, session_recv_window_size_));
3213 }
3214
3215 void SpdySession::QueueSendStalledStream(const SpdyStream& stream) {
3216   DCHECK(stream.send_stalled_by_flow_control());
3217   RequestPriority priority = stream.priority();
3218   CHECK_GE(priority, MINIMUM_PRIORITY);
3219   CHECK_LE(priority, MAXIMUM_PRIORITY);
3220   stream_send_unstall_queue_[priority].push_back(stream.stream_id());
3221 }
3222
3223 void SpdySession::ResumeSendStalledStreams() {
3224   DCHECK_EQ(flow_control_state_, FLOW_CONTROL_STREAM_AND_SESSION);
3225
3226   // We don't have to worry about new streams being queued, since
3227   // doing so would cause IsSendStalled() to return true. But we do
3228   // have to worry about streams being closed, as well as ourselves
3229   // being closed.
3230
3231   while (!IsSendStalled()) {
3232     size_t old_size = 0;
3233 #if DCHECK_IS_ON
3234     old_size = GetTotalSize(stream_send_unstall_queue_);
3235 #endif
3236
3237     SpdyStreamId stream_id = PopStreamToPossiblyResume();
3238     if (stream_id == 0)
3239       break;
3240     ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
3241     // The stream may actually still be send-stalled after this (due
3242     // to its own send window) but that's okay -- it'll then be
3243     // resumed once its send window increases.
3244     if (it != active_streams_.end())
3245       it->second.stream->PossiblyResumeIfSendStalled();
3246
3247     // The size should decrease unless we got send-stalled again.
3248     if (!IsSendStalled())
3249       DCHECK_LT(GetTotalSize(stream_send_unstall_queue_), old_size);
3250   }
3251 }
3252
3253 SpdyStreamId SpdySession::PopStreamToPossiblyResume() {
3254   for (int i = MAXIMUM_PRIORITY; i >= MINIMUM_PRIORITY; --i) {
3255     std::deque<SpdyStreamId>* queue = &stream_send_unstall_queue_[i];
3256     if (!queue->empty()) {
3257       SpdyStreamId stream_id = queue->front();
3258       queue->pop_front();
3259       return stream_id;
3260     }
3261   }
3262   return 0;
3263 }
3264
3265 }  // namespace net